001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.InetAddressUtil;
035 import com.liferay.portal.kernel.util.MethodHandler;
036 import com.liferay.portal.kernel.util.PropsKeys;
037 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039 import com.liferay.portal.util.PortalPortEventListener;
040 import com.liferay.portal.util.PortalUtil;
041 import com.liferay.portal.util.PropsUtil;
042 import com.liferay.portal.util.PropsValues;
043
044 import java.io.Serializable;
045
046 import java.net.InetAddress;
047
048 import java.util.ArrayList;
049 import java.util.Collection;
050 import java.util.Collections;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.Properties;
054 import java.util.concurrent.BlockingQueue;
055 import java.util.concurrent.ConcurrentHashMap;
056 import java.util.concurrent.CopyOnWriteArrayList;
057 import java.util.concurrent.ExecutorService;
058 import java.util.concurrent.TimeUnit;
059 import java.util.concurrent.TimeoutException;
060
061 import org.jgroups.JChannel;
062
063
067 @DoPrivileged
068 public class ClusterExecutorImpl
069 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
070
071 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
072 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
073
074 public void addClusterEventListener(
075 ClusterEventListener clusterEventListener) {
076
077 if (!isEnabled()) {
078 return;
079 }
080
081 _clusterEventListeners.addIfAbsent(clusterEventListener);
082 }
083
084 @Override
085 public void afterPropertiesSet() {
086 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
087 addClusterEventListener(new DebuggingClusterEventListenerImpl());
088 }
089
090 if (PropsValues.LIVE_USERS_ENABLED) {
091 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
092 }
093
094 super.afterPropertiesSet();
095 }
096
097 @Override
098 public void destroy() {
099 if (!isEnabled()) {
100 return;
101 }
102
103 PortalExecutorManagerUtil.shutdown(
104 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
105
106 _controlJChannel.setReceiver(null);
107
108 _controlJChannel.close();
109
110 _clusterEventListeners.clear();
111 _clusterNodeAddresses.clear();
112 _futureClusterResponses.clear();
113 _liveInstances.clear();
114 _localAddress = null;
115 _localClusterNode = null;
116 }
117
118 public FutureClusterResponses execute(ClusterRequest clusterRequest)
119 throws SystemException {
120
121 if (!isEnabled()) {
122 return null;
123 }
124
125 List<Address> addresses = prepareAddresses(clusterRequest);
126
127 FutureClusterResponses futureClusterResponses =
128 new FutureClusterResponses(addresses);
129
130 if (!clusterRequest.isFireAndForget()) {
131 String uuid = clusterRequest.getUuid();
132
133 _futureClusterResponses.put(uuid, futureClusterResponses);
134 }
135
136 if (_shortcutLocalMethod &&
137 addresses.remove(getLocalClusterNodeAddress())) {
138
139 runLocalMethod(clusterRequest, futureClusterResponses);
140 }
141
142 if (clusterRequest.isMulticast()) {
143 try {
144 _controlJChannel.send(null, clusterRequest);
145 }
146 catch (Exception e) {
147 throw new SystemException(
148 "Unable to send multicast request", e);
149 }
150 }
151 else {
152 for (Address address : addresses) {
153 org.jgroups.Address jGroupsAddress =
154 (org.jgroups.Address)address.getRealAddress();
155
156 try {
157 _controlJChannel.send(jGroupsAddress, clusterRequest);
158 }
159 catch (Exception e) {
160 throw new SystemException(
161 "Unable to send unicast request", e);
162 }
163 }
164 }
165
166 return futureClusterResponses;
167 }
168
169 public void execute(
170 ClusterRequest clusterRequest,
171 ClusterResponseCallback clusterResponseCallback)
172 throws SystemException {
173
174 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
175
176 ClusterResponseCallbackJob clusterResponseCallbackJob =
177 new ClusterResponseCallbackJob(
178 clusterResponseCallback, futureClusterResponses);
179
180 _executorService.execute(clusterResponseCallbackJob);
181 }
182
183 public void execute(
184 ClusterRequest clusterRequest,
185 ClusterResponseCallback clusterResponseCallback, long timeout,
186 TimeUnit timeUnit)
187 throws SystemException {
188
189 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
190
191 ClusterResponseCallbackJob clusterResponseCallbackJob =
192 new ClusterResponseCallbackJob(
193 clusterResponseCallback, futureClusterResponses, timeout,
194 timeUnit);
195
196 _executorService.execute(clusterResponseCallbackJob);
197 }
198
199 public List<ClusterEventListener> getClusterEventListeners() {
200 if (!isEnabled()) {
201 return Collections.emptyList();
202 }
203
204 return Collections.unmodifiableList(_clusterEventListeners);
205 }
206
207 public List<Address> getClusterNodeAddresses() {
208 if (!isEnabled()) {
209 return Collections.emptyList();
210 }
211
212 return getAddresses(_controlJChannel);
213 }
214
215 public List<ClusterNode> getClusterNodes() {
216 if (!isEnabled()) {
217 return Collections.emptyList();
218 }
219
220 return new ArrayList<ClusterNode>(_liveInstances.values());
221 }
222
223 public ClusterNode getLocalClusterNode() {
224 if (!isEnabled()) {
225 return null;
226 }
227
228 return _localClusterNode;
229 }
230
231 public Address getLocalClusterNodeAddress() {
232 if (!isEnabled()) {
233 return null;
234 }
235
236 return _localAddress;
237 }
238
239 public void initialize() {
240 if (!isEnabled()) {
241 return;
242 }
243
244 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
245 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
246
247 PortalUtil.addPortalPortEventListener(this);
248
249 _localAddress = new AddressImpl(_controlJChannel.getAddress());
250
251 try {
252 initLocalClusterNode();
253
254 memberJoined(_localAddress, _localClusterNode);
255
256 sendNotifyRequest();
257 }
258 catch (Exception e) {
259 _log.error("Unable to determine local network address", e);
260 }
261
262 ClusterRequestReceiver clusterRequestReceiver =
263 (ClusterRequestReceiver)_controlJChannel.getReceiver();
264
265 clusterRequestReceiver.openLatch();
266 }
267
268 public boolean isClusterNodeAlive(Address address) {
269 if (!isEnabled()) {
270 return false;
271 }
272
273 List<Address> addresses = getAddresses(_controlJChannel);
274
275 return addresses.contains(address);
276 }
277
278 public boolean isClusterNodeAlive(String clusterNodeId) {
279 if (!isEnabled()) {
280 return false;
281 }
282
283 return _clusterNodeAddresses.containsKey(clusterNodeId);
284 }
285
286 public void portalPortConfigured(int port) {
287 if (!isEnabled() ||
288 (_localClusterNode.getPort() ==
289 PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
290
291 return;
292 }
293
294 try {
295 _localClusterNode.setPort(port);
296
297 memberJoined(_localAddress, _localClusterNode);
298
299 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
300 ClusterMessageType.UPDATE, _localClusterNode);
301
302 _controlJChannel.send(null, clusterRequest);
303 }
304 catch (Exception e) {
305 _log.error("Unable to determine configure node port", e);
306 }
307 }
308
309 public void removeClusterEventListener(
310 ClusterEventListener clusterEventListener) {
311
312 if (!isEnabled()) {
313 return;
314 }
315
316 _clusterEventListeners.remove(clusterEventListener);
317 }
318
319 public void setClusterEventListeners(
320 List<ClusterEventListener> clusterEventListeners) {
321
322 if (!isEnabled()) {
323 return;
324 }
325
326 _clusterEventListeners.addAllAbsent(clusterEventListeners);
327 }
328
329 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
330 if (!isEnabled()) {
331 return;
332 }
333
334 _shortcutLocalMethod = shortcutLocalMethod;
335 }
336
337 protected void fireClusterEvent(ClusterEvent clusterEvent) {
338 for (ClusterEventListener listener : _clusterEventListeners) {
339 listener.processClusterEvent(clusterEvent);
340 }
341 }
342
343 protected ClusterNodeResponse generateClusterNodeResponse(
344 ClusterRequest clusterRequest, Object returnValue,
345 Exception exception) {
346
347 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
348
349 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
350 clusterNodeResponse.setClusterNode(getLocalClusterNode());
351 clusterNodeResponse.setClusterMessageType(
352 clusterRequest.getClusterMessageType());
353 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
354 clusterNodeResponse.setUuid(clusterRequest.getUuid());
355
356 if (exception != null) {
357 clusterNodeResponse.setException(exception);
358 }
359 else {
360 if (returnValue instanceof Serializable) {
361 clusterNodeResponse.setResult(returnValue);
362 }
363 else if (returnValue != null) {
364 clusterNodeResponse.setException(
365 new ClusterException("Return value is not serializable"));
366 }
367 }
368
369 return clusterNodeResponse;
370 }
371
372 protected JChannel getControlChannel() {
373 return _controlJChannel;
374 }
375
376 protected FutureClusterResponses getExecutionResults(String uuid) {
377 return _futureClusterResponses.get(uuid);
378 }
379
380 @Override
381 protected void initChannels() throws Exception {
382 Properties controlProperties = PropsUtil.getProperties(
383 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
384
385 String controlProperty = controlProperties.getProperty(
386 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
387
388 ClusterRequestReceiver clusterRequestReceiver =
389 new ClusterRequestReceiver(this);
390
391 _controlJChannel = createJChannel(
392 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
393 }
394
395 protected void initLocalClusterNode() throws Exception {
396 InetAddress inetAddress = bindInetAddress;
397
398 if (inetAddress == null) {
399 inetAddress = InetAddressUtil.getLocalInetAddress();
400 }
401
402 ClusterNode localClusterNode = new ClusterNode(
403 PortalUUIDUtil.generate(), inetAddress);
404
405 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
406 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
407 }
408 else {
409 localClusterNode.setPort(PortalUtil.getPortalPort(false));
410 }
411
412 _localClusterNode = localClusterNode;
413 }
414
415 protected boolean isShortcutLocalMethod() {
416 return _shortcutLocalMethod;
417 }
418
419 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
420 _liveInstances.put(joinAddress, clusterNode);
421
422 Address previousAddress = _clusterNodeAddresses.put(
423 clusterNode.getClusterNodeId(), joinAddress);
424
425 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
426 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
427
428
429
430 fireClusterEvent(clusterEvent);
431 }
432 }
433
434 protected void memberRemoved(List<Address> departAddresses) {
435 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
436
437 for (Address departAddress : departAddresses) {
438 ClusterNode departClusterNode = _liveInstances.remove(
439 departAddress);
440
441 if (departClusterNode == null) {
442 continue;
443 }
444
445 departClusterNodes.add(departClusterNode);
446
447 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
448 }
449
450 if (departClusterNodes.isEmpty()) {
451 return;
452 }
453
454 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
455
456 fireClusterEvent(clusterEvent);
457 }
458
459 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
460 boolean isMulticast = clusterRequest.isMulticast();
461
462 List<Address> addresses = null;
463
464 if (isMulticast) {
465 addresses = getAddresses(_controlJChannel);
466 }
467 else {
468 addresses = new ArrayList<Address>();
469
470 Collection<Address> clusterNodeAddresses =
471 clusterRequest.getTargetClusterNodeAddresses();
472
473 if (clusterNodeAddresses != null) {
474 addresses.addAll(clusterNodeAddresses);
475 }
476
477 Collection<String> clusterNodeIds =
478 clusterRequest.getTargetClusterNodeIds();
479
480 if (clusterNodeIds != null) {
481 for (String clusterNodeId : clusterNodeIds) {
482 Address address = _clusterNodeAddresses.get(clusterNodeId);
483
484 addresses.add(address);
485 }
486 }
487 }
488
489 if (clusterRequest.isSkipLocal()) {
490 addresses.remove(getLocalClusterNodeAddress());
491 }
492
493 return addresses;
494 }
495
496 protected void runLocalMethod(
497 ClusterRequest clusterRequest,
498 FutureClusterResponses futureClusterResponses) {
499
500 MethodHandler methodHandler = clusterRequest.getMethodHandler();
501
502 Object returnValue = null;
503 Exception exception = null;
504
505 if (methodHandler == null) {
506 exception = new ClusterException(
507 "Payload is not of type " + MethodHandler.class.getName());
508 }
509 else {
510 try {
511 returnValue = methodHandler.invoke(true);
512 }
513 catch (Exception e) {
514 exception = e;
515 }
516 }
517
518 if (!clusterRequest.isFireAndForget()) {
519 ClusterNodeResponse clusterNodeResponse =
520 generateClusterNodeResponse(
521 clusterRequest, returnValue, exception);
522
523 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
524 }
525 }
526
527 protected void sendNotifyRequest() {
528 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
529 ClusterMessageType.NOTIFY, _localClusterNode);
530
531 try {
532 _controlJChannel.send(null, clusterRequest);
533 }
534 catch (Exception e) {
535 _log.error("Unable to send notify message", e);
536 }
537 }
538
539 private static final String _DEFAULT_CLUSTER_NAME =
540 "LIFERAY-CONTROL-CHANNEL";
541
542 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
543
544 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
545 new CopyOnWriteArrayList<ClusterEventListener>();
546 private Map<String, Address> _clusterNodeAddresses =
547 new ConcurrentHashMap<String, Address>();
548 private JChannel _controlJChannel;
549 private ExecutorService _executorService;
550 private Map<String, FutureClusterResponses> _futureClusterResponses =
551 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
552 private Map<Address, ClusterNode> _liveInstances =
553 new ConcurrentHashMap<Address, ClusterNode>();
554 private Address _localAddress;
555 private ClusterNode _localClusterNode;
556 private boolean _shortcutLocalMethod;
557
558 private class ClusterResponseCallbackJob implements Runnable {
559
560 public ClusterResponseCallbackJob(
561 ClusterResponseCallback clusterResponseCallback,
562 FutureClusterResponses futureClusterResponses) {
563
564 _clusterResponseCallback = clusterResponseCallback;
565 _futureClusterResponses = futureClusterResponses;
566 _timeout = -1;
567 _timeoutGet = false;
568 _timeUnit = TimeUnit.SECONDS;
569 }
570
571 public ClusterResponseCallbackJob(
572 ClusterResponseCallback clusterResponseCallback,
573 FutureClusterResponses futureClusterResponses, long timeout,
574 TimeUnit timeUnit) {
575
576 _clusterResponseCallback = clusterResponseCallback;
577 _futureClusterResponses = futureClusterResponses;
578 _timeout = timeout;
579 _timeoutGet = true;
580 _timeUnit = timeUnit;
581 }
582
583 public void run() {
584 BlockingQueue<ClusterNodeResponse> blockingQueue =
585 _futureClusterResponses.getPartialResults();
586
587 _clusterResponseCallback.callback(blockingQueue);
588
589 ClusterNodeResponses clusterNodeResponses = null;
590
591 try {
592 if (_timeoutGet) {
593 clusterNodeResponses = _futureClusterResponses.get(
594 _timeout, _timeUnit);
595 }
596 else {
597 clusterNodeResponses = _futureClusterResponses.get();
598 }
599
600 _clusterResponseCallback.callback(clusterNodeResponses);
601 }
602 catch (InterruptedException ie) {
603 _clusterResponseCallback.processInterruptedException(ie);
604 }
605 catch (TimeoutException te) {
606 _clusterResponseCallback.processTimeoutException(te);
607 }
608 }
609
610 private final ClusterResponseCallback _clusterResponseCallback;
611 private final FutureClusterResponses _futureClusterResponses;
612 private final long _timeout;
613 private final boolean _timeoutGet;
614 private final TimeUnit _timeUnit;
615
616 }
617
618 }