001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.concurrent.ConcurrentReferenceValueHashMap;
030 import com.liferay.portal.kernel.exception.SystemException;
031 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.memory.FinalizeManager;
035 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
036 import com.liferay.portal.kernel.util.CharPool;
037 import com.liferay.portal.kernel.util.GetterUtil;
038 import com.liferay.portal.kernel.util.Http;
039 import com.liferay.portal.kernel.util.MethodHandler;
040 import com.liferay.portal.kernel.util.PropsKeys;
041 import com.liferay.portal.kernel.util.StringUtil;
042 import com.liferay.portal.kernel.util.Validator;
043 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
044 import com.liferay.portal.util.PortalInetSocketAddressEventListener;
045 import com.liferay.portal.util.PortalUtil;
046 import com.liferay.portal.util.PropsUtil;
047 import com.liferay.portal.util.PropsValues;
048
049 import java.io.Serializable;
050
051 import java.net.InetAddress;
052 import java.net.InetSocketAddress;
053
054 import java.util.ArrayList;
055 import java.util.Collection;
056 import java.util.Collections;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.Properties;
060 import java.util.concurrent.BlockingQueue;
061 import java.util.concurrent.ConcurrentHashMap;
062 import java.util.concurrent.CopyOnWriteArrayList;
063 import java.util.concurrent.ExecutorService;
064 import java.util.concurrent.TimeUnit;
065 import java.util.concurrent.TimeoutException;
066
067 import org.jgroups.JChannel;
068
069
073 @DoPrivileged
074 public class ClusterExecutorImpl
075 extends ClusterBase
076 implements ClusterExecutor, PortalInetSocketAddressEventListener {
077
078 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
079 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
080
081 @Override
082 public void addClusterEventListener(
083 ClusterEventListener clusterEventListener) {
084
085 if (!isEnabled()) {
086 return;
087 }
088
089 _clusterEventListeners.addIfAbsent(clusterEventListener);
090 }
091
092 @Override
093 public void afterPropertiesSet() {
094 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
095 addClusterEventListener(new DebuggingClusterEventListenerImpl());
096 }
097
098 if (PropsValues.LIVE_USERS_ENABLED) {
099 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
100 }
101
102 _secure = StringUtil.equalsIgnoreCase(
103 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
104
105 super.afterPropertiesSet();
106 }
107
108 @Override
109 public void destroy() {
110 if (!isEnabled()) {
111 return;
112 }
113
114 PortalExecutorManagerUtil.shutdown(
115 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
116
117 _controlJChannel.setReceiver(null);
118
119 _controlJChannel.close();
120
121 _clusterEventListeners.clear();
122 _clusterNodeAddresses.clear();
123 _futureClusterResponses.clear();
124 _liveInstances.clear();
125 _localAddress = null;
126 _localClusterNode = null;
127 }
128
129 @Override
130 public FutureClusterResponses execute(ClusterRequest clusterRequest) {
131 if (!isEnabled()) {
132 return null;
133 }
134
135 List<Address> addresses = prepareAddresses(clusterRequest);
136
137 FutureClusterResponses futureClusterResponses =
138 new FutureClusterResponses(addresses);
139
140 if (!clusterRequest.isFireAndForget()) {
141 String uuid = clusterRequest.getUuid();
142
143 _futureClusterResponses.put(uuid, futureClusterResponses);
144 }
145
146 if (_shortcutLocalMethod &&
147 addresses.remove(getLocalClusterNodeAddress())) {
148
149 runLocalMethod(clusterRequest, futureClusterResponses);
150 }
151
152 if (clusterRequest.isMulticast()) {
153 try {
154 _controlJChannel.send(null, clusterRequest);
155 }
156 catch (Exception e) {
157 throw new SystemException(
158 "Unable to send multicast request", e);
159 }
160 }
161 else {
162 for (Address address : addresses) {
163 org.jgroups.Address jGroupsAddress =
164 (org.jgroups.Address)address.getRealAddress();
165
166 try {
167 _controlJChannel.send(jGroupsAddress, clusterRequest);
168 }
169 catch (Exception e) {
170 throw new SystemException(
171 "Unable to send unicast request", e);
172 }
173 }
174 }
175
176 return futureClusterResponses;
177 }
178
179 @Override
180 public void execute(
181 ClusterRequest clusterRequest,
182 ClusterResponseCallback clusterResponseCallback) {
183
184 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
185
186 ClusterResponseCallbackJob clusterResponseCallbackJob =
187 new ClusterResponseCallbackJob(
188 clusterResponseCallback, futureClusterResponses);
189
190 _executorService.execute(clusterResponseCallbackJob);
191 }
192
193 @Override
194 public void execute(
195 ClusterRequest clusterRequest,
196 ClusterResponseCallback clusterResponseCallback, long timeout,
197 TimeUnit timeUnit) {
198
199 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
200
201 ClusterResponseCallbackJob clusterResponseCallbackJob =
202 new ClusterResponseCallbackJob(
203 clusterResponseCallback, futureClusterResponses, timeout,
204 timeUnit);
205
206 _executorService.execute(clusterResponseCallbackJob);
207 }
208
209 @Override
210 public List<ClusterEventListener> getClusterEventListeners() {
211 if (!isEnabled()) {
212 return Collections.emptyList();
213 }
214
215 return Collections.unmodifiableList(_clusterEventListeners);
216 }
217
218 @Override
219 public List<Address> getClusterNodeAddresses() {
220 if (!isEnabled()) {
221 return Collections.emptyList();
222 }
223
224 return getAddresses(_controlJChannel);
225 }
226
227 @Override
228 public List<ClusterNode> getClusterNodes() {
229 if (!isEnabled()) {
230 return Collections.emptyList();
231 }
232
233 return new ArrayList<ClusterNode>(_liveInstances.values());
234 }
235
236 @Override
237 public ClusterNode getLocalClusterNode() {
238 if (!isEnabled()) {
239 return null;
240 }
241
242 return _localClusterNode;
243 }
244
245 @Override
246 public Address getLocalClusterNodeAddress() {
247 if (!isEnabled()) {
248 return null;
249 }
250
251 return _localAddress;
252 }
253
254 @Override
255 public void initialize() {
256 if (!isEnabled()) {
257 return;
258 }
259
260 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
261 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
262
263 PortalUtil.addPortalInetSocketAddressEventListener(this);
264
265 _localAddress = new AddressImpl(_controlJChannel.getAddress());
266
267 initLocalClusterNode();
268
269 memberJoined(_localAddress, _localClusterNode);
270
271 sendNotifyRequest();
272
273 ClusterRequestReceiver clusterRequestReceiver =
274 (ClusterRequestReceiver)_controlJChannel.getReceiver();
275
276 clusterRequestReceiver.openLatch();
277 }
278
279 @Override
280 public boolean isClusterNodeAlive(Address address) {
281 if (!isEnabled()) {
282 return false;
283 }
284
285 List<Address> addresses = getAddresses(_controlJChannel);
286
287 return addresses.contains(address);
288 }
289
290 @Override
291 public boolean isClusterNodeAlive(String clusterNodeId) {
292 if (!isEnabled()) {
293 return false;
294 }
295
296 return _clusterNodeAddresses.containsKey(clusterNodeId);
297 }
298
299 @Override
300 public void portalLocalInetSockAddressConfigured(
301 InetSocketAddress inetSocketAddress) {
302
303 if (!isEnabled() ||
304 (_localClusterNode.getPortalInetSocketAddress() != null)) {
305
306 return;
307 }
308
309 try {
310 _localClusterNode.setPortalInetSocketAddress(inetSocketAddress);
311
312 memberJoined(_localAddress, _localClusterNode);
313
314 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
315 ClusterMessageType.UPDATE, _localClusterNode);
316
317 _controlJChannel.send(null, clusterRequest);
318 }
319 catch (Exception e) {
320 _log.error("Unable to determine configure node port", e);
321 }
322 }
323
324 @Override
325 public void portalServerInetSocketAddressConfigured(
326 InetSocketAddress inetSocketAddress) {
327 }
328
329 @Override
330 public void removeClusterEventListener(
331 ClusterEventListener clusterEventListener) {
332
333 if (!isEnabled()) {
334 return;
335 }
336
337 _clusterEventListeners.remove(clusterEventListener);
338 }
339
340 public void setClusterEventListeners(
341 List<ClusterEventListener> clusterEventListeners) {
342
343 if (!isEnabled()) {
344 return;
345 }
346
347 _clusterEventListeners.addAllAbsent(clusterEventListeners);
348 }
349
350 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
351 if (!isEnabled()) {
352 return;
353 }
354
355 _shortcutLocalMethod = shortcutLocalMethod;
356 }
357
358 protected void fireClusterEvent(ClusterEvent clusterEvent) {
359 for (ClusterEventListener listener : _clusterEventListeners) {
360 listener.processClusterEvent(clusterEvent);
361 }
362 }
363
364 protected ClusterNodeResponse generateClusterNodeResponse(
365 ClusterRequest clusterRequest, Object returnValue,
366 Exception exception) {
367
368 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
369
370 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
371 clusterNodeResponse.setClusterNode(getLocalClusterNode());
372 clusterNodeResponse.setClusterMessageType(
373 clusterRequest.getClusterMessageType());
374 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
375 clusterNodeResponse.setUuid(clusterRequest.getUuid());
376
377 if (exception != null) {
378 clusterNodeResponse.setException(exception);
379 }
380 else {
381 if (returnValue instanceof Serializable) {
382 clusterNodeResponse.setResult(returnValue);
383 }
384 else if (returnValue != null) {
385 clusterNodeResponse.setException(
386 new ClusterException("Return value is not serializable"));
387 }
388 }
389
390 return clusterNodeResponse;
391 }
392
393 protected InetSocketAddress getConfiguredPortalInetSockAddress(
394 boolean secure) {
395
396 InetSocketAddress inetSocketAddress = null;
397
398 String portalInetSocketAddressValue = null;
399
400 if (secure) {
401 portalInetSocketAddressValue =
402 PropsValues.PORTAL_INSTANCE_HTTPS_INET_SOCKET_ADDRESS;
403 }
404 else {
405 portalInetSocketAddressValue =
406 PropsValues.PORTAL_INSTANCE_HTTP_INET_SOCKET_ADDRESS;
407 }
408
409 if (Validator.isNotNull(portalInetSocketAddressValue)) {
410 String[] parts = StringUtil.split(
411 portalInetSocketAddressValue, CharPool.COLON);
412
413 if (parts.length == 2) {
414 try {
415 inetSocketAddress = new InetSocketAddress(
416 InetAddress.getByName(parts[0]),
417 GetterUtil.getIntegerStrict(parts[1]));
418 }
419 catch (Exception e) {
420 _log.error(
421 "Unable to parse portal InetSocketAddress from " +
422 portalInetSocketAddressValue,
423 e);
424 }
425 }
426 }
427
428 return inetSocketAddress;
429 }
430
431 protected JChannel getControlChannel() {
432 return _controlJChannel;
433 }
434
435 protected FutureClusterResponses getExecutionResults(String uuid) {
436 return _futureClusterResponses.get(uuid);
437 }
438
439 @Override
440 protected void initChannels() throws Exception {
441 Properties controlProperties = PropsUtil.getProperties(
442 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
443
444 String controlProperty = controlProperties.getProperty(
445 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
446
447 ClusterRequestReceiver clusterRequestReceiver =
448 new ClusterRequestReceiver(this);
449
450 _controlJChannel = createJChannel(
451 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
452 }
453
454 protected void initLocalClusterNode() {
455 InetAddress inetAddress = getBindInetAddress(_controlJChannel);
456
457 ClusterNode clusterNode = new ClusterNode(
458 PortalUUIDUtil.generate(), inetAddress);
459
460 InetSocketAddress inetSocketAddress =
461 getConfiguredPortalInetSockAddress(_secure);
462
463 if (inetSocketAddress != null) {
464 clusterNode.setPortalInetSocketAddress(inetSocketAddress);
465 }
466
467 _localClusterNode = clusterNode;
468 }
469
470 protected boolean isShortcutLocalMethod() {
471 return _shortcutLocalMethod;
472 }
473
474 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
475 _liveInstances.put(joinAddress, clusterNode);
476
477 Address previousAddress = _clusterNodeAddresses.put(
478 clusterNode.getClusterNodeId(), joinAddress);
479
480 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
481 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
482
483
484
485 fireClusterEvent(clusterEvent);
486 }
487 }
488
489 protected void memberRemoved(List<Address> departAddresses) {
490 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
491
492 for (Address departAddress : departAddresses) {
493 ClusterNode departClusterNode = _liveInstances.remove(
494 departAddress);
495
496 if (departClusterNode == null) {
497 continue;
498 }
499
500 departClusterNodes.add(departClusterNode);
501
502 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
503 }
504
505 if (departClusterNodes.isEmpty()) {
506 return;
507 }
508
509 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
510
511 fireClusterEvent(clusterEvent);
512 }
513
514 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
515 boolean isMulticast = clusterRequest.isMulticast();
516
517 List<Address> addresses = null;
518
519 if (isMulticast) {
520 addresses = getAddresses(_controlJChannel);
521 }
522 else {
523 addresses = new ArrayList<Address>();
524
525 Collection<Address> clusterNodeAddresses =
526 clusterRequest.getTargetClusterNodeAddresses();
527
528 if (clusterNodeAddresses != null) {
529 addresses.addAll(clusterNodeAddresses);
530 }
531
532 Collection<String> clusterNodeIds =
533 clusterRequest.getTargetClusterNodeIds();
534
535 if (clusterNodeIds != null) {
536 for (String clusterNodeId : clusterNodeIds) {
537 Address address = _clusterNodeAddresses.get(clusterNodeId);
538
539 addresses.add(address);
540 }
541 }
542 }
543
544 if (clusterRequest.isSkipLocal()) {
545 addresses.remove(getLocalClusterNodeAddress());
546 }
547
548 return addresses;
549 }
550
551 protected void runLocalMethod(
552 ClusterRequest clusterRequest,
553 FutureClusterResponses futureClusterResponses) {
554
555 MethodHandler methodHandler = clusterRequest.getMethodHandler();
556
557 Object returnValue = null;
558 Exception exception = null;
559
560 if (methodHandler == null) {
561 exception = new ClusterException(
562 "Payload is not of type " + MethodHandler.class.getName());
563 }
564 else {
565 try {
566 returnValue = methodHandler.invoke();
567 }
568 catch (Exception e) {
569 exception = e;
570 }
571 }
572
573 if (!clusterRequest.isFireAndForget()) {
574 ClusterNodeResponse clusterNodeResponse =
575 generateClusterNodeResponse(
576 clusterRequest, returnValue, exception);
577
578 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
579 }
580 }
581
582 protected void sendNotifyRequest() {
583 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
584 ClusterMessageType.NOTIFY, _localClusterNode);
585
586 try {
587 _controlJChannel.send(null, clusterRequest);
588 }
589 catch (Exception e) {
590 _log.error("Unable to send notify message", e);
591 }
592 }
593
594 private static final String _DEFAULT_CLUSTER_NAME =
595 "LIFERAY-CONTROL-CHANNEL";
596
597 private static final Log _log = LogFactoryUtil.getLog(
598 ClusterExecutorImpl.class);
599
600 private final CopyOnWriteArrayList<ClusterEventListener>
601 _clusterEventListeners =
602 new CopyOnWriteArrayList<ClusterEventListener>();
603 private final Map<String, Address> _clusterNodeAddresses =
604 new ConcurrentHashMap<String, Address>();
605 private JChannel _controlJChannel;
606 private ExecutorService _executorService;
607 private Map<String, FutureClusterResponses> _futureClusterResponses =
608 new ConcurrentReferenceValueHashMap<String, FutureClusterResponses>(
609 FinalizeManager.WEAK_REFERENCE_FACTORY);
610 private final Map<Address, ClusterNode> _liveInstances =
611 new ConcurrentHashMap<Address, ClusterNode>();
612 private Address _localAddress;
613 private ClusterNode _localClusterNode;
614 private boolean _secure;
615 private boolean _shortcutLocalMethod;
616
617 private class ClusterResponseCallbackJob implements Runnable {
618
619 public ClusterResponseCallbackJob(
620 ClusterResponseCallback clusterResponseCallback,
621 FutureClusterResponses futureClusterResponses) {
622
623 _clusterResponseCallback = clusterResponseCallback;
624 _futureClusterResponses = futureClusterResponses;
625 _timeout = -1;
626 _timeoutGet = false;
627 _timeUnit = TimeUnit.SECONDS;
628 }
629
630 public ClusterResponseCallbackJob(
631 ClusterResponseCallback clusterResponseCallback,
632 FutureClusterResponses futureClusterResponses, long timeout,
633 TimeUnit timeUnit) {
634
635 _clusterResponseCallback = clusterResponseCallback;
636 _futureClusterResponses = futureClusterResponses;
637 _timeout = timeout;
638 _timeoutGet = true;
639 _timeUnit = timeUnit;
640 }
641
642 @Override
643 public void run() {
644 BlockingQueue<ClusterNodeResponse> blockingQueue =
645 _futureClusterResponses.getPartialResults();
646
647 _clusterResponseCallback.callback(blockingQueue);
648
649 ClusterNodeResponses clusterNodeResponses = null;
650
651 try {
652 if (_timeoutGet) {
653 clusterNodeResponses = _futureClusterResponses.get(
654 _timeout, _timeUnit);
655 }
656 else {
657 clusterNodeResponses = _futureClusterResponses.get();
658 }
659
660 _clusterResponseCallback.callback(clusterNodeResponses);
661 }
662 catch (InterruptedException ie) {
663 _clusterResponseCallback.processInterruptedException(ie);
664 }
665 catch (TimeoutException te) {
666 _clusterResponseCallback.processTimeoutException(te);
667 }
668 }
669
670 private final ClusterResponseCallback _clusterResponseCallback;
671 private final FutureClusterResponses _futureClusterResponses;
672 private final long _timeout;
673 private final boolean _timeoutGet;
674 private final TimeUnit _timeUnit;
675
676 }
677
678 }