001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.util.InetAddressUtil;
034 import com.liferay.portal.kernel.util.MethodHandler;
035 import com.liferay.portal.kernel.util.PropsKeys;
036 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
037 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038 import com.liferay.portal.util.PortalPortEventListener;
039 import com.liferay.portal.util.PortalUtil;
040 import com.liferay.portal.util.PropsUtil;
041 import com.liferay.portal.util.PropsValues;
042
043 import java.io.Serializable;
044
045 import java.net.InetAddress;
046
047 import java.util.ArrayList;
048 import java.util.Collection;
049 import java.util.Collections;
050 import java.util.List;
051 import java.util.Map;
052 import java.util.Properties;
053 import java.util.concurrent.BlockingQueue;
054 import java.util.concurrent.ConcurrentHashMap;
055 import java.util.concurrent.CopyOnWriteArrayList;
056 import java.util.concurrent.ExecutorService;
057 import java.util.concurrent.TimeUnit;
058 import java.util.concurrent.TimeoutException;
059
060 import org.jgroups.JChannel;
061
062
066 public class ClusterExecutorImpl
067 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
068
/**
 * Name of the portal executor used by this class: {@link #initialize()}
 * looks the executor up under this name and {@link #destroy()} shuts it down
 * again. The executor runs the jobs that deliver responses to a
 * <code>ClusterResponseCallback</code>.
 */
public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
	"CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
