001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.Http;
035 import com.liferay.portal.kernel.util.InetAddressUtil;
036 import com.liferay.portal.kernel.util.MethodHandler;
037 import com.liferay.portal.kernel.util.PropsKeys;
038 import com.liferay.portal.kernel.util.StringUtil;
039 import com.liferay.portal.kernel.util.Validator;
040 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
041 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
042 import com.liferay.portal.util.PortalPortEventListener;
043 import com.liferay.portal.util.PortalPortProtocolEventListener;
044 import com.liferay.portal.util.PortalUtil;
045 import com.liferay.portal.util.PropsUtil;
046 import com.liferay.portal.util.PropsValues;
047
048 import java.io.Serializable;
049
050 import java.net.InetAddress;
051
052 import java.util.ArrayList;
053 import java.util.Collection;
054 import java.util.Collections;
055 import java.util.List;
056 import java.util.Map;
057 import java.util.Properties;
058 import java.util.concurrent.BlockingQueue;
059 import java.util.concurrent.ConcurrentHashMap;
060 import java.util.concurrent.CopyOnWriteArrayList;
061 import java.util.concurrent.ExecutorService;
062 import java.util.concurrent.TimeUnit;
063 import java.util.concurrent.TimeoutException;
064
065 import org.jgroups.JChannel;
066
067
071 @DoPrivileged
072 public class ClusterExecutorImpl
073 extends ClusterBase
074 implements ClusterExecutor, PortalPortEventListener,
075 PortalPortProtocolEventListener {
076
077 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
078 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
079
080 @Override
081 public void addClusterEventListener(
082 ClusterEventListener clusterEventListener) {
083
084 if (!isEnabled()) {
085 return;
086 }
087
088 _clusterEventListeners.addIfAbsent(clusterEventListener);
089 }
090
091 @Override
092 public void afterPropertiesSet() {
093 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
094 addClusterEventListener(new DebuggingClusterEventListenerImpl());
095 }
096
097 if (PropsValues.LIVE_USERS_ENABLED) {
098 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
099 }
100
101 _secure = StringUtil.equalsIgnoreCase(
102 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
103
104 if (Validator.isNotNull(PropsValues.PORTAL_INSTANCE_HTTPS_PORT)) {
105 _port = PropsValues.PORTAL_INSTANCE_HTTPS_PORT;
106 }
107 else {
108 _port = PropsValues.PORTAL_INSTANCE_HTTP_PORT;
109 }
110
111 super.afterPropertiesSet();
112 }
113
114 @Override
115 public void destroy() {
116 if (!isEnabled()) {
117 return;
118 }
119
120 PortalExecutorManagerUtil.shutdown(
121 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
122
123 _controlJChannel.setReceiver(null);
124
125 _controlJChannel.close();
126
127 _clusterEventListeners.clear();
128 _clusterNodeAddresses.clear();
129 _futureClusterResponses.clear();
130 _liveInstances.clear();
131 _localAddress = null;
132 _localClusterNode = null;
133 }
134
135 @Override
136 public FutureClusterResponses execute(ClusterRequest clusterRequest)
137 throws SystemException {
138
139 if (!isEnabled()) {
140 return null;
141 }
142
143 List<Address> addresses = prepareAddresses(clusterRequest);
144
145 FutureClusterResponses futureClusterResponses =
146 new FutureClusterResponses(addresses);
147
148 if (!clusterRequest.isFireAndForget()) {
149 String uuid = clusterRequest.getUuid();
150
151 _futureClusterResponses.put(uuid, futureClusterResponses);
152 }
153
154 if (_shortcutLocalMethod &&
155 addresses.remove(getLocalClusterNodeAddress())) {
156
157 runLocalMethod(clusterRequest, futureClusterResponses);
158 }
159
160 if (clusterRequest.isMulticast()) {
161 try {
162 _controlJChannel.send(null, clusterRequest);
163 }
164 catch (Exception e) {
165 throw new SystemException(
166 "Unable to send multicast request", e);
167 }
168 }
169 else {
170 for (Address address : addresses) {
171 org.jgroups.Address jGroupsAddress =
172 (org.jgroups.Address)address.getRealAddress();
173
174 try {
175 _controlJChannel.send(jGroupsAddress, clusterRequest);
176 }
177 catch (Exception e) {
178 throw new SystemException(
179 "Unable to send unicast request", e);
180 }
181 }
182 }
183
184 return futureClusterResponses;
185 }
186
187 @Override
188 public void execute(
189 ClusterRequest clusterRequest,
190 ClusterResponseCallback clusterResponseCallback)
191 throws SystemException {
192
193 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
194
195 ClusterResponseCallbackJob clusterResponseCallbackJob =
196 new ClusterResponseCallbackJob(
197 clusterResponseCallback, futureClusterResponses);
198
199 _executorService.execute(clusterResponseCallbackJob);
200 }
201
202 @Override
203 public void execute(
204 ClusterRequest clusterRequest,
205 ClusterResponseCallback clusterResponseCallback, long timeout,
206 TimeUnit timeUnit)
207 throws SystemException {
208
209 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
210
211 ClusterResponseCallbackJob clusterResponseCallbackJob =
212 new ClusterResponseCallbackJob(
213 clusterResponseCallback, futureClusterResponses, timeout,
214 timeUnit);
215
216 _executorService.execute(clusterResponseCallbackJob);
217 }
218
219 @Override
220 public List<ClusterEventListener> getClusterEventListeners() {
221 if (!isEnabled()) {
222 return Collections.emptyList();
223 }
224
225 return Collections.unmodifiableList(_clusterEventListeners);
226 }
227
228 @Override
229 public List<Address> getClusterNodeAddresses() {
230 if (!isEnabled()) {
231 return Collections.emptyList();
232 }
233
234 return getAddresses(_controlJChannel);
235 }
236
237 @Override
238 public List<ClusterNode> getClusterNodes() {
239 if (!isEnabled()) {
240 return Collections.emptyList();
241 }
242
243 return new ArrayList<ClusterNode>(_liveInstances.values());
244 }
245
246 @Override
247 public ClusterNode getLocalClusterNode() {
248 if (!isEnabled()) {
249 return null;
250 }
251
252 return _localClusterNode;
253 }
254
255 @Override
256 public Address getLocalClusterNodeAddress() {
257 if (!isEnabled()) {
258 return null;
259 }
260
261 return _localAddress;
262 }
263
264 @Override
265 public void initialize() {
266 if (!isEnabled()) {
267 return;
268 }
269
270 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
271 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
272
273 PortalUtil.addPortalPortProtocolEventListener(this);
274
275 _localAddress = new AddressImpl(_controlJChannel.getAddress());
276
277 try {
278 initLocalClusterNode();
279
280 memberJoined(_localAddress, _localClusterNode);
281
282 sendNotifyRequest();
283 }
284 catch (Exception e) {
285 _log.error("Unable to determine local network address", e);
286 }
287
288 ClusterRequestReceiver clusterRequestReceiver =
289 (ClusterRequestReceiver)_controlJChannel.getReceiver();
290
291 clusterRequestReceiver.openLatch();
292 }
293
294 @Override
295 public boolean isClusterNodeAlive(Address address) {
296 if (!isEnabled()) {
297 return false;
298 }
299
300 List<Address> addresses = getAddresses(_controlJChannel);
301
302 return addresses.contains(address);
303 }
304
305 @Override
306 public boolean isClusterNodeAlive(String clusterNodeId) {
307 if (!isEnabled()) {
308 return false;
309 }
310
311 return _clusterNodeAddresses.containsKey(clusterNodeId);
312 }
313
314
318 @Override
319 public void portalPortConfigured(int port) {
320 portalPortProtocolConfigured(port, null);
321 }
322
323 @Override
324 public void portalPortProtocolConfigured(int port, Boolean secure) {
325 if (!isEnabled()) {
326 return;
327 }
328
329 if (Validator.isNotNull(secure)) {
330 String portalProtocol = _localClusterNode.getPortalProtocol();
331
332 if (((secure && portalProtocol.equals(Http.HTTPS)) ||
333 (!secure && portalProtocol.equals(Http.HTTP))) &&
334 (_localClusterNode.getPort() == _port)) {
335
336 return;
337 }
338
339 if (secure) {
340 _localClusterNode.setPortalProtocol(Http.HTTPS);
341 }
342 else {
343 _localClusterNode.setPortalProtocol(Http.HTTP);
344 }
345 }
346 else if (_localClusterNode.getPort() == _port) {
347 return;
348 }
349
350 try {
351 _localClusterNode.setPort(port);
352
353 memberJoined(_localAddress, _localClusterNode);
354
355 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
356 ClusterMessageType.UPDATE, _localClusterNode);
357
358 _controlJChannel.send(null, clusterRequest);
359 }
360 catch (Exception e) {
361 _log.error("Unable to determine configure node port", e);
362 }
363 }
364
365 @Override
366 public void removeClusterEventListener(
367 ClusterEventListener clusterEventListener) {
368
369 if (!isEnabled()) {
370 return;
371 }
372
373 _clusterEventListeners.remove(clusterEventListener);
374 }
375
376 public void setClusterEventListeners(
377 List<ClusterEventListener> clusterEventListeners) {
378
379 if (!isEnabled()) {
380 return;
381 }
382
383 _clusterEventListeners.addAllAbsent(clusterEventListeners);
384 }
385
386 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
387 if (!isEnabled()) {
388 return;
389 }
390
391 _shortcutLocalMethod = shortcutLocalMethod;
392 }
393
394 protected void fireClusterEvent(ClusterEvent clusterEvent) {
395 for (ClusterEventListener listener : _clusterEventListeners) {
396 listener.processClusterEvent(clusterEvent);
397 }
398 }
399
400 protected ClusterNodeResponse generateClusterNodeResponse(
401 ClusterRequest clusterRequest, Object returnValue,
402 Exception exception) {
403
404 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
405
406 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
407 clusterNodeResponse.setClusterNode(getLocalClusterNode());
408 clusterNodeResponse.setClusterMessageType(
409 clusterRequest.getClusterMessageType());
410 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
411 clusterNodeResponse.setUuid(clusterRequest.getUuid());
412
413 if (exception != null) {
414 clusterNodeResponse.setException(exception);
415 }
416 else {
417 if (returnValue instanceof Serializable) {
418 clusterNodeResponse.setResult(returnValue);
419 }
420 else if (returnValue != null) {
421 clusterNodeResponse.setException(
422 new ClusterException("Return value is not serializable"));
423 }
424 }
425
426 return clusterNodeResponse;
427 }
428
429 protected JChannel getControlChannel() {
430 return _controlJChannel;
431 }
432
433 protected FutureClusterResponses getExecutionResults(String uuid) {
434 return _futureClusterResponses.get(uuid);
435 }
436
437 @Override
438 protected void initChannels() throws Exception {
439 Properties controlProperties = PropsUtil.getProperties(
440 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
441
442 String controlProperty = controlProperties.getProperty(
443 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
444
445 ClusterRequestReceiver clusterRequestReceiver =
446 new ClusterRequestReceiver(this);
447
448 _controlJChannel = createJChannel(
449 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
450 }
451
452 protected void initLocalClusterNode() throws Exception {
453 InetAddress inetAddress = bindInetAddress;
454
455 if (inetAddress == null) {
456 inetAddress = InetAddressUtil.getLocalInetAddress();
457 }
458
459 ClusterNode localClusterNode = new ClusterNode(
460 PortalUUIDUtil.generate(), inetAddress);
461
462 int port = _port;
463
464 if (port <= 0) {
465 port = PortalUtil.getPortalPort(_secure);
466 }
467
468 localClusterNode.setPort(port);
469
470 localClusterNode.setPortalProtocol(
471 PropsValues.PORTAL_INSTANCE_PROTOCOL);
472
473 _localClusterNode = localClusterNode;
474 }
475
476 protected boolean isShortcutLocalMethod() {
477 return _shortcutLocalMethod;
478 }
479
480 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
481 _liveInstances.put(joinAddress, clusterNode);
482
483 Address previousAddress = _clusterNodeAddresses.put(
484 clusterNode.getClusterNodeId(), joinAddress);
485
486 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
487 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
488
489
490
491 fireClusterEvent(clusterEvent);
492 }
493 }
494
495 protected void memberRemoved(List<Address> departAddresses) {
496 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
497
498 for (Address departAddress : departAddresses) {
499 ClusterNode departClusterNode = _liveInstances.remove(
500 departAddress);
501
502 if (departClusterNode == null) {
503 continue;
504 }
505
506 departClusterNodes.add(departClusterNode);
507
508 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
509 }
510
511 if (departClusterNodes.isEmpty()) {
512 return;
513 }
514
515 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
516
517 fireClusterEvent(clusterEvent);
518 }
519
520 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
521 boolean isMulticast = clusterRequest.isMulticast();
522
523 List<Address> addresses = null;
524
525 if (isMulticast) {
526 addresses = getAddresses(_controlJChannel);
527 }
528 else {
529 addresses = new ArrayList<Address>();
530
531 Collection<Address> clusterNodeAddresses =
532 clusterRequest.getTargetClusterNodeAddresses();
533
534 if (clusterNodeAddresses != null) {
535 addresses.addAll(clusterNodeAddresses);
536 }
537
538 Collection<String> clusterNodeIds =
539 clusterRequest.getTargetClusterNodeIds();
540
541 if (clusterNodeIds != null) {
542 for (String clusterNodeId : clusterNodeIds) {
543 Address address = _clusterNodeAddresses.get(clusterNodeId);
544
545 addresses.add(address);
546 }
547 }
548 }
549
550 if (clusterRequest.isSkipLocal()) {
551 addresses.remove(getLocalClusterNodeAddress());
552 }
553
554 return addresses;
555 }
556
557 protected void runLocalMethod(
558 ClusterRequest clusterRequest,
559 FutureClusterResponses futureClusterResponses) {
560
561 MethodHandler methodHandler = clusterRequest.getMethodHandler();
562
563 Object returnValue = null;
564 Exception exception = null;
565
566 if (methodHandler == null) {
567 exception = new ClusterException(
568 "Payload is not of type " + MethodHandler.class.getName());
569 }
570 else {
571 try {
572 returnValue = methodHandler.invoke(true);
573 }
574 catch (Exception e) {
575 exception = e;
576 }
577 }
578
579 if (!clusterRequest.isFireAndForget()) {
580 ClusterNodeResponse clusterNodeResponse =
581 generateClusterNodeResponse(
582 clusterRequest, returnValue, exception);
583
584 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
585 }
586 }
587
588 protected void sendNotifyRequest() {
589 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
590 ClusterMessageType.NOTIFY, _localClusterNode);
591
592 try {
593 _controlJChannel.send(null, clusterRequest);
594 }
595 catch (Exception e) {
596 _log.error("Unable to send notify message", e);
597 }
598 }
599
600 private static final String _DEFAULT_CLUSTER_NAME =
601 "LIFERAY-CONTROL-CHANNEL";
602
603 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
604
605 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
606 new CopyOnWriteArrayList<ClusterEventListener>();
607 private Map<String, Address> _clusterNodeAddresses =
608 new ConcurrentHashMap<String, Address>();
609 private JChannel _controlJChannel;
610 private ExecutorService _executorService;
611 private Map<String, FutureClusterResponses> _futureClusterResponses =
612 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
613 private Map<Address, ClusterNode> _liveInstances =
614 new ConcurrentHashMap<Address, ClusterNode>();
615 private Address _localAddress;
616 private ClusterNode _localClusterNode;
617 private int _port;
618 private boolean _secure;
619 private boolean _shortcutLocalMethod;
620
621 private class ClusterResponseCallbackJob implements Runnable {
622
623 public ClusterResponseCallbackJob(
624 ClusterResponseCallback clusterResponseCallback,
625 FutureClusterResponses futureClusterResponses) {
626
627 _clusterResponseCallback = clusterResponseCallback;
628 _futureClusterResponses = futureClusterResponses;
629 _timeout = -1;
630 _timeoutGet = false;
631 _timeUnit = TimeUnit.SECONDS;
632 }
633
634 public ClusterResponseCallbackJob(
635 ClusterResponseCallback clusterResponseCallback,
636 FutureClusterResponses futureClusterResponses, long timeout,
637 TimeUnit timeUnit) {
638
639 _clusterResponseCallback = clusterResponseCallback;
640 _futureClusterResponses = futureClusterResponses;
641 _timeout = timeout;
642 _timeoutGet = true;
643 _timeUnit = timeUnit;
644 }
645
646 @Override
647 public void run() {
648 BlockingQueue<ClusterNodeResponse> blockingQueue =
649 _futureClusterResponses.getPartialResults();
650
651 _clusterResponseCallback.callback(blockingQueue);
652
653 ClusterNodeResponses clusterNodeResponses = null;
654
655 try {
656 if (_timeoutGet) {
657 clusterNodeResponses = _futureClusterResponses.get(
658 _timeout, _timeUnit);
659 }
660 else {
661 clusterNodeResponses = _futureClusterResponses.get();
662 }
663
664 _clusterResponseCallback.callback(clusterNodeResponses);
665 }
666 catch (InterruptedException ie) {
667 _clusterResponseCallback.processInterruptedException(ie);
668 }
669 catch (TimeoutException te) {
670 _clusterResponseCallback.processTimeoutException(te);
671 }
672 }
673
674 private final ClusterResponseCallback _clusterResponseCallback;
675 private final FutureClusterResponses _futureClusterResponses;
676 private final long _timeout;
677 private final boolean _timeoutGet;
678 private final TimeUnit _timeUnit;
679
680 }
681
682 }