001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.InetAddressUtil;
035 import com.liferay.portal.kernel.util.MethodHandler;
036 import com.liferay.portal.kernel.util.PropsKeys;
037 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039 import com.liferay.portal.util.PortalPortEventListener;
040 import com.liferay.portal.util.PortalUtil;
041 import com.liferay.portal.util.PropsUtil;
042 import com.liferay.portal.util.PropsValues;
043
044 import java.io.Serializable;
045
046 import java.net.InetAddress;
047
048 import java.util.ArrayList;
049 import java.util.Collection;
050 import java.util.Collections;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.Properties;
054 import java.util.concurrent.BlockingQueue;
055 import java.util.concurrent.ConcurrentHashMap;
056 import java.util.concurrent.CopyOnWriteArrayList;
057 import java.util.concurrent.ExecutorService;
058 import java.util.concurrent.TimeUnit;
059 import java.util.concurrent.TimeoutException;
060
061 import org.jgroups.JChannel;
062
063
067 @DoPrivileged
068 public class ClusterExecutorImpl
069 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
070
071 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
072 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
073
074 @Override
075 public void addClusterEventListener(
076 ClusterEventListener clusterEventListener) {
077
078 if (!isEnabled()) {
079 return;
080 }
081
082 _clusterEventListeners.addIfAbsent(clusterEventListener);
083 }
084
085 @Override
086 public void afterPropertiesSet() {
087 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
088 addClusterEventListener(new DebuggingClusterEventListenerImpl());
089 }
090
091 if (PropsValues.LIVE_USERS_ENABLED) {
092 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
093 }
094
095 super.afterPropertiesSet();
096 }
097
098 @Override
099 public void destroy() {
100 if (!isEnabled()) {
101 return;
102 }
103
104 PortalExecutorManagerUtil.shutdown(
105 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
106
107 _controlJChannel.setReceiver(null);
108
109 _controlJChannel.close();
110
111 _clusterEventListeners.clear();
112 _clusterNodeAddresses.clear();
113 _futureClusterResponses.clear();
114 _liveInstances.clear();
115 _localAddress = null;
116 _localClusterNode = null;
117 }
118
119 @Override
120 public FutureClusterResponses execute(ClusterRequest clusterRequest)
121 throws SystemException {
122
123 if (!isEnabled()) {
124 return null;
125 }
126
127 List<Address> addresses = prepareAddresses(clusterRequest);
128
129 FutureClusterResponses futureClusterResponses =
130 new FutureClusterResponses(addresses);
131
132 if (!clusterRequest.isFireAndForget()) {
133 String uuid = clusterRequest.getUuid();
134
135 _futureClusterResponses.put(uuid, futureClusterResponses);
136 }
137
138 if (_shortcutLocalMethod &&
139 addresses.remove(getLocalClusterNodeAddress())) {
140
141 runLocalMethod(clusterRequest, futureClusterResponses);
142 }
143
144 if (clusterRequest.isMulticast()) {
145 try {
146 _controlJChannel.send(null, clusterRequest);
147 }
148 catch (Exception e) {
149 throw new SystemException(
150 "Unable to send multicast request", e);
151 }
152 }
153 else {
154 for (Address address : addresses) {
155 org.jgroups.Address jGroupsAddress =
156 (org.jgroups.Address)address.getRealAddress();
157
158 try {
159 _controlJChannel.send(jGroupsAddress, clusterRequest);
160 }
161 catch (Exception e) {
162 throw new SystemException(
163 "Unable to send unicast request", e);
164 }
165 }
166 }
167
168 return futureClusterResponses;
169 }
170
171 @Override
172 public void execute(
173 ClusterRequest clusterRequest,
174 ClusterResponseCallback clusterResponseCallback)
175 throws SystemException {
176
177 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
178
179 ClusterResponseCallbackJob clusterResponseCallbackJob =
180 new ClusterResponseCallbackJob(
181 clusterResponseCallback, futureClusterResponses);
182
183 _executorService.execute(clusterResponseCallbackJob);
184 }
185
186 @Override
187 public void execute(
188 ClusterRequest clusterRequest,
189 ClusterResponseCallback clusterResponseCallback, long timeout,
190 TimeUnit timeUnit)
191 throws SystemException {
192
193 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
194
195 ClusterResponseCallbackJob clusterResponseCallbackJob =
196 new ClusterResponseCallbackJob(
197 clusterResponseCallback, futureClusterResponses, timeout,
198 timeUnit);
199
200 _executorService.execute(clusterResponseCallbackJob);
201 }
202
203 @Override
204 public List<ClusterEventListener> getClusterEventListeners() {
205 if (!isEnabled()) {
206 return Collections.emptyList();
207 }
208
209 return Collections.unmodifiableList(_clusterEventListeners);
210 }
211
212 @Override
213 public List<Address> getClusterNodeAddresses() {
214 if (!isEnabled()) {
215 return Collections.emptyList();
216 }
217
218 return getAddresses(_controlJChannel);
219 }
220
221 @Override
222 public List<ClusterNode> getClusterNodes() {
223 if (!isEnabled()) {
224 return Collections.emptyList();
225 }
226
227 return new ArrayList<ClusterNode>(_liveInstances.values());
228 }
229
230 @Override
231 public ClusterNode getLocalClusterNode() {
232 if (!isEnabled()) {
233 return null;
234 }
235
236 return _localClusterNode;
237 }
238
239 @Override
240 public Address getLocalClusterNodeAddress() {
241 if (!isEnabled()) {
242 return null;
243 }
244
245 return _localAddress;
246 }
247
248 @Override
249 public void initialize() {
250 if (!isEnabled()) {
251 return;
252 }
253
254 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
255 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
256
257 PortalUtil.addPortalPortEventListener(this);
258
259 _localAddress = new AddressImpl(_controlJChannel.getAddress());
260
261 try {
262 initLocalClusterNode();
263
264 memberJoined(_localAddress, _localClusterNode);
265
266 sendNotifyRequest();
267 }
268 catch (Exception e) {
269 _log.error("Unable to determine local network address", e);
270 }
271
272 ClusterRequestReceiver clusterRequestReceiver =
273 (ClusterRequestReceiver)_controlJChannel.getReceiver();
274
275 clusterRequestReceiver.openLatch();
276 }
277
278 @Override
279 public boolean isClusterNodeAlive(Address address) {
280 if (!isEnabled()) {
281 return false;
282 }
283
284 List<Address> addresses = getAddresses(_controlJChannel);
285
286 return addresses.contains(address);
287 }
288
289 @Override
290 public boolean isClusterNodeAlive(String clusterNodeId) {
291 if (!isEnabled()) {
292 return false;
293 }
294
295 return _clusterNodeAddresses.containsKey(clusterNodeId);
296 }
297
298 @Override
299 public void portalPortConfigured(int port) {
300 if (!isEnabled() ||
301 (_localClusterNode.getPort() ==
302 PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
303
304 return;
305 }
306
307 try {
308 _localClusterNode.setPort(port);
309
310 memberJoined(_localAddress, _localClusterNode);
311
312 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
313 ClusterMessageType.UPDATE, _localClusterNode);
314
315 _controlJChannel.send(null, clusterRequest);
316 }
317 catch (Exception e) {
318 _log.error("Unable to determine configure node port", e);
319 }
320 }
321
322 @Override
323 public void removeClusterEventListener(
324 ClusterEventListener clusterEventListener) {
325
326 if (!isEnabled()) {
327 return;
328 }
329
330 _clusterEventListeners.remove(clusterEventListener);
331 }
332
333 public void setClusterEventListeners(
334 List<ClusterEventListener> clusterEventListeners) {
335
336 if (!isEnabled()) {
337 return;
338 }
339
340 _clusterEventListeners.addAllAbsent(clusterEventListeners);
341 }
342
343 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
344 if (!isEnabled()) {
345 return;
346 }
347
348 _shortcutLocalMethod = shortcutLocalMethod;
349 }
350
351 protected void fireClusterEvent(ClusterEvent clusterEvent) {
352 for (ClusterEventListener listener : _clusterEventListeners) {
353 listener.processClusterEvent(clusterEvent);
354 }
355 }
356
357 protected ClusterNodeResponse generateClusterNodeResponse(
358 ClusterRequest clusterRequest, Object returnValue,
359 Exception exception) {
360
361 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
362
363 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
364 clusterNodeResponse.setClusterNode(getLocalClusterNode());
365 clusterNodeResponse.setClusterMessageType(
366 clusterRequest.getClusterMessageType());
367 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
368 clusterNodeResponse.setUuid(clusterRequest.getUuid());
369
370 if (exception != null) {
371 clusterNodeResponse.setException(exception);
372 }
373 else {
374 if (returnValue instanceof Serializable) {
375 clusterNodeResponse.setResult(returnValue);
376 }
377 else if (returnValue != null) {
378 clusterNodeResponse.setException(
379 new ClusterException("Return value is not serializable"));
380 }
381 }
382
383 return clusterNodeResponse;
384 }
385
386 protected JChannel getControlChannel() {
387 return _controlJChannel;
388 }
389
390 protected FutureClusterResponses getExecutionResults(String uuid) {
391 return _futureClusterResponses.get(uuid);
392 }
393
394 @Override
395 protected void initChannels() throws Exception {
396 Properties controlProperties = PropsUtil.getProperties(
397 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
398
399 String controlProperty = controlProperties.getProperty(
400 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
401
402 ClusterRequestReceiver clusterRequestReceiver =
403 new ClusterRequestReceiver(this);
404
405 _controlJChannel = createJChannel(
406 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
407 }
408
409 protected void initLocalClusterNode() throws Exception {
410 InetAddress inetAddress = bindInetAddress;
411
412 if (inetAddress == null) {
413 inetAddress = InetAddressUtil.getLocalInetAddress();
414 }
415
416 ClusterNode localClusterNode = new ClusterNode(
417 PortalUUIDUtil.generate(), inetAddress);
418
419 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
420 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
421 }
422 else {
423 localClusterNode.setPort(PortalUtil.getPortalPort(false));
424 }
425
426 _localClusterNode = localClusterNode;
427 }
428
429 protected boolean isShortcutLocalMethod() {
430 return _shortcutLocalMethod;
431 }
432
433 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
434 _liveInstances.put(joinAddress, clusterNode);
435
436 Address previousAddress = _clusterNodeAddresses.put(
437 clusterNode.getClusterNodeId(), joinAddress);
438
439 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
440 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
441
442
443
444 fireClusterEvent(clusterEvent);
445 }
446 }
447
448 protected void memberRemoved(List<Address> departAddresses) {
449 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
450
451 for (Address departAddress : departAddresses) {
452 ClusterNode departClusterNode = _liveInstances.remove(
453 departAddress);
454
455 if (departClusterNode == null) {
456 continue;
457 }
458
459 departClusterNodes.add(departClusterNode);
460
461 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
462 }
463
464 if (departClusterNodes.isEmpty()) {
465 return;
466 }
467
468 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
469
470 fireClusterEvent(clusterEvent);
471 }
472
473 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
474 boolean isMulticast = clusterRequest.isMulticast();
475
476 List<Address> addresses = null;
477
478 if (isMulticast) {
479 addresses = getAddresses(_controlJChannel);
480 }
481 else {
482 addresses = new ArrayList<Address>();
483
484 Collection<Address> clusterNodeAddresses =
485 clusterRequest.getTargetClusterNodeAddresses();
486
487 if (clusterNodeAddresses != null) {
488 addresses.addAll(clusterNodeAddresses);
489 }
490
491 Collection<String> clusterNodeIds =
492 clusterRequest.getTargetClusterNodeIds();
493
494 if (clusterNodeIds != null) {
495 for (String clusterNodeId : clusterNodeIds) {
496 Address address = _clusterNodeAddresses.get(clusterNodeId);
497
498 addresses.add(address);
499 }
500 }
501 }
502
503 if (clusterRequest.isSkipLocal()) {
504 addresses.remove(getLocalClusterNodeAddress());
505 }
506
507 return addresses;
508 }
509
510 protected void runLocalMethod(
511 ClusterRequest clusterRequest,
512 FutureClusterResponses futureClusterResponses) {
513
514 MethodHandler methodHandler = clusterRequest.getMethodHandler();
515
516 Object returnValue = null;
517 Exception exception = null;
518
519 if (methodHandler == null) {
520 exception = new ClusterException(
521 "Payload is not of type " + MethodHandler.class.getName());
522 }
523 else {
524 try {
525 returnValue = methodHandler.invoke(true);
526 }
527 catch (Exception e) {
528 exception = e;
529 }
530 }
531
532 if (!clusterRequest.isFireAndForget()) {
533 ClusterNodeResponse clusterNodeResponse =
534 generateClusterNodeResponse(
535 clusterRequest, returnValue, exception);
536
537 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
538 }
539 }
540
541 protected void sendNotifyRequest() {
542 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
543 ClusterMessageType.NOTIFY, _localClusterNode);
544
545 try {
546 _controlJChannel.send(null, clusterRequest);
547 }
548 catch (Exception e) {
549 _log.error("Unable to send notify message", e);
550 }
551 }
552
553 private static final String _DEFAULT_CLUSTER_NAME =
554 "LIFERAY-CONTROL-CHANNEL";
555
556 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
557
558 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
559 new CopyOnWriteArrayList<ClusterEventListener>();
560 private Map<String, Address> _clusterNodeAddresses =
561 new ConcurrentHashMap<String, Address>();
562 private JChannel _controlJChannel;
563 private ExecutorService _executorService;
564 private Map<String, FutureClusterResponses> _futureClusterResponses =
565 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
566 private Map<Address, ClusterNode> _liveInstances =
567 new ConcurrentHashMap<Address, ClusterNode>();
568 private Address _localAddress;
569 private ClusterNode _localClusterNode;
570 private boolean _shortcutLocalMethod;
571
572 private class ClusterResponseCallbackJob implements Runnable {
573
574 public ClusterResponseCallbackJob(
575 ClusterResponseCallback clusterResponseCallback,
576 FutureClusterResponses futureClusterResponses) {
577
578 _clusterResponseCallback = clusterResponseCallback;
579 _futureClusterResponses = futureClusterResponses;
580 _timeout = -1;
581 _timeoutGet = false;
582 _timeUnit = TimeUnit.SECONDS;
583 }
584
585 public ClusterResponseCallbackJob(
586 ClusterResponseCallback clusterResponseCallback,
587 FutureClusterResponses futureClusterResponses, long timeout,
588 TimeUnit timeUnit) {
589
590 _clusterResponseCallback = clusterResponseCallback;
591 _futureClusterResponses = futureClusterResponses;
592 _timeout = timeout;
593 _timeoutGet = true;
594 _timeUnit = timeUnit;
595 }
596
597 @Override
598 public void run() {
599 BlockingQueue<ClusterNodeResponse> blockingQueue =
600 _futureClusterResponses.getPartialResults();
601
602 _clusterResponseCallback.callback(blockingQueue);
603
604 ClusterNodeResponses clusterNodeResponses = null;
605
606 try {
607 if (_timeoutGet) {
608 clusterNodeResponses = _futureClusterResponses.get(
609 _timeout, _timeUnit);
610 }
611 else {
612 clusterNodeResponses = _futureClusterResponses.get();
613 }
614
615 _clusterResponseCallback.callback(clusterNodeResponses);
616 }
617 catch (InterruptedException ie) {
618 _clusterResponseCallback.processInterruptedException(ie);
619 }
620 catch (TimeoutException te) {
621 _clusterResponseCallback.processTimeoutException(te);
622 }
623 }
624
625 private final ClusterResponseCallback _clusterResponseCallback;
626 private final FutureClusterResponses _futureClusterResponses;
627 private final long _timeout;
628 private final boolean _timeoutGet;
629 private final TimeUnit _timeUnit;
630
631 }
632
633 }