071
072 public void addClusterEventListener(
073 ClusterEventListener clusterEventListener) {
074
075 if (!isEnabled()) {
076 return;
077 }
078
079 _clusterEventListeners.addIfAbsent(clusterEventListener);
080 }
081
082 @Override
083 public void afterPropertiesSet() {
084 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
085 addClusterEventListener(new DebuggingClusterEventListenerImpl());
086 }
087
088 if (PropsValues.LIVE_USERS_ENABLED) {
089 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
090 }
091
092 super.afterPropertiesSet();
093 }
094
095 @Override
096 public void destroy() {
097 if (!isEnabled()) {
098 return;
099 }
100
101 PortalExecutorManagerUtil.shutdown(
102 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
103
104 _controlJChannel.setReceiver(null);
105
106 _controlJChannel.close();
107
108 _clusterEventListeners.clear();
109 _clusterNodeAddresses.clear();
110 _futureClusterResponses.clear();
111 _liveInstances.clear();
112 _localAddress = null;
113 _localClusterNode = null;
114 }
115
116 public FutureClusterResponses execute(ClusterRequest clusterRequest)
117 throws SystemException {
118
119 if (!isEnabled()) {
120 return null;
121 }
122
123 List<Address> addresses = prepareAddresses(clusterRequest);
124
125 FutureClusterResponses futureClusterResponses =
126 new FutureClusterResponses(addresses);
127
128 if (!clusterRequest.isFireAndForget()) {
129 String uuid = clusterRequest.getUuid();
130
131 _futureClusterResponses.put(uuid, futureClusterResponses);
132 }
133
134 if (_shortcutLocalMethod &&
135 addresses.remove(getLocalClusterNodeAddress())) {
136
137 runLocalMethod(clusterRequest, futureClusterResponses);
138 }
139
140 if (clusterRequest.isMulticast()) {
141 try {
142 _controlJChannel.send(null, clusterRequest);
143 }
144 catch (Exception e) {
145 throw new SystemException(
146 "Unable to send multicast request", e);
147 }
148 }
149 else {
150 for (Address address : addresses) {
151 org.jgroups.Address jGroupsAddress =
152 (org.jgroups.Address)address.getRealAddress();
153
154 try {
155 _controlJChannel.send(jGroupsAddress, clusterRequest);
156 }
157 catch (Exception e) {
158 throw new SystemException(
159 "Unable to send unicast request", e);
160 }
161 }
162 }
163
164 return futureClusterResponses;
165 }
166
167 public void execute(
168 ClusterRequest clusterRequest,
169 ClusterResponseCallback clusterResponseCallback)
170 throws SystemException {
171
172 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
173
174 ClusterResponseCallbackJob clusterResponseCallbackJob =
175 new ClusterResponseCallbackJob(
176 clusterResponseCallback, futureClusterResponses);
177
178 _executorService.execute(clusterResponseCallbackJob);
179 }
180
181 public void execute(
182 ClusterRequest clusterRequest,
183 ClusterResponseCallback clusterResponseCallback, long timeout,
184 TimeUnit timeUnit)
185 throws SystemException {
186
187 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
188
189 ClusterResponseCallbackJob clusterResponseCallbackJob =
190 new ClusterResponseCallbackJob(
191 clusterResponseCallback, futureClusterResponses, timeout,
192 timeUnit);
193
194 _executorService.execute(clusterResponseCallbackJob);
195 }
196
197 public List<ClusterEventListener> getClusterEventListeners() {
198 if (!isEnabled()) {
199 return Collections.emptyList();
200 }
201
202 return Collections.unmodifiableList(_clusterEventListeners);
203 }
204
205 public List<Address> getClusterNodeAddresses() {
206 if (!isEnabled()) {
207 return Collections.emptyList();
208 }
209
210 return getAddresses(_controlJChannel);
211 }
212
213 public List<ClusterNode> getClusterNodes() {
214 if (!isEnabled()) {
215 return Collections.emptyList();
216 }
217
218 return new ArrayList<ClusterNode>(_liveInstances.values());
219 }
220
221 public ClusterNode getLocalClusterNode() {
222 if (!isEnabled()) {
223 return null;
224 }
225
226 return _localClusterNode;
227 }
228
229 public Address getLocalClusterNodeAddress() {
230 if (!isEnabled()) {
231 return null;
232 }
233
234 return _localAddress;
235 }
236
237 public void initialize() {
238 if (!isEnabled()) {
239 return;
240 }
241
242 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
243 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
244
245 PortalUtil.addPortalPortEventListener(this);
246
247 _localAddress = new AddressImpl(_controlJChannel.getAddress());
248
249 try {
250 initLocalClusterNode();
251
252 memberJoined(_localAddress, _localClusterNode);
253
254 sendNotifyRequest();
255 }
256 catch (Exception e) {
257 _log.error("Unable to determine local network address", e);
258 }
259
260 ClusterRequestReceiver clusterRequestReceiver =
261 (ClusterRequestReceiver)_controlJChannel.getReceiver();
262
263 clusterRequestReceiver.openLatch();
264 }
265
266 public boolean isClusterNodeAlive(Address address) {
267 if (!isEnabled()) {
268 return false;
269 }
270
271 List<Address> addresses = getAddresses(_controlJChannel);
272
273 return addresses.contains(address);
274 }
275
276 public boolean isClusterNodeAlive(String clusterNodeId) {
277 if (!isEnabled()) {
278 return false;
279 }
280
281 return _clusterNodeAddresses.containsKey(clusterNodeId);
282 }
283
284 public void portalPortConfigured(int port) {
285 if (!isEnabled() ||
286 (_localClusterNode.getPort() ==
287 PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
288
289 return;
290 }
291
292 try {
293 _localClusterNode.setPort(port);
294
295 memberJoined(_localAddress, _localClusterNode);
296
297 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
298 ClusterMessageType.UPDATE, _localClusterNode);
299
300 _controlJChannel.send(null, clusterRequest);
301 }
302 catch (Exception e) {
303 _log.error("Unable to determine configure node port", e);
304 }
305 }
306
307 public void removeClusterEventListener(
308 ClusterEventListener clusterEventListener) {
309
310 if (!isEnabled()) {
311 return;
312 }
313
314 _clusterEventListeners.remove(clusterEventListener);
315 }
316
317 public void setClusterEventListeners(
318 List<ClusterEventListener> clusterEventListeners) {
319
320 if (!isEnabled()) {
321 return;
322 }
323
324 _clusterEventListeners.addAllAbsent(clusterEventListeners);
325 }
326
327 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
328 if (!isEnabled()) {
329 return;
330 }
331
332 _shortcutLocalMethod = shortcutLocalMethod;
333 }
334
335 protected void fireClusterEvent(ClusterEvent clusterEvent) {
336 for (ClusterEventListener listener : _clusterEventListeners) {
337 listener.processClusterEvent(clusterEvent);
338 }
339 }
340
341 protected ClusterNodeResponse generateClusterNodeResponse(
342 ClusterRequest clusterRequest, Object returnValue,
343 Exception exception) {
344
345 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
346
347 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
348 clusterNodeResponse.setClusterNode(getLocalClusterNode());
349 clusterNodeResponse.setClusterMessageType(
350 clusterRequest.getClusterMessageType());
351 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
352 clusterNodeResponse.setUuid(clusterRequest.getUuid());
353
354 if (exception != null) {
355 clusterNodeResponse.setException(exception);
356 }
357 else {
358 if (returnValue instanceof Serializable) {
359 clusterNodeResponse.setResult(returnValue);
360 }
361 else if (returnValue != null) {
362 clusterNodeResponse.setException(
363 new ClusterException("Return value is not serializable"));
364 }
365 }
366
367 return clusterNodeResponse;
368 }
369
/**
 * Returns the JGroups channel created by {@link #initChannels()}.
 *
 * @return the control channel; <code>null</code> if it has not been created
 */
