001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.Http;
035 import com.liferay.portal.kernel.util.InetAddressUtil;
036 import com.liferay.portal.kernel.util.MethodHandler;
037 import com.liferay.portal.kernel.util.PropsKeys;
038 import com.liferay.portal.kernel.util.StringUtil;
039 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
040 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
041 import com.liferay.portal.util.PortalPortEventListener;
042 import com.liferay.portal.util.PortalUtil;
043 import com.liferay.portal.util.PropsUtil;
044 import com.liferay.portal.util.PropsValues;
045
046 import java.io.Serializable;
047
048 import java.net.InetAddress;
049
050 import java.util.ArrayList;
051 import java.util.Collection;
052 import java.util.Collections;
053 import java.util.List;
054 import java.util.Map;
055 import java.util.Properties;
056 import java.util.concurrent.BlockingQueue;
057 import java.util.concurrent.ConcurrentHashMap;
058 import java.util.concurrent.CopyOnWriteArrayList;
059 import java.util.concurrent.ExecutorService;
060 import java.util.concurrent.TimeUnit;
061 import java.util.concurrent.TimeoutException;
062
063 import org.jgroups.JChannel;
064
065
069 @DoPrivileged
070 public class ClusterExecutorImpl
071 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
072
/**
 * Name handed to {@link PortalExecutorManagerUtil} both to obtain (in
 * {@link #initialize()}) and to shut down (in {@link #destroy()}) the
 * executor that runs the cluster response callback jobs.
 */
