001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterMessageType;
022 import com.liferay.portal.kernel.cluster.ClusterNode;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.util.MethodHandler;
029 import com.liferay.portal.kernel.util.Validator;
030 import com.liferay.portal.security.pacl.PACLClassLoaderUtil;
031
032 import java.io.Serializable;
033
034 import java.util.ArrayList;
035 import java.util.Collections;
036 import java.util.List;
037 import java.util.concurrent.CountDownLatch;
038
039 import org.jgroups.Channel;
040 import org.jgroups.ChannelException;
041 import org.jgroups.Message;
042 import org.jgroups.View;
043
044
048 public class ClusterRequestReceiver extends BaseReceiver {
049
050 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
051 _countDownLatch = new CountDownLatch(1);
052 _clusterExecutorImpl = clusterExecutorImpl;
053 }
054
055 public void openLatch() {
056 _countDownLatch.countDown();
057 }
058
059 @Override
060 public void receive(Message message) {
061 try {
062 _countDownLatch.await();
063 }
064 catch (InterruptedException ie) {
065 _log.error(
066 "Latch opened prematurely by interruption. Dependence may " +
067 "not be ready.");
068 }
069
070 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
071
072 org.jgroups.Address localAddress = controlChannel.getAddress();
073
074 Object obj = message.getObject();
075
076 if (obj == null) {
077 if (_log.isWarnEnabled()) {
078 _log.warn("Message content is null");
079 }
080
081 return;
082 }
083
084 org.jgroups.Address sourceAddress = message.getSrc();
085
086 if (localAddress.equals(sourceAddress)) {
087 boolean isProcessed = processLocalMessage(obj, sourceAddress);
088
089 if (isProcessed) {
090 return;
091 }
092 }
093
094 if (obj instanceof ClusterRequest) {
095 ClusterRequest clusterRequest = (ClusterRequest)obj;
096
097 processClusterRequest(clusterRequest, sourceAddress, localAddress);
098 }
099 else if (obj instanceof ClusterNodeResponse) {
100 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
101
102 processClusterResponse(
103 clusterNodeResponse, sourceAddress, localAddress);
104 }
105 else if (_log.isWarnEnabled()) {
106 _log.warn(
107 "Unable to process message content of type " + obj.getClass());
108 }
109 }
110
111 @Override
112 public void viewAccepted(View view) {
113 super.viewAccepted(view);
114
115 if (_lastView == null) {
116 _lastView = view;
117
118 return;
119 }
120
121 List<Address> departAddresses = getDepartAddresses(view);
122 List<Address> newAddresses = getNewAddresses(view);
123
124 _lastView = view;
125
126 try {
127 _countDownLatch.await();
128 }
129 catch (InterruptedException ie) {
130 _log.error(
131 "Latch opened prematurely by interruption. Dependence may " +
132 "not be ready.");
133 }
134
135 if (!newAddresses.isEmpty()) {
136 _clusterExecutorImpl.sendNotifyRequest();
137 }
138
139 if (!departAddresses.isEmpty()) {
140 _clusterExecutorImpl.memberRemoved(departAddresses);
141 }
142 }
143
144 protected List<Address> getDepartAddresses(View view) {
145 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
146 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
147
148 List<org.jgroups.Address> departJGroupsAddresses =
149 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
150
151 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
152
153 if (departJGroupsAddresses.isEmpty()) {
154 return Collections.emptyList();
155 }
156
157 List<Address> departAddresses = new ArrayList<Address>(
158 departJGroupsAddresses.size());
159
160 for (org.jgroups.Address departJGroupsAddress :
161 departJGroupsAddresses) {
162
163 Address departAddress = new AddressImpl(departJGroupsAddress);
164
165 departAddresses.add(departAddress);
166 }
167
168 return departAddresses;
169 }
170
171 protected List<Address> getNewAddresses(View view) {
172 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
173 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
174
175 List<org.jgroups.Address> newJGroupsAddresses =
176 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
177
178 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
179
180 if (newJGroupsAddresses.isEmpty()) {
181 return Collections.emptyList();
182 }
183
184 List<Address> newAddresses = new ArrayList<Address>(
185 newJGroupsAddresses.size());
186
187 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
188 Address newAddress = new AddressImpl(newJGroupsAddress);
189
190 newAddresses.add(newAddress);
191 }
192
193 return newAddresses;
194 }
195
196 protected Object invoke(
197 String servletContextName, String beanIdentifier,
198 MethodHandler methodHandler)
199 throws Exception {
200
201 if (servletContextName == null) {
202 if (Validator.isNull(beanIdentifier)) {
203 return methodHandler.invoke(true);
204 }
205 else {
206 Object bean = PortalBeanLocatorUtil.locate(beanIdentifier);
207
208 return methodHandler.invoke(bean);
209 }
210 }
211
212 ClassLoader contextClassLoader =
213 PACLClassLoaderUtil.getContextClassLoader();
214
215 try {
216 ClassLoader classLoader =
217 (ClassLoader)PortletBeanLocatorUtil.locate(
218 servletContextName, "portletClassLoader");
219
220 PACLClassLoaderUtil.setContextClassLoader(classLoader);
221
222 if (Validator.isNull(beanIdentifier)) {
223 return methodHandler.invoke(true);
224 }
225 else {
226 Object bean = PortletBeanLocatorUtil.locate(
227 servletContextName, beanIdentifier);
228
229 return methodHandler.invoke(bean);
230 }
231 }
232 catch (Exception e) {
233 throw e;
234 }
235 finally {
236 PACLClassLoaderUtil.setContextClassLoader(contextClassLoader);
237 }
238 }
239
240 protected void processClusterRequest(
241 ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
242 org.jgroups.Address localAddress) {
243
244 ClusterMessageType clusterMessageType =
245 clusterRequest.getClusterMessageType();
246
247 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
248
249 Address address = new AddressImpl(localAddress);
250
251 clusterNodeResponse.setAddress(address);
252
253 ClusterNode localClusterNode =
254 _clusterExecutorImpl.getLocalClusterNode();
255
256 clusterNodeResponse.setClusterNode(localClusterNode);
257
258 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
259 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
260
261 ClusterNode originatingClusterNode =
262 clusterRequest.getOriginatingClusterNode();
263
264 if (originatingClusterNode != null) {
265 _clusterExecutorImpl.memberJoined(
266 new AddressImpl(sourceAddress), originatingClusterNode);
267
268 clusterNodeResponse.setClusterMessageType(clusterMessageType);
269 }
270 else {
271 if (_log.isWarnEnabled()) {
272 _log.warn(
273 "Content of notify message does not contain cluster " +
274 "node information");
275 }
276
277 return;
278 }
279 }
280 else {
281 clusterNodeResponse.setClusterMessageType(
282 ClusterMessageType.EXECUTE);
283 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
284 clusterNodeResponse.setUuid(clusterRequest.getUuid());
285
286 MethodHandler methodHandler = clusterRequest.getMethodHandler();
287
288 if (methodHandler != null) {
289 try {
290 ClusterInvokeThreadLocal.setEnabled(false);
291
292 Object returnValue = invoke(
293 clusterRequest.getServletContextName(),
294 clusterRequest.getBeanIdentifier(), methodHandler);
295
296 if (returnValue instanceof Serializable) {
297 clusterNodeResponse.setResult(returnValue);
298 }
299 else if (returnValue != null) {
300 clusterNodeResponse.setException(
301 new ClusterException(
302 "Return value is not serializable"));
303 }
304 }
305 catch (Exception e) {
306 clusterNodeResponse.setException(e);
307
308 _log.error("Failed to invoke method " + methodHandler, e);
309 }
310 finally {
311 ClusterInvokeThreadLocal.setEnabled(true);
312 }
313 }
314 else {
315 clusterNodeResponse.setException(
316 new ClusterException(
317 "Payload is not of type " +
318 MethodHandler.class.getName()));
319 }
320 }
321
322 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
323
324 try {
325 controlChannel.send(
326 sourceAddress, localAddress, clusterNodeResponse);
327 }
328 catch (ChannelException ce) {
329 _log.error(
330 "Unable to send response message " + clusterNodeResponse, ce);
331 }
332 catch (Throwable t) {
333 _log.error(t, t);
334 }
335 }
336
337 protected void processClusterResponse(
338 ClusterNodeResponse clusterNodeResponse,
339 org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
340
341 ClusterMessageType clusterMessageType =
342 clusterNodeResponse.getClusterMessageType();
343
344 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
345 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
346
347 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
348
349 if (clusterNode != null) {
350 Address joinAddress = new AddressImpl(sourceAddress);
351
352 _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
353 }
354 else {
355 if (_log.isWarnEnabled()) {
356 _log.warn(
357 "Response of notify message does not contain cluster " +
358 "node information");
359 }
360 }
361
362 return;
363 }
364
365 String uuid = clusterNodeResponse.getUuid();
366
367 FutureClusterResponses futureClusterResponses =
368 _clusterExecutorImpl.getExecutionResults(uuid);
369
370 if (futureClusterResponses == null) {
371 if (_log.isInfoEnabled()) {
372 _log.info("Unable to find response container for " + uuid);
373 }
374
375 return;
376 }
377
378 Address address = new AddressImpl(sourceAddress);
379
380 if (futureClusterResponses.expectsReply(address)) {
381 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
382 }
383 else {
384 if (_log.isWarnEnabled()) {
385 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
386 }
387 }
388 }
389
390 protected boolean processLocalMessage(
391 Object message, org.jgroups.Address sourceAddress) {
392
393 if (message instanceof ClusterRequest) {
394 ClusterRequest clusterRequest = (ClusterRequest)message;
395
396 if (clusterRequest.isSkipLocal()) {
397 return true;
398 }
399 }
400
401 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
402 return true;
403 }
404
405 return false;
406 }
407
408 private static Log _log = LogFactoryUtil.getLog(
409 ClusterRequestReceiver.class);
410
411 private ClusterExecutorImpl _clusterExecutorImpl;
412 private CountDownLatch _countDownLatch;
413 private volatile View _lastView;
414
415 }