001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.Http;
035 import com.liferay.portal.kernel.util.InetAddressUtil;
036 import com.liferay.portal.kernel.util.MethodHandler;
037 import com.liferay.portal.kernel.util.PropsKeys;
038 import com.liferay.portal.kernel.util.StringBundler;
039 import com.liferay.portal.kernel.util.StringUtil;
040 import com.liferay.portal.kernel.util.Validator;
041 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
042 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043 import com.liferay.portal.util.PortalPortEventListener;
044 import com.liferay.portal.util.PortalPortProtocolEventListener;
045 import com.liferay.portal.util.PortalUtil;
046 import com.liferay.portal.util.PropsUtil;
047 import com.liferay.portal.util.PropsValues;
048
049 import java.io.Serializable;
050
051 import java.net.InetAddress;
052
053 import java.util.ArrayList;
054 import java.util.Collection;
055 import java.util.Collections;
056 import java.util.List;
057 import java.util.Map;
058 import java.util.Properties;
059 import java.util.concurrent.BlockingQueue;
060 import java.util.concurrent.ConcurrentHashMap;
061 import java.util.concurrent.CopyOnWriteArrayList;
062 import java.util.concurrent.ExecutorService;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.TimeoutException;
065
066 import org.jgroups.JChannel;
067
068
072 @DoPrivileged
073 public class ClusterExecutorImpl
074 extends ClusterBase
075 implements ClusterExecutor, PortalPortEventListener,
076 PortalPortProtocolEventListener {
077
078 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
079 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
080
081 @Override
082 public void addClusterEventListener(
083 ClusterEventListener clusterEventListener) {
084
085 if (!isEnabled()) {
086 return;
087 }
088
089 _clusterEventListeners.addIfAbsent(clusterEventListener);
090 }
091
092 @Override
093 public void afterPropertiesSet() {
094 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
095 addClusterEventListener(new DebuggingClusterEventListenerImpl());
096 }
097
098 if (PropsValues.LIVE_USERS_ENABLED) {
099 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
100 }
101
102 super.afterPropertiesSet();
103 }
104
105 @Override
106 public void destroy() {
107 if (!isEnabled()) {
108 return;
109 }
110
111 PortalExecutorManagerUtil.shutdown(
112 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
113
114 _controlJChannel.setReceiver(null);
115
116 _controlJChannel.close();
117
118 _clusterEventListeners.clear();
119 _clusterNodeAddresses.clear();
120 _futureClusterResponses.clear();
121 _liveInstances.clear();
122 _localAddress = null;
123 _localClusterNode = null;
124 }
125
126 @Override
127 public FutureClusterResponses execute(ClusterRequest clusterRequest)
128 throws SystemException {
129
130 if (!isEnabled()) {
131 return null;
132 }
133
134 List<Address> addresses = prepareAddresses(clusterRequest);
135
136 FutureClusterResponses futureClusterResponses =
137 new FutureClusterResponses(addresses);
138
139 if (!clusterRequest.isFireAndForget()) {
140 String uuid = clusterRequest.getUuid();
141
142 _futureClusterResponses.put(uuid, futureClusterResponses);
143 }
144
145 if (_shortcutLocalMethod &&
146 addresses.remove(getLocalClusterNodeAddress())) {
147
148 runLocalMethod(clusterRequest, futureClusterResponses);
149 }
150
151 if (clusterRequest.isMulticast()) {
152 try {
153 _controlJChannel.send(null, clusterRequest);
154 }
155 catch (Exception e) {
156 throw new SystemException(
157 "Unable to send multicast request", e);
158 }
159 }
160 else {
161 for (Address address : addresses) {
162 org.jgroups.Address jGroupsAddress =
163 (org.jgroups.Address)address.getRealAddress();
164
165 try {
166 _controlJChannel.send(jGroupsAddress, clusterRequest);
167 }
168 catch (Exception e) {
169 throw new SystemException(
170 "Unable to send unicast request", e);
171 }
172 }
173 }
174
175 return futureClusterResponses;
176 }
177
178 @Override
179 public void execute(
180 ClusterRequest clusterRequest,
181 ClusterResponseCallback clusterResponseCallback)
182 throws SystemException {
183
184 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
185
186 ClusterResponseCallbackJob clusterResponseCallbackJob =
187 new ClusterResponseCallbackJob(
188 clusterResponseCallback, futureClusterResponses);
189
190 _executorService.execute(clusterResponseCallbackJob);
191 }
192
193 @Override
194 public void execute(
195 ClusterRequest clusterRequest,
196 ClusterResponseCallback clusterResponseCallback, long timeout,
197 TimeUnit timeUnit)
198 throws SystemException {
199
200 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
201
202 ClusterResponseCallbackJob clusterResponseCallbackJob =
203 new ClusterResponseCallbackJob(
204 clusterResponseCallback, futureClusterResponses, timeout,
205 timeUnit);
206
207 _executorService.execute(clusterResponseCallbackJob);
208 }
209
210 @Override
211 public List<ClusterEventListener> getClusterEventListeners() {
212 if (!isEnabled()) {
213 return Collections.emptyList();
214 }
215
216 return Collections.unmodifiableList(_clusterEventListeners);
217 }
218
219 @Override
220 public List<Address> getClusterNodeAddresses() {
221 if (!isEnabled()) {
222 return Collections.emptyList();
223 }
224
225 return getAddresses(_controlJChannel);
226 }
227
228 @Override
229 public List<ClusterNode> getClusterNodes() {
230 if (!isEnabled()) {
231 return Collections.emptyList();
232 }
233
234 return new ArrayList<ClusterNode>(_liveInstances.values());
235 }
236
237 @Override
238 public ClusterNode getLocalClusterNode() {
239 if (!isEnabled()) {
240 return null;
241 }
242
243 return _localClusterNode;
244 }
245
246 @Override
247 public Address getLocalClusterNodeAddress() {
248 if (!isEnabled()) {
249 return null;
250 }
251
252 return _localAddress;
253 }
254
255 @Override
256 public void initialize() {
257 if (!isEnabled()) {
258 return;
259 }
260
261 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
262 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
263
264 PortalUtil.addPortalPortProtocolEventListener(this);
265
266 _localAddress = new AddressImpl(_controlJChannel.getAddress());
267
268 try {
269 initLocalClusterNode();
270
271 memberJoined(_localAddress, _localClusterNode);
272
273 sendNotifyRequest();
274 }
275 catch (Exception e) {
276 _log.error("Unable to determine local network address", e);
277 }
278
279 ClusterRequestReceiver clusterRequestReceiver =
280 (ClusterRequestReceiver)_controlJChannel.getReceiver();
281
282 clusterRequestReceiver.openLatch();
283 }
284
285 @Override
286 public boolean isClusterNodeAlive(Address address) {
287 if (!isEnabled()) {
288 return false;
289 }
290
291 List<Address> addresses = getAddresses(_controlJChannel);
292
293 return addresses.contains(address);
294 }
295
296 @Override
297 public boolean isClusterNodeAlive(String clusterNodeId) {
298 if (!isEnabled()) {
299 return false;
300 }
301
302 return _clusterNodeAddresses.containsKey(clusterNodeId);
303 }
304
305
309 @Override
310 public void portalPortConfigured(int port) {
311 portalPortProtocolConfigured(port, null);
312 }
313
314 @Override
315 public void portalPortProtocolConfigured(int port, Boolean secure) {
316 if (!isEnabled() || (port <= 0) || (secure == null)) {
317 return;
318 }
319
320 if (Validator.isNotNull(_localClusterNode.getPortalProtocol())) {
321 return;
322 }
323
324 if (secure) {
325 _localClusterNode.setPortalProtocol(Http.HTTPS);
326 }
327 else {
328 _localClusterNode.setPortalProtocol(Http.HTTP);
329 }
330
331 try {
332 _localClusterNode.setPort(port);
333
334 memberJoined(_localAddress, _localClusterNode);
335
336 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
337 ClusterMessageType.UPDATE, _localClusterNode);
338
339 _controlJChannel.send(null, clusterRequest);
340 }
341 catch (Exception e) {
342 _log.error("Unable to determine configure node port", e);
343 }
344 }
345
346 @Override
347 public void removeClusterEventListener(
348 ClusterEventListener clusterEventListener) {
349
350 if (!isEnabled()) {
351 return;
352 }
353
354 _clusterEventListeners.remove(clusterEventListener);
355 }
356
357 public void setClusterEventListeners(
358 List<ClusterEventListener> clusterEventListeners) {
359
360 if (!isEnabled()) {
361 return;
362 }
363
364 _clusterEventListeners.addAllAbsent(clusterEventListeners);
365 }
366
367 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
368 if (!isEnabled()) {
369 return;
370 }
371
372 _shortcutLocalMethod = shortcutLocalMethod;
373 }
374
375 protected void fireClusterEvent(ClusterEvent clusterEvent) {
376 for (ClusterEventListener listener : _clusterEventListeners) {
377 listener.processClusterEvent(clusterEvent);
378 }
379 }
380
381 protected ClusterNodeResponse generateClusterNodeResponse(
382 ClusterRequest clusterRequest, Object returnValue,
383 Exception exception) {
384
385 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
386
387 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
388 clusterNodeResponse.setClusterNode(getLocalClusterNode());
389 clusterNodeResponse.setClusterMessageType(
390 clusterRequest.getClusterMessageType());
391 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
392 clusterNodeResponse.setUuid(clusterRequest.getUuid());
393
394 if (exception != null) {
395 clusterNodeResponse.setException(exception);
396 }
397 else {
398 if (returnValue instanceof Serializable) {
399 clusterNodeResponse.setResult(returnValue);
400 }
401 else if (returnValue != null) {
402 clusterNodeResponse.setException(
403 new ClusterException("Return value is not serializable"));
404 }
405 }
406
407 return clusterNodeResponse;
408 }
409
410 protected JChannel getControlChannel() {
411 return _controlJChannel;
412 }
413
414 protected FutureClusterResponses getExecutionResults(String uuid) {
415 return _futureClusterResponses.get(uuid);
416 }
417
418 @Override
419 protected void initChannels() throws Exception {
420 Properties controlProperties = PropsUtil.getProperties(
421 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
422
423 String controlProperty = controlProperties.getProperty(
424 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
425
426 ClusterRequestReceiver clusterRequestReceiver =
427 new ClusterRequestReceiver(this);
428
429 _controlJChannel = createJChannel(
430 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
431 }
432
433 protected void initLocalClusterNode() throws Exception {
434 InetAddress inetAddress = bindInetAddress;
435
436 if (inetAddress == null) {
437 inetAddress = InetAddressUtil.getLocalInetAddress();
438 }
439
440 ClusterNode localClusterNode = new ClusterNode(
441 PortalUUIDUtil.generate(), inetAddress);
442
443 if (StringUtil.equalsIgnoreCase(
444 Http.HTTPS, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
445 (PropsValues.PORTAL_INSTANCE_HTTPS_PORT > 0)) {
446
447 localClusterNode.setPortalProtocol(Http.HTTPS);
448 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTPS_PORT);
449 }
450 else if (StringUtil.equalsIgnoreCase(
451 Http.HTTP, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
452 (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0)) {
453
454 localClusterNode.setPortalProtocol(Http.HTTP);
455 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
456 }
457 else {
458 if (_log.isWarnEnabled()) {
459 StringBundler sb = new StringBundler(8);
460
461 sb.append("Unable to configure node protocol and port. ");
462 sb.append("This configuration will be inferred dynamically ");
463 sb.append("from the first request but static configuration ");
464 sb.append("is recomended to avoid comunications problems ");
465 sb.append("between nodes. Please set the right values for ");
466 sb.append("\"portal.instance.protocol\" and ");
467 sb.append("\"portal.instance.http.port\" or ");
468 sb.append("\"portal.instance.https.port\".");
469
470 _log.warn(sb.toString());
471 }
472 }
473
474 _localClusterNode = localClusterNode;
475
476 if (_log.isDebugEnabled()) {
477 _log.debug(
478 "Initialized cluster node: " + localClusterNode.toString());
479 }
480 }
481
482 protected boolean isShortcutLocalMethod() {
483 return _shortcutLocalMethod;
484 }
485
486 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
487 _liveInstances.put(joinAddress, clusterNode);
488
489 Address previousAddress = _clusterNodeAddresses.put(
490 clusterNode.getClusterNodeId(), joinAddress);
491
492 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
493 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
494
495
496
497 fireClusterEvent(clusterEvent);
498 }
499 }
500
501 protected void memberRemoved(List<Address> departAddresses) {
502 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
503
504 for (Address departAddress : departAddresses) {
505 ClusterNode departClusterNode = _liveInstances.remove(
506 departAddress);
507
508 if (departClusterNode == null) {
509 continue;
510 }
511
512 departClusterNodes.add(departClusterNode);
513
514 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
515 }
516
517 if (departClusterNodes.isEmpty()) {
518 return;
519 }
520
521 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
522
523 fireClusterEvent(clusterEvent);
524 }
525
526 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
527 boolean isMulticast = clusterRequest.isMulticast();
528
529 List<Address> addresses = null;
530
531 if (isMulticast) {
532 addresses = getAddresses(_controlJChannel);
533 }
534 else {
535 addresses = new ArrayList<Address>();
536
537 Collection<Address> clusterNodeAddresses =
538 clusterRequest.getTargetClusterNodeAddresses();
539
540 if (clusterNodeAddresses != null) {
541 addresses.addAll(clusterNodeAddresses);
542 }
543
544 Collection<String> clusterNodeIds =
545 clusterRequest.getTargetClusterNodeIds();
546
547 if (clusterNodeIds != null) {
548 for (String clusterNodeId : clusterNodeIds) {
549 Address address = _clusterNodeAddresses.get(clusterNodeId);
550
551 addresses.add(address);
552 }
553 }
554 }
555
556 if (clusterRequest.isSkipLocal()) {
557 addresses.remove(getLocalClusterNodeAddress());
558 }
559
560 return addresses;
561 }
562
563 protected void runLocalMethod(
564 ClusterRequest clusterRequest,
565 FutureClusterResponses futureClusterResponses) {
566
567 MethodHandler methodHandler = clusterRequest.getMethodHandler();
568
569 Object returnValue = null;
570 Exception exception = null;
571
572 if (methodHandler == null) {
573 exception = new ClusterException(
574 "Payload is not of type " + MethodHandler.class.getName());
575 }
576 else {
577 try {
578 returnValue = methodHandler.invoke(true);
579 }
580 catch (Exception e) {
581 exception = e;
582 }
583 }
584
585 if (!clusterRequest.isFireAndForget()) {
586 ClusterNodeResponse clusterNodeResponse =
587 generateClusterNodeResponse(
588 clusterRequest, returnValue, exception);
589
590 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
591 }
592 }
593
594 protected void sendNotifyRequest() {
595 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
596 ClusterMessageType.NOTIFY, _localClusterNode);
597
598 try {
599 _controlJChannel.send(null, clusterRequest);
600 }
601 catch (Exception e) {
602 _log.error("Unable to send notify message", e);
603 }
604 }
605
606 private static final String _DEFAULT_CLUSTER_NAME =
607 "LIFERAY-CONTROL-CHANNEL";
608
609 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
610
611 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
612 new CopyOnWriteArrayList<ClusterEventListener>();
613 private Map<String, Address> _clusterNodeAddresses =
614 new ConcurrentHashMap<String, Address>();
615 private JChannel _controlJChannel;
616 private ExecutorService _executorService;
617 private Map<String, FutureClusterResponses> _futureClusterResponses =
618 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
619 private Map<Address, ClusterNode> _liveInstances =
620 new ConcurrentHashMap<Address, ClusterNode>();
621 private Address _localAddress;
622 private ClusterNode _localClusterNode;
623 private boolean _shortcutLocalMethod;
624
625 private class ClusterResponseCallbackJob implements Runnable {
626
627 public ClusterResponseCallbackJob(
628 ClusterResponseCallback clusterResponseCallback,
629 FutureClusterResponses futureClusterResponses) {
630
631 _clusterResponseCallback = clusterResponseCallback;
632 _futureClusterResponses = futureClusterResponses;
633 _timeout = -1;
634 _timeoutGet = false;
635 _timeUnit = TimeUnit.SECONDS;
636 }
637
638 public ClusterResponseCallbackJob(
639 ClusterResponseCallback clusterResponseCallback,
640 FutureClusterResponses futureClusterResponses, long timeout,
641 TimeUnit timeUnit) {
642
643 _clusterResponseCallback = clusterResponseCallback;
644 _futureClusterResponses = futureClusterResponses;
645 _timeout = timeout;
646 _timeoutGet = true;
647 _timeUnit = timeUnit;
648 }
649
650 @Override
651 public void run() {
652 BlockingQueue<ClusterNodeResponse> blockingQueue =
653 _futureClusterResponses.getPartialResults();
654
655 _clusterResponseCallback.callback(blockingQueue);
656
657 ClusterNodeResponses clusterNodeResponses = null;
658
659 try {
660 if (_timeoutGet) {
661 clusterNodeResponses = _futureClusterResponses.get(
662 _timeout, _timeUnit);
663 }
664 else {
665 clusterNodeResponses = _futureClusterResponses.get();
666 }
667
668 _clusterResponseCallback.callback(clusterNodeResponses);
669 }
670 catch (InterruptedException ie) {
671 _clusterResponseCallback.processInterruptedException(ie);
672 }
673 catch (TimeoutException te) {
674 _clusterResponseCallback.processTimeoutException(te);
675 }
676 }
677
678 private final ClusterResponseCallback _clusterResponseCallback;
679 private final FutureClusterResponses _futureClusterResponses;
680 private final long _timeout;
681 private final boolean _timeoutGet;
682 private final TimeUnit _timeUnit;
683
684 }
685
686 }