001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
018 import com.liferay.portal.kernel.cluster.Address;
019 import com.liferay.portal.kernel.cluster.ClusterException;
020 import com.liferay.portal.kernel.cluster.ClusterMessageType;
021 import com.liferay.portal.kernel.cluster.ClusterNode;
022 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
023 import com.liferay.portal.kernel.cluster.ClusterRequest;
024 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
025 import com.liferay.portal.kernel.log.Log;
026 import com.liferay.portal.kernel.log.LogFactoryUtil;
027 import com.liferay.portal.kernel.util.MethodInvoker;
028 import com.liferay.portal.kernel.util.MethodWrapper;
029
030 import java.io.Serializable;
031
032 import java.util.ArrayList;
033 import java.util.Iterator;
034 import java.util.List;
035 import java.util.Vector;
036
037 import org.jgroups.Channel;
038 import org.jgroups.ChannelException;
039 import org.jgroups.Message;
040 import org.jgroups.View;
041
042
046 public class ClusterRequestReceiver extends BaseReceiver {
047
048 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
049 _clusterExecutorImpl = clusterExecutorImpl;
050 }
051
052 public void receive(Message message) {
053 org.jgroups.Address sourceAddress = message.getSrc();
054
055 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
056
057 org.jgroups.Address localAddress = controlChannel.getAddress();
058
059 Object obj = message.getObject();
060
061 if (obj == null) {
062 if (_log.isWarnEnabled()) {
063 _log.warn("Message content is null");
064 }
065
066 return;
067 }
068
069 if (localAddress.equals(sourceAddress)) {
070 boolean isProcessed = processLocalMessage(obj, sourceAddress);
071
072 if (isProcessed) {
073 return;
074 }
075 }
076
077 if (obj instanceof ClusterRequest) {
078 ClusterRequest clusterRequest = (ClusterRequest)obj;
079
080 processClusterRequest(clusterRequest, sourceAddress, localAddress);
081 }
082 else if (obj instanceof ClusterNodeResponse) {
083 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
084
085 processClusterResponse(
086 clusterNodeResponse, sourceAddress, localAddress);
087 }
088 else {
089 if (_log.isWarnEnabled()) {
090 _log.warn(
091 "Unable to process message content of type " +
092 obj.getClass().getName());
093 }
094 }
095 }
096
097 public void viewAccepted(View view) {
098 if (_log.isDebugEnabled()) {
099 _log.debug("Accepted view " + view);
100 }
101
102 if (_lastView == null) {
103 _lastView = view;
104
105 return;
106 }
107
108 List<Address> departAddresses = getDepartAddresses(view);
109
110 _lastView = view;
111
112 if (departAddresses.isEmpty()) {
113 return;
114 }
115
116 _clusterExecutorImpl.memberRemoved(departAddresses);
117 }
118
119 protected Object invoke(
120 String servletContextName, MethodWrapper methodWrapper)
121 throws Exception {
122
123 if (servletContextName == null) {
124 return MethodInvoker.invoke(methodWrapper);
125 }
126
127 Thread currentThread = Thread.currentThread();
128
129 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
130
131 try {
132 ClassLoader classLoader =
133 (ClassLoader)PortletBeanLocatorUtil.locate(
134 servletContextName, "portletClassLoader");
135
136 currentThread.setContextClassLoader(classLoader);
137
138 return MethodInvoker.invoke(methodWrapper);
139 }
140 catch(Exception e) {
141 throw e;
142 }
143 finally {
144 currentThread.setContextClassLoader(contextClassLoader);
145 }
146 }
147
148 protected List<Address> getDepartAddresses(View view) {
149 List<Address> departAddresses = new ArrayList<Address>();
150
151 Vector<org.jgroups.Address> jGroupsAddresses = view.getMembers();
152 Vector<org.jgroups.Address> lastJGroupsAddresses =
153 _lastView.getMembers();
154
155 List<org.jgroups.Address> tempAddresses =
156 new ArrayList<org.jgroups.Address>(jGroupsAddresses.size());
157
158 tempAddresses.addAll(jGroupsAddresses);
159
160 List<org.jgroups.Address> lastAddresses =
161 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses.size());
162
163 lastAddresses.addAll(lastJGroupsAddresses);
164
165 tempAddresses.retainAll(lastJGroupsAddresses);
166 lastAddresses.removeAll(tempAddresses);
167
168 if (!lastAddresses.isEmpty()) {
169 Iterator<org.jgroups.Address> itr = lastAddresses.iterator();
170
171 while (itr.hasNext()) {
172 departAddresses.add(new AddressImpl(itr.next()));
173 }
174 }
175
176 return departAddresses;
177 }
178
179 protected void processClusterRequest(
180 ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
181 org.jgroups.Address localAddress) {
182
183 ClusterMessageType clusterMessageType =
184 clusterRequest.getClusterMessageType();
185
186 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
187
188 try {
189 ClusterNode localClusterNode =
190 _clusterExecutorImpl.getLocalClusterNode();
191
192 clusterNodeResponse.setClusterNode(localClusterNode);
193 }
194 catch (Exception e) {
195 clusterNodeResponse.setException(e);
196 }
197
198 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
199 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
200
201 ClusterNode originatingClusterNode =
202 clusterRequest.getOriginatingClusterNode();
203
204 if (originatingClusterNode != null) {
205 _clusterExecutorImpl.memberJoined(
206 new AddressImpl(sourceAddress), originatingClusterNode);
207
208 clusterNodeResponse.setClusterMessageType(clusterMessageType);
209 }
210 else {
211 if (_log.isWarnEnabled()) {
212 _log.warn(
213 "Content of notify message does not contain cluster " +
214 "node information");
215 }
216
217 return;
218 }
219 }
220 else {
221 clusterNodeResponse.setClusterMessageType(
222 ClusterMessageType.EXECUTE);
223 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
224 clusterNodeResponse.setUuid(clusterRequest.getUuid());
225
226 MethodWrapper methodWrapper = clusterRequest.getMethodWrapper();
227
228 if (methodWrapper != null) {
229 try {
230 ClusterInvokeThreadLocal.setEnabled(false);
231
232 Object returnValue = invoke(
233 clusterRequest.getServletContextName(), methodWrapper);
234
235 if (returnValue instanceof Serializable) {
236 clusterNodeResponse.setResult(returnValue);
237 }
238 else if (returnValue != null) {
239 clusterNodeResponse.setException(
240 new ClusterException(
241 "Return value is not serializable"));
242 }
243 }
244 catch (Exception e) {
245 clusterNodeResponse.setException(e);
246
247 _log.error("Failed to invoke method " + methodWrapper, e);
248 }
249 finally {
250 ClusterInvokeThreadLocal.setEnabled(true);
251 }
252 }
253 else {
254 clusterNodeResponse.setException(
255 new ClusterException(
256 "Payload is not of type " +
257 MethodWrapper.class.getName()));
258 }
259 }
260
261 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
262
263 try {
264 controlChannel.send(
265 sourceAddress, localAddress, clusterNodeResponse);
266 }
267 catch (ChannelException ce) {
268 _log.error(
269 "Unable to send response message " + clusterNodeResponse, ce);
270 }
271 catch (Throwable t) {
272 _log.error(t, t);
273 }
274 }
275
276 protected void processClusterResponse(
277 ClusterNodeResponse clusterNodeResponse,
278 org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
279
280 ClusterMessageType clusterMessageType =
281 clusterNodeResponse.getClusterMessageType();
282
283 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
284 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
285
286 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
287
288 if (clusterNode != null) {
289 Address joinAddress = new AddressImpl(sourceAddress);
290
291 _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
292 }
293 else {
294 if (_log.isWarnEnabled()) {
295 _log.warn(
296 "Response of notify message does not contain cluster " +
297 "node information");
298 }
299 }
300
301 return;
302 }
303
304 String uuid = clusterNodeResponse.getUuid();
305
306 FutureClusterResponses futureClusterResponses =
307 _clusterExecutorImpl.getExecutionResults(uuid);
308
309 if (futureClusterResponses == null) {
310 if (_log.isInfoEnabled()) {
311 _log.info("Unable to find response container for " + uuid);
312 }
313
314 return;
315 }
316
317 Address address = new AddressImpl(sourceAddress);
318
319 if (futureClusterResponses.expectsReply(address)) {
320 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
321 }
322 else {
323 if (_log.isWarnEnabled()) {
324 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
325 }
326 }
327 }
328
329 protected boolean processLocalMessage(
330 Object message, org.jgroups.Address sourceAddress) {
331
332 if (message instanceof ClusterRequest) {
333 ClusterRequest clusterRequest = (ClusterRequest)message;
334
335 if (clusterRequest.isSkipLocal()) {
336 return true;
337 }
338
339 ClusterMessageType clusterMessageType =
340 clusterRequest.getClusterMessageType();
341
342 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
343 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
344
345 ClusterNode originatingClusterNode =
346 clusterRequest.getOriginatingClusterNode();
347
348 if (originatingClusterNode != null) {
349 Address joinAddress = new AddressImpl(sourceAddress);
350
351 _clusterExecutorImpl.memberJoined(
352 joinAddress, originatingClusterNode);
353 }
354 else {
355 if (_log.isWarnEnabled()) {
356 _log.warn(
357 "Content of notify message does not contain " +
358 "cluster node information");
359 }
360 }
361
362 return true;
363 }
364 }
365
366 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
367 return true;
368 }
369
370 return false;
371 }
372
373 private static Log _log = LogFactoryUtil.getLog(
374 ClusterRequestReceiver.class);
375
376 private ClusterExecutorImpl _clusterExecutorImpl;
377 private View _lastView;
378
379 }