001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.Http;
035 import com.liferay.portal.kernel.util.InetAddressUtil;
036 import com.liferay.portal.kernel.util.MethodHandler;
037 import com.liferay.portal.kernel.util.PropsKeys;
038 import com.liferay.portal.kernel.util.StringBundler;
039 import com.liferay.portal.kernel.util.StringUtil;
040 import com.liferay.portal.kernel.util.Validator;
041 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
042 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043 import com.liferay.portal.util.PortalPortEventListener;
044 import com.liferay.portal.util.PortalPortProtocolEventListener;
045 import com.liferay.portal.util.PortalUtil;
046 import com.liferay.portal.util.PropsUtil;
047 import com.liferay.portal.util.PropsValues;
048
049 import java.io.Serializable;
050
051 import java.net.InetAddress;
052
053 import java.util.ArrayList;
054 import java.util.Collection;
055 import java.util.Collections;
056 import java.util.List;
057 import java.util.Map;
058 import java.util.concurrent.BlockingQueue;
059 import java.util.concurrent.ConcurrentHashMap;
060 import java.util.concurrent.CopyOnWriteArrayList;
061 import java.util.concurrent.ExecutorService;
062 import java.util.concurrent.TimeUnit;
063 import java.util.concurrent.TimeoutException;
064
065 import org.jgroups.JChannel;
066
067
071 @DoPrivileged
072 public class ClusterExecutorImpl
073 extends ClusterBase
074 implements ClusterExecutor, PortalPortEventListener,
075 PortalPortProtocolEventListener {
076
077 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
078 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
079
080 public Address getCoordinatorAddress() {
081 return new AddressImpl(_baseReceiver.getCoordinator());
082 }
083
084 @Override
085 public void addClusterEventListener(
086 ClusterEventListener clusterEventListener) {
087
088 if (!isEnabled()) {
089 return;
090 }
091
092 _clusterEventListeners.addIfAbsent(clusterEventListener);
093 }
094
095 @Override
096 public void afterPropertiesSet() {
097 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
098 addClusterEventListener(new DebuggingClusterEventListenerImpl());
099 }
100
101 if (PropsValues.LIVE_USERS_ENABLED) {
102 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
103 }
104
105 super.afterPropertiesSet();
106 }
107
108 @Override
109 public void destroy() {
110 if (!isEnabled()) {
111 return;
112 }
113
114 PortalExecutorManagerUtil.shutdown(
115 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
116
117 _controlJChannel.setReceiver(null);
118
119 _controlJChannel.close();
120
121 _clusterEventListeners.clear();
122 _clusterNodeAddresses.clear();
123 _futureClusterResponses.clear();
124 _liveInstances.clear();
125 _localAddress = null;
126 _localClusterNode = null;
127 }
128
129 @Override
130 public FutureClusterResponses execute(ClusterRequest clusterRequest)
131 throws SystemException {
132
133 if (!isEnabled()) {
134 return null;
135 }
136
137 List<Address> addresses = prepareAddresses(clusterRequest);
138
139 FutureClusterResponses futureClusterResponses =
140 new FutureClusterResponses(addresses);
141
142 if (!clusterRequest.isFireAndForget()) {
143 String uuid = clusterRequest.getUuid();
144
145 _futureClusterResponses.put(uuid, futureClusterResponses);
146 }
147
148 if (_shortcutLocalMethod &&
149 addresses.remove(getLocalClusterNodeAddress())) {
150
151 runLocalMethod(clusterRequest, futureClusterResponses);
152 }
153
154 if (clusterRequest.isMulticast()) {
155 try {
156 sendJGroupsMessage(_controlJChannel, null, clusterRequest);
157 }
158 catch (Exception e) {
159 throw new SystemException(
160 "Unable to send multicast request", e);
161 }
162 }
163 else {
164 for (Address address : addresses) {
165 org.jgroups.Address jGroupsAddress =
166 (org.jgroups.Address)address.getRealAddress();
167
168 try {
169 sendJGroupsMessage(
170 _controlJChannel, jGroupsAddress, clusterRequest);
171 }
172 catch (Exception e) {
173 throw new SystemException(
174 "Unable to send unicast request", e);
175 }
176 }
177 }
178
179 return futureClusterResponses;
180 }
181
182 @Override
183 public void execute(
184 ClusterRequest clusterRequest,
185 ClusterResponseCallback clusterResponseCallback)
186 throws SystemException {
187
188 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
189
190 ClusterResponseCallbackJob clusterResponseCallbackJob =
191 new ClusterResponseCallbackJob(
192 clusterResponseCallback, futureClusterResponses);
193
194 _executorService.execute(clusterResponseCallbackJob);
195 }
196
197 @Override
198 public void execute(
199 ClusterRequest clusterRequest,
200 ClusterResponseCallback clusterResponseCallback, long timeout,
201 TimeUnit timeUnit)
202 throws SystemException {
203
204 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
205
206 ClusterResponseCallbackJob clusterResponseCallbackJob =
207 new ClusterResponseCallbackJob(
208 clusterResponseCallback, futureClusterResponses, timeout,
209 timeUnit);
210
211 _executorService.execute(clusterResponseCallbackJob);
212 }
213
214 @Override
215 public List<ClusterEventListener> getClusterEventListeners() {
216 if (!isEnabled()) {
217 return Collections.emptyList();
218 }
219
220 return Collections.unmodifiableList(_clusterEventListeners);
221 }
222
223 @Override
224 public List<Address> getClusterNodeAddresses() {
225 if (!isEnabled()) {
226 return Collections.emptyList();
227 }
228
229 return getAddresses(_controlJChannel);
230 }
231
232 @Override
233 public List<ClusterNode> getClusterNodes() {
234 if (!isEnabled()) {
235 return Collections.emptyList();
236 }
237
238 return new ArrayList<ClusterNode>(_liveInstances.values());
239 }
240
241 @Override
242 public ClusterNode getLocalClusterNode() {
243 if (!isEnabled()) {
244 return null;
245 }
246
247 return _localClusterNode;
248 }
249
250 @Override
251 public Address getLocalClusterNodeAddress() {
252 if (!isEnabled()) {
253 return null;
254 }
255
256 return _localAddress;
257 }
258
259 @Override
260 public void initialize() {
261 if (!isEnabled()) {
262 return;
263 }
264
265 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
266 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
267
268 PortalUtil.addPortalPortProtocolEventListener(this);
269
270 _localAddress = new AddressImpl(_controlJChannel.getAddress());
271
272 try {
273 initLocalClusterNode();
274
275 memberJoined(_localAddress, _localClusterNode);
276
277 sendNotifyRequest();
278 }
279 catch (Exception e) {
280 _log.error("Unable to determine local network address", e);
281 }
282
283 _baseReceiver.openLatch();
284 }
285
286 @Override
287 public boolean isClusterNodeAlive(Address address) {
288 if (!isEnabled()) {
289 return false;
290 }
291
292 List<Address> addresses = getAddresses(_controlJChannel);
293
294 return addresses.contains(address);
295 }
296
297 @Override
298 public boolean isClusterNodeAlive(String clusterNodeId) {
299 if (!isEnabled()) {
300 return false;
301 }
302
303 return _clusterNodeAddresses.containsKey(clusterNodeId);
304 }
305
306
310 @Override
311 public void portalPortConfigured(int port) {
312 portalPortProtocolConfigured(port, null);
313 }
314
315 @Override
316 public void portalPortProtocolConfigured(int port, Boolean secure) {
317 if (!isEnabled() || (port <= 0) || (secure == null)) {
318 return;
319 }
320
321 if (Validator.isNotNull(_localClusterNode.getPortalProtocol())) {
322 return;
323 }
324
325 if (secure) {
326 _localClusterNode.setPortalProtocol(Http.HTTPS);
327 }
328 else {
329 _localClusterNode.setPortalProtocol(Http.HTTP);
330 }
331
332 try {
333 _localClusterNode.setPort(port);
334
335 memberJoined(_localAddress, _localClusterNode);
336
337 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
338 ClusterMessageType.UPDATE, _localClusterNode);
339
340 sendJGroupsMessage(_controlJChannel, null, clusterRequest);
341 }
342 catch (Exception e) {
343 _log.error("Unable to determine configure node port", e);
344 }
345 }
346
347 @Override
348 public void removeClusterEventListener(
349 ClusterEventListener clusterEventListener) {
350
351 if (!isEnabled()) {
352 return;
353 }
354
355 _clusterEventListeners.remove(clusterEventListener);
356 }
357
358 public void setClusterEventListeners(
359 List<ClusterEventListener> clusterEventListeners) {
360
361 if (!isEnabled()) {
362 return;
363 }
364
365 _clusterEventListeners.addAllAbsent(clusterEventListeners);
366 }
367
368 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
369 if (!isEnabled()) {
370 return;
371 }
372
373 _shortcutLocalMethod = shortcutLocalMethod;
374 }
375
376 protected void fireClusterEvent(ClusterEvent clusterEvent) {
377 for (ClusterEventListener listener : _clusterEventListeners) {
378 listener.processClusterEvent(clusterEvent);
379 }
380 }
381
382 protected ClusterNodeResponse generateClusterNodeResponse(
383 ClusterRequest clusterRequest, Object returnValue,
384 Exception exception) {
385
386 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
387
388 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
389 clusterNodeResponse.setClusterNode(getLocalClusterNode());
390 clusterNodeResponse.setClusterMessageType(
391 clusterRequest.getClusterMessageType());
392 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
393 clusterNodeResponse.setUuid(clusterRequest.getUuid());
394
395 if (exception != null) {
396 clusterNodeResponse.setException(exception);
397 }
398 else {
399 if (returnValue instanceof Serializable) {
400 clusterNodeResponse.setResult(returnValue);
401 }
402 else if (returnValue != null) {
403 clusterNodeResponse.setException(
404 new ClusterException("Return value is not serializable"));
405 }
406 }
407
408 return clusterNodeResponse;
409 }
410
411 protected JChannel getControlChannel() {
412 return _controlJChannel;
413 }
414
415 protected FutureClusterResponses getExecutionResults(String uuid) {
416 return _futureClusterResponses.get(uuid);
417 }
418
419 @Override
420 protected void initChannels() throws Exception {
421 String channelName = PropsUtil.get(
422 PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL);
423
424 if (Validator.isNull(channelName)) {
425 throw new IllegalStateException(
426 "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL +
427 "\"");
428 }
429
430 String controlProperty = PropsUtil.get(
431 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
432
433 if (Validator.isNull(controlProperty)) {
434 throw new IllegalStateException(
435 "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL +
436 "\"");
437 }
438
439 _baseReceiver = new ClusterRequestReceiver(this);
440
441 _controlJChannel = createJChannel(
442 controlProperty, _baseReceiver, channelName);
443 }
444
445 protected void initLocalClusterNode() throws Exception {
446 InetAddress inetAddress = bindInetAddress;
447
448 if (inetAddress == null) {
449 inetAddress = InetAddressUtil.getLocalInetAddress();
450 }
451
452 ClusterNode localClusterNode = new ClusterNode(
453 PortalUUIDUtil.generate(), inetAddress);
454
455 if (StringUtil.equalsIgnoreCase(
456 Http.HTTPS, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
457 (PropsValues.PORTAL_INSTANCE_HTTPS_PORT > 0)) {
458
459 localClusterNode.setPortalProtocol(Http.HTTPS);
460 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTPS_PORT);
461 }
462 else if (StringUtil.equalsIgnoreCase(
463 Http.HTTP, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
464 (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0)) {
465
466 localClusterNode.setPortalProtocol(Http.HTTP);
467 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
468 }
469 else {
470 if (_log.isWarnEnabled()) {
471 StringBundler sb = new StringBundler(8);
472
473 sb.append("Unable to configure node protocol and port. ");
474 sb.append("This configuration will be inferred dynamically ");
475 sb.append("from the first request but static configuration ");
476 sb.append("is recomended to avoid comunications problems ");
477 sb.append("between nodes. Please set the right values for ");
478 sb.append("\"portal.instance.protocol\" and ");
479 sb.append("\"portal.instance.http.port\" or ");
480 sb.append("\"portal.instance.https.port\".");
481
482 _log.warn(sb.toString());
483 }
484 }
485
486 _localClusterNode = localClusterNode;
487
488 if (_log.isDebugEnabled()) {
489 _log.debug(
490 "Initialized cluster node: " + localClusterNode.toString());
491 }
492 }
493
494 protected boolean isShortcutLocalMethod() {
495 return _shortcutLocalMethod;
496 }
497
498 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
499 _liveInstances.put(joinAddress, clusterNode);
500
501 Address previousAddress = _clusterNodeAddresses.put(
502 clusterNode.getClusterNodeId(), joinAddress);
503
504 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
505 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
506
507
508
509 fireClusterEvent(clusterEvent);
510 }
511 }
512
513 protected void memberRemoved(List<Address> departAddresses) {
514 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
515
516 for (Address departAddress : departAddresses) {
517 ClusterNode departClusterNode = _liveInstances.remove(
518 departAddress);
519
520 if (departClusterNode == null) {
521 continue;
522 }
523
524 departClusterNodes.add(departClusterNode);
525
526 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
527 }
528
529 if (departClusterNodes.isEmpty()) {
530 return;
531 }
532
533 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
534
535 fireClusterEvent(clusterEvent);
536 }
537
538 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
539 boolean isMulticast = clusterRequest.isMulticast();
540
541 List<Address> addresses = null;
542
543 if (isMulticast) {
544 addresses = getAddresses(_controlJChannel);
545 }
546 else {
547 addresses = new ArrayList<Address>();
548
549 Collection<Address> clusterNodeAddresses =
550 clusterRequest.getTargetClusterNodeAddresses();
551
552 if (clusterNodeAddresses != null) {
553 addresses.addAll(clusterNodeAddresses);
554 }
555
556 Collection<String> clusterNodeIds =
557 clusterRequest.getTargetClusterNodeIds();
558
559 if (clusterNodeIds != null) {
560 for (String clusterNodeId : clusterNodeIds) {
561 Address address = _clusterNodeAddresses.get(clusterNodeId);
562
563 addresses.add(address);
564 }
565 }
566 }
567
568 if (clusterRequest.isSkipLocal()) {
569 addresses.remove(getLocalClusterNodeAddress());
570 }
571
572 return addresses;
573 }
574
575 protected void runLocalMethod(
576 ClusterRequest clusterRequest,
577 FutureClusterResponses futureClusterResponses) {
578
579 MethodHandler methodHandler = clusterRequest.getMethodHandler();
580
581 Object returnValue = null;
582 Exception exception = null;
583
584 if (methodHandler == null) {
585 exception = new ClusterException(
586 "Payload is not of type " + MethodHandler.class.getName());
587 }
588 else {
589 try {
590 returnValue = methodHandler.invoke(true);
591 }
592 catch (Exception e) {
593 exception = e;
594 }
595 }
596
597 if (!clusterRequest.isFireAndForget()) {
598 ClusterNodeResponse clusterNodeResponse =
599 generateClusterNodeResponse(
600 clusterRequest, returnValue, exception);
601
602 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
603 }
604 }
605
606 protected void sendNotifyRequest() {
607 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
608 ClusterMessageType.NOTIFY, _localClusterNode);
609
610 try {
611 sendJGroupsMessage(_controlJChannel, null, clusterRequest);
612 }
613 catch (Exception e) {
614 _log.error("Unable to send notify message", e);
615 }
616 }
617
618 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
619
620 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
621 new CopyOnWriteArrayList<ClusterEventListener>();
622 private Map<String, Address> _clusterNodeAddresses =
623 new ConcurrentHashMap<String, Address>();
624 private JChannel _controlJChannel;
625 private ExecutorService _executorService;
626 private Map<String, FutureClusterResponses> _futureClusterResponses =
627 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
628 private Map<Address, ClusterNode> _liveInstances =
629 new ConcurrentHashMap<Address, ClusterNode>();
630 private Address _localAddress;
631 private ClusterNode _localClusterNode;
632 private boolean _shortcutLocalMethod;
633 private BaseReceiver _baseReceiver;
634
635 private class ClusterResponseCallbackJob implements Runnable {
636
637 public ClusterResponseCallbackJob(
638 ClusterResponseCallback clusterResponseCallback,
639 FutureClusterResponses futureClusterResponses) {
640
641 _clusterResponseCallback = clusterResponseCallback;
642 _futureClusterResponses = futureClusterResponses;
643 _timeout = -1;
644 _timeoutGet = false;
645 _timeUnit = TimeUnit.SECONDS;
646 }
647
648 public ClusterResponseCallbackJob(
649 ClusterResponseCallback clusterResponseCallback,
650 FutureClusterResponses futureClusterResponses, long timeout,
651 TimeUnit timeUnit) {
652
653 _clusterResponseCallback = clusterResponseCallback;
654 _futureClusterResponses = futureClusterResponses;
655 _timeout = timeout;
656 _timeoutGet = true;
657 _timeUnit = timeUnit;
658 }
659
660 @Override
661 public void run() {
662 BlockingQueue<ClusterNodeResponse> blockingQueue =
663 _futureClusterResponses.getPartialResults();
664
665 _clusterResponseCallback.callback(blockingQueue);
666
667 ClusterNodeResponses clusterNodeResponses = null;
668
669 try {
670 if (_timeoutGet) {
671 clusterNodeResponses = _futureClusterResponses.get(
672 _timeout, _timeUnit);
673 }
674 else {
675 clusterNodeResponses = _futureClusterResponses.get();
676 }
677
678 _clusterResponseCallback.callback(clusterNodeResponses);
679 }
680 catch (InterruptedException ie) {
681 _clusterResponseCallback.processInterruptedException(ie);
682 }
683 catch (TimeoutException te) {
684 _clusterResponseCallback.processTimeoutException(te);
685 }
686 }
687
688 private final ClusterResponseCallback _clusterResponseCallback;
689 private final FutureClusterResponses _futureClusterResponses;
690 private final long _timeout;
691 private final boolean _timeoutGet;
692 private final TimeUnit _timeUnit;
693
694 }
695
696 }