public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
075
076 @Override
077 public void addClusterEventListener(
078 ClusterEventListener clusterEventListener) {
079
080 if (!isEnabled()) {
081 return;
082 }
083
084 _clusterEventListeners.addIfAbsent(clusterEventListener);
085 }
086
087 @Override
088 public void afterPropertiesSet() {
089 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
090 addClusterEventListener(new DebuggingClusterEventListenerImpl());
091 }
092
093 if (PropsValues.LIVE_USERS_ENABLED) {
094 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
095 }
096
097 _secure = StringUtil.equalsIgnoreCase(
098 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
099
100 if (_secure) {
101 _port = PropsValues.PORTAL_INSTANCE_HTTPS_PORT;
102 }
103 else {
104 _port = PropsValues.PORTAL_INSTANCE_HTTP_PORT;
105 }
106
107 super.afterPropertiesSet();
108 }
109
110 @Override
111 public void destroy() {
112 if (!isEnabled()) {
113 return;
114 }
115
116 PortalExecutorManagerUtil.shutdown(
117 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
118
119 _controlJChannel.setReceiver(null);
120
121 _controlJChannel.close();
122
123 _clusterEventListeners.clear();
124 _clusterNodeAddresses.clear();
125 _futureClusterResponses.clear();
126 _liveInstances.clear();
127 _localAddress = null;
128 _localClusterNode = null;
129 }
130
131 @Override
132 public FutureClusterResponses execute(ClusterRequest clusterRequest)
133 throws SystemException {
134
135 if (!isEnabled()) {
136 return null;
137 }
138
139 List<Address> addresses = prepareAddresses(clusterRequest);
140
141 FutureClusterResponses futureClusterResponses =
142 new FutureClusterResponses(addresses);
143
144 if (!clusterRequest.isFireAndForget()) {
145 String uuid = clusterRequest.getUuid();
146
147 _futureClusterResponses.put(uuid, futureClusterResponses);
148 }
149
150 if (_shortcutLocalMethod &&
151 addresses.remove(getLocalClusterNodeAddress())) {
152
153 runLocalMethod(clusterRequest, futureClusterResponses);
154 }
155
156 if (clusterRequest.isMulticast()) {
157 try {
158 _controlJChannel.send(null, clusterRequest);
159 }
160 catch (Exception e) {
161 throw new SystemException(
162 "Unable to send multicast request", e);
163 }
164 }
165 else {
166 for (Address address : addresses) {
167 org.jgroups.Address jGroupsAddress =
168 (org.jgroups.Address)address.getRealAddress();
169
170 try {
171 _controlJChannel.send(jGroupsAddress, clusterRequest);
172 }
173 catch (Exception e) {
174 throw new SystemException(
175 "Unable to send unicast request", e);
176 }
177 }
178 }
179
180 return futureClusterResponses;
181 }
182
183 @Override
184 public void execute(
185 ClusterRequest clusterRequest,
186 ClusterResponseCallback clusterResponseCallback)
187 throws SystemException {
188
189 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
190
191 ClusterResponseCallbackJob clusterResponseCallbackJob =
192 new ClusterResponseCallbackJob(
193 clusterResponseCallback, futureClusterResponses);
194
195 _executorService.execute(clusterResponseCallbackJob);
196 }
197
198 @Override
199 public void execute(
200 ClusterRequest clusterRequest,
201 ClusterResponseCallback clusterResponseCallback, long timeout,
202 TimeUnit timeUnit)
203 throws SystemException {
204
205 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
206
207 ClusterResponseCallbackJob clusterResponseCallbackJob =
208 new ClusterResponseCallbackJob(
209 clusterResponseCallback, futureClusterResponses, timeout,
210 timeUnit);
211
212 _executorService.execute(clusterResponseCallbackJob);
213 }
214
215 @Override
216 public List<ClusterEventListener> getClusterEventListeners() {
217 if (!isEnabled()) {
218 return Collections.emptyList();
219 }
220
221 return Collections.unmodifiableList(_clusterEventListeners);
222 }
223
224 @Override
225 public List<Address> getClusterNodeAddresses() {
226 if (!isEnabled()) {
227 return Collections.emptyList();
228 }
229
230 return getAddresses(_controlJChannel);
231 }
232
233 @Override
234 public List<ClusterNode> getClusterNodes() {
235 if (!isEnabled()) {
236 return Collections.emptyList();
237 }
238
239 return new ArrayList<ClusterNode>(_liveInstances.values());
240 }
241
242 @Override
243 public ClusterNode getLocalClusterNode() {
244 if (!isEnabled()) {
245 return null;
246 }
247
248 return _localClusterNode;
249 }
250
251 @Override
252 public Address getLocalClusterNodeAddress() {
253 if (!isEnabled()) {
254 return null;
255 }
256
257 return _localAddress;
258 }
259
260 @Override
261 public void initialize() {
262 if (!isEnabled()) {
263 return;
264 }
265
266 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
267 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
268
269 PortalUtil.addPortalPortEventListener(this);
270
271 _localAddress = new AddressImpl(_controlJChannel.getAddress());
272
273 try {
274 initLocalClusterNode();
275
276 memberJoined(_localAddress, _localClusterNode);
277
278 sendNotifyRequest();
279 }
280 catch (Exception e) {
281 _log.error("Unable to determine local network address", e);
282 }
283
284 ClusterRequestReceiver clusterRequestReceiver =
285 (ClusterRequestReceiver)_controlJChannel.getReceiver();
286
287 clusterRequestReceiver.openLatch();
288 }
289
290 @Override
291 public boolean isClusterNodeAlive(Address address) {
292 if (!isEnabled()) {
293 return false;
294 }
295
296 List<Address> addresses = getAddresses(_controlJChannel);
297
298 return addresses.contains(address);
299 }
300
301 @Override
302 public boolean isClusterNodeAlive(String clusterNodeId) {
303 if (!isEnabled()) {
304 return false;
305 }
306
307 return _clusterNodeAddresses.containsKey(clusterNodeId);
308 }
309
310 @Override
311 public void portalPortConfigured(int port) {
312 if (!isEnabled() || (_localClusterNode.getPort() == _port)) {
313 return;
314 }
315
316 try {
317 _localClusterNode.setPort(port);
318
319 memberJoined(_localAddress, _localClusterNode);
320
321 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
322 ClusterMessageType.UPDATE, _localClusterNode);
323
324 _controlJChannel.send(null, clusterRequest);
325 }
326 catch (Exception e) {
327 _log.error("Unable to determine configure node port", e);
328 }
329 }
330
331 @Override
332 public void removeClusterEventListener(
333 ClusterEventListener clusterEventListener) {
334
335 if (!isEnabled()) {
336 return;
337 }
338
339 _clusterEventListeners.remove(clusterEventListener);
340 }
341
342 public void setClusterEventListeners(
343 List<ClusterEventListener> clusterEventListeners) {
344
345 if (!isEnabled()) {
346 return;
347 }
348
349 _clusterEventListeners.addAllAbsent(clusterEventListeners);
350 }
351
352 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
353 if (!isEnabled()) {
354 return;
355 }
356
357 _shortcutLocalMethod = shortcutLocalMethod;
358 }
359
360 protected void fireClusterEvent(ClusterEvent clusterEvent) {
361 for (ClusterEventListener listener : _clusterEventListeners) {
362 listener.processClusterEvent(clusterEvent);
363 }
364 }
365
366 protected ClusterNodeResponse generateClusterNodeResponse(
367 ClusterRequest clusterRequest, Object returnValue,
368 Exception exception) {
369
370 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
371
372 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
373 clusterNodeResponse.setClusterNode(getLocalClusterNode());
374 clusterNodeResponse.setClusterMessageType(
375 clusterRequest.getClusterMessageType());
376 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
377 clusterNodeResponse.setUuid(clusterRequest.getUuid());
378
379 if (exception != null) {
380 clusterNodeResponse.setException(exception);
381 }
382 else {
383 if (returnValue instanceof Serializable) {
384 clusterNodeResponse.setResult(returnValue);
385 }
386 else if (returnValue != null) {
387 clusterNodeResponse.setException(
388 new ClusterException("Return value is not serializable"));
389 }
390 }
391
392 return clusterNodeResponse;
393 }
394
395 protected JChannel getControlChannel() {
396 return _controlJChannel;
397 }
398
399 protected FutureClusterResponses getExecutionResults(String uuid) {
400 return _futureClusterResponses.get(uuid);
401 }
402
403 @Override
404 protected void initChannels() throws Exception {
405 Properties controlProperties = PropsUtil.getProperties(
406 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
407
408 String controlProperty = controlProperties.getProperty(
409 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
410
411 ClusterRequestReceiver clusterRequestReceiver =
412 new ClusterRequestReceiver(this);
413
414 _controlJChannel = createJChannel(
415 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
416 }
417
418 protected void initLocalClusterNode() throws Exception {
419 InetAddress inetAddress = bindInetAddress;
420
421 if (inetAddress == null) {
422 inetAddress = InetAddressUtil.getLocalInetAddress();
423 }
424
425 ClusterNode localClusterNode = new ClusterNode(
426 PortalUUIDUtil.generate(), inetAddress);
427
428 int port = _port;
429
430 if (port <= 0) {
431 port = PortalUtil.getPortalPort(_secure);
432 }
433
434 localClusterNode.setPort(port);
435
436 _localClusterNode = localClusterNode;
437 }
438
439 protected boolean isShortcutLocalMethod() {
440 return _shortcutLocalMethod;
441 }
442
443 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
444 _liveInstances.put(joinAddress, clusterNode);
445
446 Address previousAddress = _clusterNodeAddresses.put(
447 clusterNode.getClusterNodeId(), joinAddress);
448
449 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
450 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
451
452
453
454 fireClusterEvent(clusterEvent);
455 }
456 }
457
458 protected void memberRemoved(List<Address> departAddresses) {
459 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
460
461 for (Address departAddress : departAddresses) {
462 ClusterNode departClusterNode = _liveInstances.remove(
463 departAddress);
464
465 if (departClusterNode == null) {
466 continue;
467 }
468
469 departClusterNodes.add(departClusterNode);
470
471 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
472 }
473
474 if (departClusterNodes.isEmpty()) {
475 return;
476 }
477
478 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
479
480 fireClusterEvent(clusterEvent);
481 }
482
483 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
484 boolean isMulticast = clusterRequest.isMulticast();
485
486 List<Address> addresses = null;
487
488 if (isMulticast) {
489 addresses = getAddresses(_controlJChannel);
490 }
491 else {
492 addresses = new ArrayList<Address>();
493
494 Collection<Address> clusterNodeAddresses =
495 clusterRequest.getTargetClusterNodeAddresses();
496
497 if (clusterNodeAddresses != null) {
498 addresses.addAll(clusterNodeAddresses);
499 }
500
501 Collection<String> clusterNodeIds =
502 clusterRequest.getTargetClusterNodeIds();
503
504 if (clusterNodeIds != null) {
505 for (String clusterNodeId : clusterNodeIds) {
506 Address address = _clusterNodeAddresses.get(clusterNodeId);
507
508 addresses.add(address);
509 }
510 }
511 }
512
513 if (clusterRequest.isSkipLocal()) {
514 addresses.remove(getLocalClusterNodeAddress());
515 }
516
517 return addresses;
518 }
519
520 protected void runLocalMethod(
521 ClusterRequest clusterRequest,
522 FutureClusterResponses futureClusterResponses) {
523
524 MethodHandler methodHandler = clusterRequest.getMethodHandler();
525
526 Object returnValue = null;
527 Exception exception = null;
528
529 if (methodHandler == null) {
530 exception = new ClusterException(
531 "Payload is not of type " + MethodHandler.class.getName());
532 }
533 else {
534 try {
535 returnValue = methodHandler.invoke(true);
536 }
537 catch (Exception e) {
538 exception = e;
539 }
540 }
541
542 if (!clusterRequest.isFireAndForget()) {
543 ClusterNodeResponse clusterNodeResponse =
544 generateClusterNodeResponse(
545 clusterRequest, returnValue, exception);
546
547 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
548 }
549 }
550
551 protected void sendNotifyRequest() {
552 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
553 ClusterMessageType.NOTIFY, _localClusterNode);
554
555 try {
556 _controlJChannel.send(null, clusterRequest);
557 }
558 catch (Exception e) {
559 _log.error("Unable to send notify message", e);
560 }
561 }
562
563 private static final String _DEFAULT_CLUSTER_NAME =
564 "LIFERAY-CONTROL-CHANNEL";
565
566 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
567
568 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
569 new CopyOnWriteArrayList<ClusterEventListener>();
570 private Map<String, Address> _clusterNodeAddresses =
571 new ConcurrentHashMap<String, Address>();
572 private JChannel _controlJChannel;
573 private ExecutorService _executorService;
574 private Map<String, FutureClusterResponses> _futureClusterResponses =
575 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
576 private Map<Address, ClusterNode> _liveInstances =
577 new ConcurrentHashMap<Address, ClusterNode>();
578 private Address _localAddress;
579 private ClusterNode _localClusterNode;
580 private int _port;
581 private boolean _secure;
582 private boolean _shortcutLocalMethod;
583
584 private class ClusterResponseCallbackJob implements Runnable {
585
586 public ClusterResponseCallbackJob(
587 ClusterResponseCallback clusterResponseCallback,
588 FutureClusterResponses futureClusterResponses) {
589
590 _clusterResponseCallback = clusterResponseCallback;
591 _futureClusterResponses = futureClusterResponses;
592 _timeout = -1;
593 _timeoutGet = false;
594 _timeUnit = TimeUnit.SECONDS;
595 }
596
597 public ClusterResponseCallbackJob(
598 ClusterResponseCallback clusterResponseCallback,
599 FutureClusterResponses futureClusterResponses, long timeout,
600 TimeUnit timeUnit) {
601
602 _clusterResponseCallback = clusterResponseCallback;
603 _futureClusterResponses = futureClusterResponses;
604 _timeout = timeout;
605 _timeoutGet = true;
606 _timeUnit = timeUnit;
607 }
608
609 @Override
610 public void run() {
611 BlockingQueue<ClusterNodeResponse> blockingQueue =
612 _futureClusterResponses.getPartialResults();
613
614 _clusterResponseCallback.callback(blockingQueue);
615
616 ClusterNodeResponses clusterNodeResponses = null;
617
618 try {
619 if (_timeoutGet) {
620 clusterNodeResponses = _futureClusterResponses.get(
621 _timeout, _timeUnit);
622 }
623 else {
624 clusterNodeResponses = _futureClusterResponses.get();
625 }
626
627 _clusterResponseCallback.callback(clusterNodeResponses);
628 }
629 catch (InterruptedException ie) {
630 _clusterResponseCallback.processInterruptedException(ie);
631 }
632 catch (TimeoutException te) {
633 _clusterResponseCallback.processTimeoutException(te);
634 }
635 }
636
637 private final ClusterResponseCallback _clusterResponseCallback;
638 private final FutureClusterResponses _futureClusterResponses;
639 private final long _timeout;
640 private final boolean _timeoutGet;
641 private final TimeUnit _timeUnit;
642
643 }
644
645 }