001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.util.InetAddressUtil;
034 import com.liferay.portal.kernel.util.MethodHandler;
035 import com.liferay.portal.kernel.util.PropsKeys;
036 import com.liferay.portal.kernel.util.PropsUtil;
037 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039 import com.liferay.portal.util.PortalPortEventListener;
040 import com.liferay.portal.util.PortalUtil;
041 import com.liferay.portal.util.PropsValues;
042
043 import java.io.Serializable;
044
045 import java.net.InetAddress;
046
047 import java.util.ArrayList;
048 import java.util.Collection;
049 import java.util.Collections;
050 import java.util.List;
051 import java.util.Map;
052 import java.util.Properties;
053 import java.util.concurrent.BlockingQueue;
054 import java.util.concurrent.ConcurrentHashMap;
055 import java.util.concurrent.CopyOnWriteArrayList;
056 import java.util.concurrent.ExecutorService;
057 import java.util.concurrent.TimeUnit;
058 import java.util.concurrent.TimeoutException;
059
060 import org.jgroups.ChannelException;
061 import org.jgroups.JChannel;
062
063
067 public class ClusterExecutorImpl
068 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
069
070 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
071 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
072
073 public void addClusterEventListener(
074 ClusterEventListener clusterEventListener) {
075
076 if (!isEnabled()) {
077 return;
078 }
079
080 _clusterEventListeners.addIfAbsent(clusterEventListener);
081 }
082
083 @Override
084 public void afterPropertiesSet() {
085 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
086 addClusterEventListener(new DebuggingClusterEventListenerImpl());
087 }
088
089 if (PropsValues.LIVE_USERS_ENABLED) {
090 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
091 }
092
093 super.afterPropertiesSet();
094 }
095
096 @Override
097 public void destroy() {
098 if (!isEnabled()) {
099 return;
100 }
101
102 PortalExecutorManagerUtil.shutdown(
103 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
104
105 _controlJChannel.close();
106 }
107
108 public FutureClusterResponses execute(ClusterRequest clusterRequest)
109 throws SystemException {
110
111 if (!isEnabled()) {
112 return null;
113 }
114
115 List<Address> addresses = prepareAddresses(clusterRequest);
116
117 FutureClusterResponses futureClusterResponses =
118 new FutureClusterResponses(addresses);
119
120 if (!clusterRequest.isFireAndForget()) {
121 String uuid = clusterRequest.getUuid();
122
123 _futureClusterResponses.put(uuid, futureClusterResponses);
124 }
125
126 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
127 addresses.remove(getLocalClusterNodeAddress())) {
128
129 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
130 clusterRequest.getMethodHandler());
131
132 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
133 clusterNodeResponse.setUuid(clusterRequest.getUuid());
134
135 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
136 }
137
138 if (clusterRequest.isMulticast()) {
139 sendMulticastRequest(clusterRequest);
140 }
141 else {
142 sendUnicastRequest(clusterRequest, addresses);
143 }
144
145 return futureClusterResponses;
146 }
147
148 public void execute(
149 ClusterRequest clusterRequest,
150 ClusterResponseCallback clusterResponseCallback)
151 throws SystemException {
152
153 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
154
155 ClusterResponseCallbackJob clusterResponseCallbackJob =
156 new ClusterResponseCallbackJob(
157 clusterResponseCallback, futureClusterResponses);
158
159 _executorService.execute(clusterResponseCallbackJob);
160 }
161
162 public void execute(
163 ClusterRequest clusterRequest,
164 ClusterResponseCallback clusterResponseCallback, long timeout,
165 TimeUnit timeUnit)
166 throws SystemException {
167
168 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
169
170 ClusterResponseCallbackJob clusterResponseCallbackJob =
171 new ClusterResponseCallbackJob(
172 clusterResponseCallback, futureClusterResponses, timeout,
173 timeUnit);
174
175 _executorService.execute(clusterResponseCallbackJob);
176 }
177
178 public List<ClusterEventListener> getClusterEventListeners() {
179 if (!isEnabled()) {
180 return Collections.emptyList();
181 }
182
183 return Collections.unmodifiableList(_clusterEventListeners);
184 }
185
186 public List<Address> getClusterNodeAddresses() {
187 if (!isEnabled()) {
188 return Collections.emptyList();
189 }
190
191 return getAddresses(_controlJChannel);
192 }
193
194 public List<ClusterNode> getClusterNodes() {
195 if (!isEnabled()) {
196 return Collections.emptyList();
197 }
198
199 return new ArrayList<ClusterNode>(_liveInstances.values());
200 }
201
202 public ClusterNode getLocalClusterNode() {
203 if (!isEnabled()) {
204 return null;
205 }
206
207 return _localClusterNode;
208 }
209
210 public Address getLocalClusterNodeAddress() {
211 if (!isEnabled()) {
212 return null;
213 }
214
215 return _localAddress;
216 }
217
218 public void initialize() {
219 if (!isEnabled()) {
220 return;
221 }
222
223 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
224 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
225
226 PortalUtil.addPortalPortEventListener(this);
227
228 _localAddress = new AddressImpl(_controlJChannel.getLocalAddress());
229
230 try {
231 initLocalClusterNode();
232 }
233 catch (SystemException se) {
234 _log.error("Unable to determine local network address", se);
235 }
236
237 memberJoined(_localAddress, _localClusterNode);
238
239 sendNotifyRequest();
240
241 ClusterRequestReceiver clusterRequestReceiver =
242 (ClusterRequestReceiver)_controlJChannel.getReceiver();
243
244 clusterRequestReceiver.openLatch();
245 }
246
247 public boolean isClusterNodeAlive(Address address) {
248 if (!isEnabled()) {
249 return false;
250 }
251
252 List<Address> addresses = getAddresses(_controlJChannel);
253
254 return addresses.contains(address);
255 }
256
257 public boolean isClusterNodeAlive(String clusterNodeId) {
258 if (!isEnabled()) {
259 return false;
260 }
261
262 return _clusterNodeAddresses.containsKey(clusterNodeId);
263 }
264
265 @Override
266 public boolean isEnabled() {
267 return PropsValues.CLUSTER_LINK_ENABLED;
268 }
269
270 public void portalPortConfigured(int port) {
271 if (!isEnabled() ||
272 (_localClusterNode.getPort() ==
273 PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
274
275 return;
276 }
277
278 try {
279 _localClusterNode.setPort(port);
280
281 memberJoined(_localAddress, _localClusterNode);
282
283 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
284 ClusterMessageType.UPDATE, _localClusterNode);
285
286 _controlJChannel.send(null, null, clusterRequest);
287 }
288 catch (Exception e) {
289 _log.error("Unable to determine configure node port", e);
290 }
291 }
292
293 public void removeClusterEventListener(
294 ClusterEventListener clusterEventListener) {
295
296 if (!isEnabled()) {
297 return;
298 }
299
300 _clusterEventListeners.remove(clusterEventListener);
301 }
302
303 public void setClusterEventListeners(
304 List<ClusterEventListener> clusterEventListeners) {
305
306 if (!isEnabled()) {
307 return;
308 }
309
310 _clusterEventListeners.addAllAbsent(clusterEventListeners);
311 }
312
313 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
314 if (!isEnabled()) {
315 return;
316 }
317
318 _shortcutLocalMethod = shortcutLocalMethod;
319 }
320
321 protected void fireClusterEvent(ClusterEvent clusterEvent) {
322 for (ClusterEventListener listener : _clusterEventListeners) {
323 listener.processClusterEvent(clusterEvent);
324 }
325 }
326
327 protected JChannel getControlChannel() {
328 return _controlJChannel;
329 }
330
331 protected FutureClusterResponses getExecutionResults(String uuid) {
332 return _futureClusterResponses.get(uuid);
333 }
334
335 @Override
336 protected void initChannels() {
337 Properties controlProperties = PropsUtil.getProperties(
338 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
339
340 String controlProperty = controlProperties.getProperty(
341 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
342
343 ClusterRequestReceiver clusterRequestReceiver =
344 new ClusterRequestReceiver(this);
345
346 try {
347 _controlJChannel = createJChannel(
348 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
349 }
350 catch (ChannelException ce) {
351 _log.error(ce, ce);
352 }
353 catch (Exception e) {
354 _log.error(e, e);
355 }
356 }
357
358 protected void initLocalClusterNode() throws SystemException {
359 _localClusterNode = new ClusterNode(PortalUUIDUtil.generate());
360
361 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
362 _localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
363 }
364 else {
365 _localClusterNode.setPort(PortalUtil.getPortalPort(false));
366 }
367
368 try {
369 InetAddress inetAddress = bindInetAddress;
370
371 if (inetAddress == null) {
372 inetAddress = InetAddressUtil.getLocalInetAddress();
373 }
374
375 _localClusterNode.setInetAddress(inetAddress);
376
377 _localClusterNode.setHostName(inetAddress.getHostName());
378 }
379 catch (Exception e) {
380 throw new SystemException(
381 "Unable to determine local network address", e);
382 }
383 }
384
385 protected boolean isShortcutLocalMethod() {
386 return _shortcutLocalMethod;
387 }
388
389 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
390 _liveInstances.put(joinAddress, clusterNode);
391
392 Address previousAddress = _clusterNodeAddresses.put(
393 clusterNode.getClusterNodeId(), joinAddress);
394
395 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
396 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
397
398 fireClusterEvent(clusterEvent);
399 }
400 }
401
402 protected void memberRemoved(List<Address> departAddresses) {
403 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
404
405 for (Address departAddress : departAddresses) {
406 ClusterNode departClusterNode = _liveInstances.remove(
407 departAddress);
408
409 if (departClusterNode == null) {
410 continue;
411 }
412
413 departClusterNodes.add(departClusterNode);
414
415 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
416 }
417
418 if (departClusterNodes.isEmpty()) {
419 return;
420 }
421
422 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
423
424 fireClusterEvent(clusterEvent);
425 }
426
427 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
428 boolean isMulticast = clusterRequest.isMulticast();
429
430 List<Address> addresses = null;
431
432 if (isMulticast) {
433 addresses = getAddresses(_controlJChannel);
434 }
435 else {
436 addresses = new ArrayList<Address>();
437
438 Collection<Address> clusterNodeAddresses =
439 clusterRequest.getTargetClusterNodeAddresses();
440
441 if (clusterNodeAddresses != null) {
442 addresses.addAll(clusterNodeAddresses);
443 }
444
445 Collection<String> clusterNodeIds =
446 clusterRequest.getTargetClusterNodeIds();
447
448 if (clusterNodeIds != null) {
449 for (String clusterNodeId : clusterNodeIds) {
450 Address address = _clusterNodeAddresses.get(clusterNodeId);
451
452 addresses.add(address);
453 }
454 }
455 }
456
457 return addresses;
458 }
459
460 protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler) {
461 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
462
463 ClusterNode localClusterNode = getLocalClusterNode();
464
465 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
466 clusterNodeResponse.setClusterNode(localClusterNode);
467 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
468
469 if (methodHandler == null) {
470 clusterNodeResponse.setException(
471 new ClusterException(
472 "Payload is not of type " + MethodHandler.class.getName()));
473
474 return clusterNodeResponse;
475 }
476
477 try {
478 Object returnValue = methodHandler.invoke(true);
479
480 if (returnValue instanceof Serializable) {
481 clusterNodeResponse.setResult(returnValue);
482 }
483 else if (returnValue != null) {
484 clusterNodeResponse.setException(
485 new ClusterException("Return value is not serializable"));
486 }
487 }
488 catch (Exception e) {
489 clusterNodeResponse.setException(e);
490 }
491
492 return clusterNodeResponse;
493 }
494
495 protected void sendMulticastRequest(ClusterRequest clusterRequest)
496 throws SystemException {
497
498 try {
499 _controlJChannel.send(null, null, clusterRequest);
500 }
501 catch (ChannelException ce) {
502 _log.error(
503 "Unable to send multicast message " + clusterRequest, ce);
504
505 throw new SystemException("Unable to send multicast request", ce);
506 }
507 }
508
509 protected void sendNotifyRequest() {
510 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
511 ClusterMessageType.NOTIFY, _localClusterNode);
512
513 try {
514 _controlJChannel.send(null, null, clusterRequest);
515 }
516 catch (ChannelException ce) {
517 _log.error("Unable to send multicast message", ce);
518 }
519 }
520
521 protected void sendUnicastRequest(
522 ClusterRequest clusterRequest, List<Address> addresses)
523 throws SystemException {
524
525 for (Address address : addresses) {
526 org.jgroups.Address jGroupsAddress =
527 (org.jgroups.Address)address.getRealAddress();
528
529 try {
530 _controlJChannel.send(jGroupsAddress, null, clusterRequest);
531 }
532 catch (ChannelException ce) {
533 _log.error(
534 "Unable to send unicast message " + clusterRequest, ce);
535
536 throw new SystemException("Unable to send unicast request", ce);
537 }
538 }
539 }
540
541 private static final String _DEFAULT_CLUSTER_NAME =
542 "LIFERAY-CONTROL-CHANNEL";
543
544 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
545
546 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
547 new CopyOnWriteArrayList<ClusterEventListener>();
548 private Map<String, Address> _clusterNodeAddresses =
549 new ConcurrentHashMap<String, Address>();
550 private JChannel _controlJChannel;
551 private ExecutorService _executorService;
552 private Map<String, FutureClusterResponses> _futureClusterResponses =
553 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
554 private Map<Address, ClusterNode> _liveInstances =
555 new ConcurrentHashMap<Address, ClusterNode>();
556 private Address _localAddress;
557 private ClusterNode _localClusterNode;
558 private boolean _shortcutLocalMethod;
559
560 private class ClusterResponseCallbackJob implements Runnable {
561
562 public ClusterResponseCallbackJob(
563 ClusterResponseCallback clusterResponseCallback,
564 FutureClusterResponses futureClusterResponses) {
565
566 _clusterResponseCallback = clusterResponseCallback;
567 _futureClusterResponses = futureClusterResponses;
568 _timeout = -1;
569 _timeoutGet = false;
570 _timeUnit = TimeUnit.SECONDS;
571 }
572
573 public ClusterResponseCallbackJob(
574 ClusterResponseCallback clusterResponseCallback,
575 FutureClusterResponses futureClusterResponses, long timeout,
576 TimeUnit timeUnit) {
577
578 _clusterResponseCallback = clusterResponseCallback;
579 _futureClusterResponses = futureClusterResponses;
580 _timeout = timeout;
581 _timeoutGet = true;
582 _timeUnit = timeUnit;
583 }
584
585 public void run() {
586 BlockingQueue<ClusterNodeResponse> blockingQueue =
587 _futureClusterResponses.getPartialResults();
588
589 _clusterResponseCallback.callback(blockingQueue);
590
591 ClusterNodeResponses clusterNodeResponses = null;
592
593 try {
594 if (_timeoutGet) {
595 clusterNodeResponses = _futureClusterResponses.get(
596 _timeout, _timeUnit);
597 }
598 else {
599 clusterNodeResponses = _futureClusterResponses.get();
600 }
601
602 _clusterResponseCallback.callback(clusterNodeResponses);
603 }
604 catch (InterruptedException ie) {
605 _clusterResponseCallback.processInterruptedException(ie);
606 }
607 catch (TimeoutException te) {
608 _clusterResponseCallback.processTimeoutException(te);
609 }
610 }
611
612 private final ClusterResponseCallback _clusterResponseCallback;
613 private final FutureClusterResponses _futureClusterResponses;
614 private final long _timeout;
615 private final boolean _timeoutGet;
616 private final TimeUnit _timeUnit;
617
618 }
619
620 }