001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030 import com.liferay.portal.kernel.util.InetAddressUtil;
031 import com.liferay.portal.kernel.util.MethodHandler;
032 import com.liferay.portal.kernel.util.NamedThreadFactory;
033 import com.liferay.portal.kernel.util.ObjectValuePair;
034 import com.liferay.portal.kernel.util.PropsKeys;
035 import com.liferay.portal.kernel.util.PropsUtil;
036 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
037 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038 import com.liferay.portal.util.PortalPortEventListener;
039 import com.liferay.portal.util.PortalUtil;
040 import com.liferay.portal.util.PropsValues;
041
042 import java.io.Serializable;
043
044 import java.net.InetAddress;
045
046 import java.util.ArrayList;
047 import java.util.Collection;
048 import java.util.Collections;
049 import java.util.Iterator;
050 import java.util.List;
051 import java.util.Map;
052 import java.util.Properties;
053 import java.util.Set;
054 import java.util.concurrent.ConcurrentHashMap;
055 import java.util.concurrent.CopyOnWriteArrayList;
056 import java.util.concurrent.Executors;
057 import java.util.concurrent.ScheduledExecutorService;
058 import java.util.concurrent.TimeUnit;
059
060 import org.jgroups.ChannelException;
061 import org.jgroups.JChannel;
062
063
067 public class ClusterExecutorImpl
068 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
069
070 public void addClusterEventListener(
071 ClusterEventListener clusterEventListener) {
072 if (!isEnabled()) {
073 return;
074 }
075
076 _clusterEventListeners.addIfAbsent(clusterEventListener);
077 }
078
079 @Override
080 public void afterPropertiesSet() {
081 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
082 addClusterEventListener(new DebuggingClusterEventListenerImpl());
083 }
084
085 super.afterPropertiesSet();
086 }
087
088 @Override
089 public void destroy() {
090 if (!isEnabled()) {
091 return;
092 }
093
094 _scheduledExecutorService.shutdownNow();
095
096 _clusterRequestReceiver.destroy();
097
098 _controlChannel.close();
099 }
100
101 public FutureClusterResponses execute(ClusterRequest clusterRequest)
102 throws SystemException {
103
104 if (!isEnabled()) {
105 return null;
106 }
107
108 List<Address> addresses = prepareAddresses(clusterRequest);
109
110 FutureClusterResponses futureClusterResponses =
111 new FutureClusterResponses(addresses);
112
113 if (!clusterRequest.isFireAndForget()) {
114 String uuid = clusterRequest.getUuid();
115
116 _futureClusterResponses.put(uuid, futureClusterResponses);
117 }
118
119 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
120 addresses.remove(getLocalClusterNodeAddress())) {
121
122 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
123 clusterRequest.getMethodHandler());
124
125 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
126 clusterNodeResponse.setUuid(clusterRequest.getUuid());
127
128 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
129 }
130
131 if (clusterRequest.isMulticast()) {
132 sendMulticastRequest(clusterRequest);
133 }
134 else {
135 sendUnicastRequest(clusterRequest, addresses);
136 }
137
138 return futureClusterResponses;
139 }
140
141 public List<ClusterEventListener> getClusterEventListeners() {
142 if (!isEnabled()) {
143 return Collections.emptyList();
144 }
145
146 return Collections.unmodifiableList(_clusterEventListeners);
147 }
148
149 public List<Address> getClusterNodeAddresses() {
150 if (!isEnabled()) {
151 return Collections.emptyList();
152 }
153
154 removeExpiredInstances();
155
156 return new ArrayList<Address>(_clusterNodeAddresses.values());
157 }
158
159 public List<ClusterNode> getClusterNodes() {
160 if (!isEnabled()) {
161 return Collections.emptyList();
162 }
163
164 removeExpiredInstances();
165
166 Set<ObjectValuePair<Address, ClusterNode>> liveInstances =
167 _liveInstances.keySet();
168
169 List<ClusterNode> liveClusterNodes = new ArrayList<ClusterNode>(
170 liveInstances.size());
171
172 for (ObjectValuePair<Address, ClusterNode> liveInstance :
173 liveInstances) {
174
175 liveClusterNodes.add(liveInstance.getValue());
176 }
177
178 return liveClusterNodes;
179 }
180
181 public ClusterNode getLocalClusterNode() {
182 if (!isEnabled()) {
183 return null;
184 }
185
186 return _localClusterNode;
187 }
188
189 public Address getLocalClusterNodeAddress() {
190 if (!isEnabled()) {
191 return null;
192 }
193
194 return _localAddress;
195 }
196
197 public void initialize() {
198 if (!isEnabled()) {
199 return;
200 }
201
202 PortalUtil.addPortalPortEventListener(this);
203
204 _localAddress = new AddressImpl(_controlChannel.getLocalAddress());
205
206 try {
207 initLocalClusterNode();
208 }
209 catch (SystemException se) {
210 _log.error("Unable to determine local network address", se);
211 }
212
213 ObjectValuePair<Address, ClusterNode> localInstance =
214 new ObjectValuePair<Address, ClusterNode>(
215 _localAddress, _localClusterNode);
216
217 _liveInstances.put(localInstance, Long.MAX_VALUE);
218
219 _clusterNodeAddresses.put(
220 _localClusterNode.getClusterNodeId(), _localAddress);
221
222 _clusterRequestReceiver.initialize();
223
224 _scheduledExecutorService = Executors.newScheduledThreadPool(
225 1,
226 new NamedThreadFactory(
227 ClusterExecutorImpl.class.getName(), Thread.NORM_PRIORITY,
228 Thread.currentThread().getContextClassLoader()));
229
230 _scheduledExecutorService.scheduleWithFixedDelay(
231 new HeartbeatTask(), 0,
232 PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
233 TimeUnit.MILLISECONDS);
234 }
235
236 public boolean isClusterNodeAlive(Address address) {
237 if (!isEnabled()) {
238 return false;
239 }
240
241 removeExpiredInstances();
242
243 return _clusterNodeAddresses.containsValue(address);
244 }
245
246 public boolean isClusterNodeAlive(String clusterNodeId) {
247 if (!isEnabled()) {
248 return false;
249 }
250
251 removeExpiredInstances();
252
253 return _clusterNodeAddresses.containsKey(clusterNodeId);
254 }
255
256 @Override
257 public boolean isEnabled() {
258 return PropsValues.CLUSTER_LINK_ENABLED;
259 }
260
261 public void portalPortConfigured(int port) {
262 if (!isEnabled() ||
263 _localClusterNode.getPort() ==
264 PropsValues.PORTAL_INSTANCE_HTTP_PORT) {
265
266 return;
267 }
268
269 _localClusterNode.setPort(port);
270 }
271
272 public void removeClusterEventListener(
273 ClusterEventListener clusterEventListener) {
274
275 if (!isEnabled()) {
276 return;
277 }
278
279 _clusterEventListeners.remove(clusterEventListener);
280 }
281
282 public void setClusterEventListeners(
283 List<ClusterEventListener> clusterEventListeners) {
284
285 if (!isEnabled()) {
286 return;
287 }
288
289 _clusterEventListeners.addAllAbsent(clusterEventListeners);
290 }
291
292 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
293 if (!isEnabled()) {
294 return;
295 }
296
297 _shortcutLocalMethod = shortcutLocalMethod;
298 }
299
300 protected void fireClusterEvent(ClusterEvent clusterEvent) {
301 for (ClusterEventListener listener : _clusterEventListeners) {
302 listener.processClusterEvent(clusterEvent);
303 }
304 }
305
306 protected JChannel getControlChannel() {
307 return _controlChannel;
308 }
309
310 protected FutureClusterResponses getExecutionResults(String uuid) {
311 return _futureClusterResponses.get(uuid);
312 }
313
314 @Override
315 protected void initChannels() {
316 Properties controlProperties = PropsUtil.getProperties(
317 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
318
319 String controlProperty = controlProperties.getProperty(
320 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
321
322 _clusterRequestReceiver = new ClusterRequestReceiver(this);
323
324 try {
325 _controlChannel = createJChannel(
326 controlProperty, _clusterRequestReceiver,
327 _DEFAULT_CLUSTER_NAME);
328 }
329 catch (ChannelException ce) {
330 _log.error(ce, ce);
331 }
332 catch (Exception e) {
333 _log.error(e, e);
334 }
335 }
336
337 protected void initLocalClusterNode() throws SystemException {
338 _localClusterNode = new ClusterNode(PortalUUIDUtil.generate());
339
340 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
341 _localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
342 }
343 else {
344 _localClusterNode.setPort(PortalUtil.getPortalPort(false));
345 }
346
347 try {
348 InetAddress inetAddress = bindInetAddress;
349
350 if (inetAddress == null) {
351 inetAddress = InetAddressUtil.getLocalInetAddress();
352 }
353
354 _localClusterNode.setInetAddress(inetAddress);
355
356 _localClusterNode.setHostName(inetAddress.getHostName());
357 }
358 catch (Exception e) {
359 throw new SystemException(
360 "Unable to determine local network address", e);
361 }
362 }
363
364 protected boolean isShortcutLocalMethod() {
365 return _shortcutLocalMethod;
366 }
367
368 protected void notify(
369 Address address, ClusterNode clusterNode, long expirationTime) {
370
371 removeExpiredInstances();
372
373 if (System.currentTimeMillis() > expirationTime) {
374 return;
375 }
376
377 ObjectValuePair<Address, ClusterNode> liveInstance =
378 new ObjectValuePair<Address, ClusterNode>(address, clusterNode);
379
380 Long oldExpirationTime = _liveInstances.put(
381 liveInstance, expirationTime);
382
383 if ((oldExpirationTime != null) ||
384 ((_localAddress != null) && _localAddress.equals(address))) {
385
386 return;
387 }
388
389 _clusterNodeAddresses.put(clusterNode.getClusterNodeId(), address);
390
391 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
392
393 fireClusterEvent(clusterEvent);
394 }
395
396 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
397 boolean isMulticast = clusterRequest.isMulticast();
398
399 List<Address> addresses = null;
400
401 if (isMulticast) {
402 addresses = getAddresses(_controlChannel);
403 }
404 else {
405 addresses = new ArrayList<Address>();
406
407 Collection<Address> clusterNodeAddresses =
408 clusterRequest.getTargetClusterNodeAddresses();
409
410 if (clusterNodeAddresses != null) {
411 addresses.addAll(clusterNodeAddresses);
412 }
413
414 Collection<String> clusterNodeIds =
415 clusterRequest.getTargetClusterNodeIds();
416
417 if (clusterNodeIds != null) {
418 for (String clusterNodeId : clusterNodeIds) {
419 Address address = _clusterNodeAddresses.get(clusterNodeId);
420
421 addresses.add(address);
422 }
423 }
424 }
425
426 return addresses;
427 }
428
429 protected void removeExpiredInstances() {
430 if (_liveInstances.isEmpty()) {
431 return;
432 }
433
434 Set<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>>
435 liveInstances = _liveInstances.entrySet();
436
437 Iterator<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>> itr =
438 liveInstances.iterator();
439
440 long now = System.currentTimeMillis();
441
442 while (itr.hasNext()) {
443 Map.Entry<ObjectValuePair<Address, ClusterNode>, Long> entry =
444 itr.next();
445
446 long expirationTime = entry.getValue();
447
448 if (now < expirationTime) {
449 continue;
450 }
451
452 ObjectValuePair<Address, ClusterNode> liveInstance = entry.getKey();
453
454 ClusterNode clusterNode = liveInstance.getValue();
455
456 if (_localClusterNode.equals(clusterNode)) {
457 continue;
458 }
459
460 _clusterNodeAddresses.remove(clusterNode.getClusterNodeId());
461
462 itr.remove();
463
464 ClusterEvent clusterEvent = ClusterEvent.depart(clusterNode);
465
466 fireClusterEvent(clusterEvent);
467 }
468 }
469
470 protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler) {
471 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
472
473 ClusterNode localClusterNode = getLocalClusterNode();
474
475 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
476 clusterNodeResponse.setClusterNode(localClusterNode);
477 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
478
479 if (methodHandler == null) {
480 clusterNodeResponse.setException(
481 new ClusterException(
482 "Payload is not of type " + MethodHandler.class.getName()));
483
484 return clusterNodeResponse;
485 }
486
487 try {
488 Object returnValue = methodHandler.invoke(true);
489
490 if (returnValue instanceof Serializable) {
491 clusterNodeResponse.setResult(returnValue);
492 }
493 else if (returnValue != null) {
494 clusterNodeResponse.setException(
495 new ClusterException("Return value is not serializable"));
496 }
497 }
498 catch (Exception e) {
499 clusterNodeResponse.setException(e);
500 }
501
502 return clusterNodeResponse;
503 }
504
505 protected void sendMulticastRequest(ClusterRequest clusterRequest)
506 throws SystemException {
507
508 try {
509 _controlChannel.send(null, null, clusterRequest);
510 }
511 catch (ChannelException ce) {
512 _log.error(
513 "Unable to send multicast message " + clusterRequest, ce);
514
515 throw new SystemException(
516 "Unable to send multicast request", ce);
517 }
518 }
519
520 protected void sendUnicastRequest(
521 ClusterRequest clusterRequest, List<Address> addresses)
522 throws SystemException {
523
524 for (Address address : addresses) {
525 org.jgroups.Address jGroupsAddress =
526 (org.jgroups.Address)address.getRealAddress();
527
528 try {
529 _controlChannel.send(jGroupsAddress, null, clusterRequest);
530 }
531 catch (ChannelException ce) {
532 _log.error(
533 "Unable to send unicast message " + clusterRequest, ce);
534
535 throw new SystemException(
536 "Unable to send unicast request", ce);
537 }
538 }
539 }
540
541 private static final String _DEFAULT_CLUSTER_NAME =
542 "LIFERAY-CONTROL-CHANNEL";
543
544 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
545
546 public ScheduledExecutorService _scheduledExecutorService;
547 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
548 new CopyOnWriteArrayList<ClusterEventListener>();
549 private Map<String, Address> _clusterNodeAddresses =
550 new ConcurrentHashMap<String, Address>();
551 private ClusterRequestReceiver _clusterRequestReceiver;
552 private JChannel _controlChannel;
553 private Map<String, FutureClusterResponses> _futureClusterResponses =
554 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
555 private Map<ObjectValuePair<Address, ClusterNode>, Long> _liveInstances =
556 new ConcurrentHashMap<ObjectValuePair<Address, ClusterNode>, Long>();
557 private Address _localAddress;
558 private ClusterNode _localClusterNode;
559 private boolean _shortcutLocalMethod;
560
561 private class HeartbeatTask implements Runnable {
562
563 public void run() {
564 try {
565 ClusterRequest clusterNotifyRequest =
566 ClusterRequest.createClusterNotifyRequest(
567 _localClusterNode);
568
569 sendMulticastRequest(clusterNotifyRequest);
570 }
571 catch (Exception e) {
572 if (_log.isDebugEnabled()) {
573 _log.debug("Unable to send check in request", e);
574 }
575
576 _scheduledExecutorService.scheduleWithFixedDelay(
577 new HeartbeatTask(), 0,
578 PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
579 TimeUnit.MILLISECONDS);
580 }
581 }
582
583 }
584
585 }