001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterException;
019 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
020 import com.liferay.portal.kernel.cluster.ClusterMessageType;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.log.Log;
025 import com.liferay.portal.kernel.log.LogFactoryUtil;
026 import com.liferay.portal.kernel.util.MethodHandler;
027
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.List;
031
032 import org.jgroups.Channel;
033 import org.jgroups.Message;
034 import org.jgroups.View;
035
036
040 public class ClusterRequestReceiver extends BaseReceiver {
041
042 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
043 _clusterExecutorImpl = clusterExecutorImpl;
044 }
045
046 @Override
047 protected void doReceive(Message message) {
048 Object obj = message.getObject();
049
050 if (obj == null) {
051 if (_log.isWarnEnabled()) {
052 _log.warn("Message content is null");
053 }
054
055 return;
056 }
057
058 Address sourceAddress = new AddressImpl(message.getSrc());
059
060 if (sourceAddress.equals(
061 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
062
063 boolean isProcessed = processLocalMessage(obj);
064
065 if (isProcessed) {
066 return;
067 }
068 }
069
070 if (obj instanceof ClusterRequest) {
071 ClusterRequest clusterRequest = (ClusterRequest)obj;
072
073 processClusterRequest(clusterRequest, sourceAddress);
074 }
075 else if (obj instanceof ClusterNodeResponse) {
076 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
077
078 processClusterResponse(clusterNodeResponse, sourceAddress);
079 }
080 else if (_log.isWarnEnabled()) {
081 _log.warn(
082 "Unable to process message content of type " + obj.getClass());
083 }
084 }
085
086 @Override
087 protected void doViewAccepted(View oldView, View newView) {
088 List<Address> departAddresses = getDepartAddresses(oldView, newView);
089 List<Address> newAddresses = getNewAddresses(oldView, newView);
090
091 if (!newAddresses.isEmpty()) {
092 _clusterExecutorImpl.sendNotifyRequest();
093 }
094
095 if (!departAddresses.isEmpty()) {
096 _clusterExecutorImpl.memberRemoved(departAddresses);
097 }
098 }
099
100 protected List<Address> getDepartAddresses(View oldView, View newView) {
101 List<org.jgroups.Address> currentJGroupsAddresses =
102 newView.getMembers();
103 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
104
105 List<org.jgroups.Address> departJGroupsAddresses =
106 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
107
108 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
109
110 if (departJGroupsAddresses.isEmpty()) {
111 return Collections.emptyList();
112 }
113
114 List<Address> departAddresses = new ArrayList<Address>(
115 departJGroupsAddresses.size());
116
117 for (org.jgroups.Address departJGroupsAddress :
118 departJGroupsAddresses) {
119
120 Address departAddress = new AddressImpl(departJGroupsAddress);
121
122 departAddresses.add(departAddress);
123 }
124
125 return departAddresses;
126 }
127
128 protected List<Address> getNewAddresses(View oldView, View newView) {
129 List<org.jgroups.Address> currentJGroupsAddresses =
130 newView.getMembers();
131 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
132
133 List<org.jgroups.Address> newJGroupsAddresses =
134 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
135
136 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
137
138 if (newJGroupsAddresses.isEmpty()) {
139 return Collections.emptyList();
140 }
141
142 List<Address> newAddresses = new ArrayList<Address>(
143 newJGroupsAddresses.size());
144
145 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
146 Address newAddress = new AddressImpl(newJGroupsAddress);
147
148 newAddresses.add(newAddress);
149 }
150
151 return newAddresses;
152 }
153
154 protected void handleResponse(
155 Address address, ClusterRequest clusterRequest, Object returnValue,
156 Exception exception) {
157
158 ClusterNodeResponse clusterNodeResponse =
159 _clusterExecutorImpl.generateClusterNodeResponse(
160 clusterRequest, returnValue, exception);
161
162 Channel channel = _clusterExecutorImpl.getControlChannel();
163
164 try {
165 channel.send(
166 (org.jgroups.Address)address.getRealAddress(),
167 clusterNodeResponse);
168 }
169 catch (Exception e) {
170 _log.error(
171 "Unable to send response message " + clusterNodeResponse, e);
172 }
173 catch (Throwable t) {
174 _log.error(t, t);
175 }
176 }
177
178 protected void processClusterRequest(
179 ClusterRequest clusterRequest, Address sourceAddress) {
180
181 ClusterMessageType clusterMessageType =
182 clusterRequest.getClusterMessageType();
183
184 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
185 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
186
187 _clusterExecutorImpl.memberJoined(
188 sourceAddress, clusterRequest.getOriginatingClusterNode());
189
190 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
191 handleResponse(sourceAddress, clusterRequest, null, null);
192 }
193
194 return;
195 }
196
197 MethodHandler methodHandler = clusterRequest.getMethodHandler();
198
199 Object returnValue = null;
200 Exception exception = null;
201
202 if (methodHandler != null) {
203 try {
204 ClusterInvokeThreadLocal.setEnabled(false);
205
206 returnValue = methodHandler.invoke(true);
207 }
208 catch (Exception e) {
209 exception = e;
210
211 _log.error("Unable to invoke method " + methodHandler, e);
212 }
213 finally {
214 ClusterInvokeThreadLocal.setEnabled(true);
215 }
216 }
217 else {
218 exception = new ClusterException(
219 "Payload is not of type " + MethodHandler.class.getName());
220 }
221
222 if (!clusterRequest.isFireAndForget()) {
223 handleResponse(
224 sourceAddress, clusterRequest, returnValue, exception);
225 }
226 }
227
228 protected void processClusterResponse(
229 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
230
231 ClusterMessageType clusterMessageType =
232 clusterNodeResponse.getClusterMessageType();
233
234 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
235 _clusterExecutorImpl.memberJoined(
236 sourceAddress, clusterNodeResponse.getClusterNode());
237
238 return;
239 }
240
241 String uuid = clusterNodeResponse.getUuid();
242
243 FutureClusterResponses futureClusterResponses =
244 _clusterExecutorImpl.getExecutionResults(uuid);
245
246 if (futureClusterResponses == null) {
247 if (_log.isInfoEnabled()) {
248 _log.info("Unable to find response container for " + uuid);
249 }
250
251 return;
252 }
253
254 if (futureClusterResponses.expectsReply(sourceAddress)) {
255 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
256 }
257 else {
258 if (_log.isWarnEnabled()) {
259 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
260 }
261 }
262 }
263
264 protected boolean processLocalMessage(Object message) {
265 if (message instanceof ClusterRequest) {
266 ClusterRequest clusterRequest = (ClusterRequest)message;
267
268 if (clusterRequest.isSkipLocal()) {
269 return true;
270 }
271 }
272
273 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
274 return true;
275 }
276
277 return false;
278 }
279
280 private static Log _log = LogFactoryUtil.getLog(
281 ClusterRequestReceiver.class);
282
283 private ClusterExecutorImpl _clusterExecutorImpl;
284
285 }