protected JChannel getControlChannel() {
	return _controlJChannel;
}
373
374 protected FutureClusterResponses getExecutionResults(String uuid) {
375 return _futureClusterResponses.get(uuid);
376 }
377
378 @Override
379 protected void initChannels() throws Exception {
380 Properties controlProperties = PropsUtil.getProperties(
381 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
382
383 String controlProperty = controlProperties.getProperty(
384 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
385
386 ClusterRequestReceiver clusterRequestReceiver =
387 new ClusterRequestReceiver(this);
388
389 _controlJChannel = createJChannel(
390 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
391 }
392
393 protected void initLocalClusterNode() throws Exception {
394 InetAddress inetAddress = bindInetAddress;
395
396 if (inetAddress == null) {
397 inetAddress = InetAddressUtil.getLocalInetAddress();
398 }
399
400 ClusterNode localClusterNode = new ClusterNode(
401 PortalUUIDUtil.generate(), inetAddress);
402
403 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
404 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
405 }
406 else {
407 localClusterNode.setPort(PortalUtil.getPortalPort(false));
408 }
409
410 _localClusterNode = localClusterNode;
411 }
412
/**
 * Returns the flag that {@link #execute(ClusterRequest)} checks before
 * running a request for the local node in-process.
 *
 * @return <code>true</code> if local execution is shortcut
 */
protected boolean isShortcutLocalMethod() {
	return _shortcutLocalMethod;
}
416
417 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
418 _liveInstances.put(joinAddress, clusterNode);
419
420 Address previousAddress = _clusterNodeAddresses.put(
421 clusterNode.getClusterNodeId(), joinAddress);
422
423 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
424 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
425
426
427
428 fireClusterEvent(clusterEvent);
429 }
430 }
431
432 protected void memberRemoved(List<Address> departAddresses) {
433 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
434
435 for (Address departAddress : departAddresses) {
436 ClusterNode departClusterNode = _liveInstances.remove(
437 departAddress);
438
439 if (departClusterNode == null) {
440 continue;
441 }
442
443 departClusterNodes.add(departClusterNode);
444
445 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
446 }
447
448 if (departClusterNodes.isEmpty()) {
449 return;
450 }
451
452 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
453
454 fireClusterEvent(clusterEvent);
455 }
456
457 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
458 boolean isMulticast = clusterRequest.isMulticast();
459
460 List<Address> addresses = null;
461
462 if (isMulticast) {
463 addresses = getAddresses(_controlJChannel);
464 }
465 else {
466 addresses = new ArrayList<Address>();
467
468 Collection<Address> clusterNodeAddresses =
469 clusterRequest.getTargetClusterNodeAddresses();
470
471 if (clusterNodeAddresses != null) {
472 addresses.addAll(clusterNodeAddresses);
473 }
474
475 Collection<String> clusterNodeIds =
476 clusterRequest.getTargetClusterNodeIds();
477
478 if (clusterNodeIds != null) {
479 for (String clusterNodeId : clusterNodeIds) {
480 Address address = _clusterNodeAddresses.get(clusterNodeId);
481
482 addresses.add(address);
483 }
484 }
485 }
486
487 if (clusterRequest.isSkipLocal()) {
488 addresses.remove(getLocalClusterNodeAddress());
489 }
490
491 return addresses;
492 }
493
494 protected void runLocalMethod(
495 ClusterRequest clusterRequest,
496 FutureClusterResponses futureClusterResponses) {
497
498 MethodHandler methodHandler = clusterRequest.getMethodHandler();
499
500 Object returnValue = null;
501 Exception exception = null;
502
503 if (methodHandler == null) {
504 exception = new ClusterException(
505 "Payload is not of type " + MethodHandler.class.getName());
506 }
507 else {
508 try {
509 returnValue = methodHandler.invoke(true);
510 }
511 catch (Exception e) {
512 exception = e;
513 }
514 }
515
516 if (!clusterRequest.isFireAndForget()) {
517 ClusterNodeResponse clusterNodeResponse =
518 generateClusterNodeResponse(
519 clusterRequest, returnValue, exception);
520
521 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
522 }
523 }
524
525 protected void sendNotifyRequest() {
526 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
527 ClusterMessageType.NOTIFY, _localClusterNode);
528
529 try {
530 _controlJChannel.send(null, clusterRequest);
531 }
532 catch (Exception e) {
533 _log.error("Unable to send notify message", e);
534 }
535 }
536
537 private static final String _DEFAULT_CLUSTER_NAME =
538 "LIFERAY-CONTROL-CHANNEL";
539
540 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
541
542 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
543 new CopyOnWriteArrayList<ClusterEventListener>();
544 private Map<String, Address> _clusterNodeAddresses =
545 new ConcurrentHashMap<String, Address>();
546 private JChannel _controlJChannel;
547 private ExecutorService _executorService;
548 private Map<String, FutureClusterResponses> _futureClusterResponses =
549 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
550 private Map<Address, ClusterNode> _liveInstances =
551 new ConcurrentHashMap<Address, ClusterNode>();
552 private Address _localAddress;
553 private ClusterNode _localClusterNode;
554 private boolean _shortcutLocalMethod;
555
556 private class ClusterResponseCallbackJob implements Runnable {
557
558 public ClusterResponseCallbackJob(
559 ClusterResponseCallback clusterResponseCallback,
560 FutureClusterResponses futureClusterResponses) {
561
562 _clusterResponseCallback = clusterResponseCallback;
563 _futureClusterResponses = futureClusterResponses;
564 _timeout = -1;
565 _timeoutGet = false;
566 _timeUnit = TimeUnit.SECONDS;
567 }
568
569 public ClusterResponseCallbackJob(
570 ClusterResponseCallback clusterResponseCallback,
571 FutureClusterResponses futureClusterResponses, long timeout,
572 TimeUnit timeUnit) {
573
574 _clusterResponseCallback = clusterResponseCallback;
575 _futureClusterResponses = futureClusterResponses;
576 _timeout = timeout;
577 _timeoutGet = true;
578 _timeUnit = timeUnit;
579 }
580
581 public void run() {
582 BlockingQueue<ClusterNodeResponse> blockingQueue =
583 _futureClusterResponses.getPartialResults();
584
585 _clusterResponseCallback.callback(blockingQueue);
586
587 ClusterNodeResponses clusterNodeResponses = null;
588
589 try {
590 if (_timeoutGet) {
591 clusterNodeResponses = _futureClusterResponses.get(
592 _timeout, _timeUnit);
593 }
594 else {
595 clusterNodeResponses = _futureClusterResponses.get();
596 }
597
598 _clusterResponseCallback.callback(clusterNodeResponses);
599 }
600 catch (InterruptedException ie) {
601 _clusterResponseCallback.processInterruptedException(ie);
602 }
603 catch (TimeoutException te) {
604 _clusterResponseCallback.processTimeoutException(te);
605 }
606 }
607
608 private final ClusterResponseCallback _clusterResponseCallback;
609 private final FutureClusterResponses _futureClusterResponses;
610 private final long _timeout;
611 private final boolean _timeoutGet;
612 private final TimeUnit _timeUnit;
613
614 }
615
616 }