001    /**
002     * Copyright (c) 2000-2011 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.util.InetAddressUtil;
031    import com.liferay.portal.kernel.util.MethodHandler;
032    import com.liferay.portal.kernel.util.NamedThreadFactory;
033    import com.liferay.portal.kernel.util.ObjectValuePair;
034    import com.liferay.portal.kernel.util.PropsKeys;
035    import com.liferay.portal.kernel.util.PropsUtil;
036    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
037    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038    import com.liferay.portal.util.PortalPortEventListener;
039    import com.liferay.portal.util.PortalUtil;
040    import com.liferay.portal.util.PropsValues;
041    
042    import java.io.Serializable;
043    
044    import java.net.InetAddress;
045    
046    import java.util.ArrayList;
047    import java.util.Collection;
048    import java.util.Collections;
049    import java.util.Iterator;
050    import java.util.List;
051    import java.util.Map;
052    import java.util.Properties;
053    import java.util.Set;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    import java.util.concurrent.Executors;
057    import java.util.concurrent.ScheduledExecutorService;
058    import java.util.concurrent.TimeUnit;
059    
060    import org.jgroups.ChannelException;
061    import org.jgroups.JChannel;
062    
063    /**
064     * @author Tina Tian
065     * @author Shuyang Zhou
066     */
067    public class ClusterExecutorImpl
068            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
069    
070            public void addClusterEventListener(
071                    ClusterEventListener clusterEventListener) {
072                    if (!isEnabled()) {
073                            return;
074                    }
075    
076                    _clusterEventListeners.addIfAbsent(clusterEventListener);
077            }
078    
079            @Override
080            public void afterPropertiesSet() {
081                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
082                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
083                    }
084    
085                    super.afterPropertiesSet();
086            }
087    
088            @Override
089            public void destroy() {
090                    if (!isEnabled()) {
091                            return;
092                    }
093    
094                    _scheduledExecutorService.shutdownNow();
095    
096                    _clusterRequestReceiver.destroy();
097    
098                    _controlChannel.close();
099            }
100    
101            public FutureClusterResponses execute(ClusterRequest clusterRequest)
102                    throws SystemException {
103    
104                    if (!isEnabled()) {
105                            return null;
106                    }
107    
108                    List<Address> addresses = prepareAddresses(clusterRequest);
109    
110                    FutureClusterResponses futureClusterResponses =
111                            new FutureClusterResponses(addresses);
112    
113                    if (!clusterRequest.isFireAndForget()) {
114                            String uuid = clusterRequest.getUuid();
115    
116                            _futureClusterResponses.put(uuid, futureClusterResponses);
117                    }
118    
119                    if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
120                            addresses.remove(getLocalClusterNodeAddress())) {
121    
122                            ClusterNodeResponse clusterNodeResponse = runLocalMethod(
123                                    clusterRequest.getMethodHandler());
124    
125                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
126                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
127    
128                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
129                    }
130    
131                    if (clusterRequest.isMulticast()) {
132                            sendMulticastRequest(clusterRequest);
133                    }
134                    else {
135                            sendUnicastRequest(clusterRequest, addresses);
136                    }
137    
138                    return futureClusterResponses;
139            }
140    
141            public List<ClusterEventListener> getClusterEventListeners() {
142                    if (!isEnabled()) {
143                            return Collections.emptyList();
144                    }
145    
146                    return Collections.unmodifiableList(_clusterEventListeners);
147            }
148    
149            public List<Address> getClusterNodeAddresses() {
150                    if (!isEnabled()) {
151                            return Collections.emptyList();
152                    }
153    
154                    removeExpiredInstances();
155    
156                    return new ArrayList<Address>(_clusterNodeAddresses.values());
157            }
158    
159            public List<ClusterNode> getClusterNodes() {
160                    if (!isEnabled()) {
161                            return Collections.emptyList();
162                    }
163    
164                    removeExpiredInstances();
165    
166                    Set<ObjectValuePair<Address, ClusterNode>> liveInstances =
167                            _liveInstances.keySet();
168    
169                    List<ClusterNode> liveClusterNodes = new ArrayList<ClusterNode>(
170                            liveInstances.size());
171    
172                    for (ObjectValuePair<Address, ClusterNode> liveInstance :
173                                    liveInstances) {
174    
175                            liveClusterNodes.add(liveInstance.getValue());
176                    }
177    
178                    return liveClusterNodes;
179            }
180    
181            public ClusterNode getLocalClusterNode() {
182                    if (!isEnabled()) {
183                            return null;
184                    }
185    
186                    return _localClusterNode;
187            }
188    
189            public Address getLocalClusterNodeAddress() {
190                    if (!isEnabled()) {
191                            return null;
192                    }
193    
194                    return _localAddress;
195            }
196    
197            public void initialize() {
198                    if (!isEnabled()) {
199                            return;
200                    }
201    
202                    PortalUtil.addPortalPortEventListener(this);
203    
204                    _localAddress = new AddressImpl(_controlChannel.getLocalAddress());
205    
206                    try {
207                            initLocalClusterNode();
208                    }
209                    catch (SystemException se) {
210                            _log.error("Unable to determine local network address", se);
211                    }
212    
213                    ObjectValuePair<Address, ClusterNode> localInstance =
214                            new ObjectValuePair<Address, ClusterNode>(
215                                    _localAddress, _localClusterNode);
216    
217                    _liveInstances.put(localInstance, Long.MAX_VALUE);
218    
219                    _clusterNodeAddresses.put(
220                            _localClusterNode.getClusterNodeId(), _localAddress);
221    
222                    _clusterRequestReceiver.initialize();
223    
224                    _scheduledExecutorService = Executors.newScheduledThreadPool(
225                            1,
226                            new NamedThreadFactory(
227                                    ClusterExecutorImpl.class.getName(), Thread.NORM_PRIORITY,
228                                    Thread.currentThread().getContextClassLoader()));
229    
230                    _scheduledExecutorService.scheduleWithFixedDelay(
231                            new HeartbeatTask(), 0,
232                            PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
233                            TimeUnit.MILLISECONDS);
234            }
235    
236            public boolean isClusterNodeAlive(Address address) {
237                    if (!isEnabled()) {
238                            return false;
239                    }
240    
241                    removeExpiredInstances();
242    
243                    return _clusterNodeAddresses.containsValue(address);
244            }
245    
246            public boolean isClusterNodeAlive(String clusterNodeId) {
247                    if (!isEnabled()) {
248                            return false;
249                    }
250    
251                    removeExpiredInstances();
252    
253                    return _clusterNodeAddresses.containsKey(clusterNodeId);
254            }
255    
256            @Override
257            public boolean isEnabled() {
258                    return PropsValues.CLUSTER_LINK_ENABLED;
259            }
260    
261            public void portalPortConfigured(int port) {
262                    if (!isEnabled() ||
263                            _localClusterNode.getPort() ==
264                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT) {
265    
266                            return;
267                    }
268    
269                    _localClusterNode.setPort(port);
270            }
271    
272            public void removeClusterEventListener(
273                    ClusterEventListener clusterEventListener) {
274    
275                    if (!isEnabled()) {
276                            return;
277                    }
278    
279                    _clusterEventListeners.remove(clusterEventListener);
280            }
281    
282            public void setClusterEventListeners(
283                    List<ClusterEventListener> clusterEventListeners) {
284    
285                    if (!isEnabled()) {
286                            return;
287                    }
288    
289                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
290            }
291    
292            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
293                    if (!isEnabled()) {
294                            return;
295                    }
296    
297                    _shortcutLocalMethod = shortcutLocalMethod;
298            }
299    
300            protected void fireClusterEvent(ClusterEvent clusterEvent) {
301                    for (ClusterEventListener listener : _clusterEventListeners) {
302                            listener.processClusterEvent(clusterEvent);
303                    }
304            }
305    
306            protected JChannel getControlChannel() {
307                    return _controlChannel;
308            }
309    
310            protected FutureClusterResponses getExecutionResults(String uuid) {
311                    return _futureClusterResponses.get(uuid);
312            }
313    
314            @Override
315            protected void initChannels() {
316                    Properties controlProperties = PropsUtil.getProperties(
317                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
318    
319                    String controlProperty = controlProperties.getProperty(
320                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
321    
322                    _clusterRequestReceiver = new ClusterRequestReceiver(this);
323    
324                    try {
325                            _controlChannel = createJChannel(
326                                    controlProperty, _clusterRequestReceiver,
327                                    _DEFAULT_CLUSTER_NAME);
328                    }
329                    catch (ChannelException ce) {
330                            _log.error(ce, ce);
331                    }
332                    catch (Exception e) {
333                            _log.error(e, e);
334                    }
335            }
336    
337            protected void initLocalClusterNode() throws SystemException {
338                    _localClusterNode = new ClusterNode(PortalUUIDUtil.generate());
339    
340                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
341                            _localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
342                    }
343                    else {
344                            _localClusterNode.setPort(PortalUtil.getPortalPort(false));
345                    }
346    
347                    try {
348                            InetAddress inetAddress = bindInetAddress;
349    
350                            if (inetAddress == null) {
351                                    inetAddress = InetAddressUtil.getLocalInetAddress();
352                            }
353    
354                            _localClusterNode.setInetAddress(inetAddress);
355    
356                            _localClusterNode.setHostName(inetAddress.getHostName());
357                    }
358                    catch (Exception e) {
359                            throw new SystemException(
360                                    "Unable to determine local network address", e);
361                    }
362            }
363    
364            protected boolean isShortcutLocalMethod() {
365                    return _shortcutLocalMethod;
366            }
367    
368            protected void notify(
369                    Address address, ClusterNode clusterNode, long expirationTime) {
370    
371                    removeExpiredInstances();
372    
373                    if (System.currentTimeMillis() > expirationTime) {
374                            return;
375                    }
376    
377                    ObjectValuePair<Address, ClusterNode> liveInstance =
378                            new ObjectValuePair<Address, ClusterNode>(address, clusterNode);
379    
380                    Long oldExpirationTime = _liveInstances.put(
381                            liveInstance, expirationTime);
382    
383                    if ((oldExpirationTime != null) ||
384                            ((_localAddress != null) && _localAddress.equals(address))) {
385    
386                            return;
387                    }
388    
389                    _clusterNodeAddresses.put(clusterNode.getClusterNodeId(), address);
390    
391                    ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
392    
393                    fireClusterEvent(clusterEvent);
394            }
395    
396            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
397                    boolean isMulticast = clusterRequest.isMulticast();
398    
399                    List<Address> addresses = null;
400    
401                    if (isMulticast) {
402                            addresses = getAddresses(_controlChannel);
403                    }
404                    else {
405                            addresses = new ArrayList<Address>();
406    
407                            Collection<Address> clusterNodeAddresses =
408                                    clusterRequest.getTargetClusterNodeAddresses();
409    
410                            if (clusterNodeAddresses != null) {
411                                    addresses.addAll(clusterNodeAddresses);
412                            }
413    
414                            Collection<String> clusterNodeIds =
415                                    clusterRequest.getTargetClusterNodeIds();
416    
417                            if (clusterNodeIds != null) {
418                                    for (String clusterNodeId : clusterNodeIds) {
419                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
420    
421                                            addresses.add(address);
422                                    }
423                            }
424                    }
425    
426                    return addresses;
427            }
428    
429            protected void removeExpiredInstances() {
430                    if (_liveInstances.isEmpty()) {
431                            return;
432                    }
433    
434                    Set<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>>
435                            liveInstances = _liveInstances.entrySet();
436    
437                    Iterator<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>> itr =
438                            liveInstances.iterator();
439    
440                    long now = System.currentTimeMillis();
441    
442                    while (itr.hasNext()) {
443                            Map.Entry<ObjectValuePair<Address, ClusterNode>, Long> entry =
444                                    itr.next();
445    
446                            long expirationTime = entry.getValue();
447    
448                            if (now < expirationTime) {
449                                    continue;
450                            }
451    
452                            ObjectValuePair<Address, ClusterNode> liveInstance = entry.getKey();
453    
454                            ClusterNode clusterNode = liveInstance.getValue();
455    
456                            if (_localClusterNode.equals(clusterNode)) {
457                                    continue;
458                            }
459    
460                            _clusterNodeAddresses.remove(clusterNode.getClusterNodeId());
461    
462                            itr.remove();
463    
464                            ClusterEvent clusterEvent = ClusterEvent.depart(clusterNode);
465    
466                            fireClusterEvent(clusterEvent);
467                    }
468            }
469    
470            protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler) {
471                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
472    
473                    ClusterNode localClusterNode = getLocalClusterNode();
474    
475                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
476                    clusterNodeResponse.setClusterNode(localClusterNode);
477                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
478    
479                    if (methodHandler == null) {
480                            clusterNodeResponse.setException(
481                                    new ClusterException(
482                                            "Payload is not of type " + MethodHandler.class.getName()));
483    
484                            return clusterNodeResponse;
485                    }
486    
487                    try {
488                            Object returnValue = methodHandler.invoke(true);
489    
490                            if (returnValue instanceof Serializable) {
491                                    clusterNodeResponse.setResult(returnValue);
492                            }
493                            else if (returnValue != null) {
494                                    clusterNodeResponse.setException(
495                                            new ClusterException("Return value is not serializable"));
496                            }
497                    }
498                    catch (Exception e) {
499                            clusterNodeResponse.setException(e);
500                    }
501    
502                    return clusterNodeResponse;
503            }
504    
505            protected void sendMulticastRequest(ClusterRequest clusterRequest)
506                    throws SystemException {
507    
508                    try {
509                            _controlChannel.send(null, null, clusterRequest);
510                    }
511                    catch (ChannelException ce) {
512                            _log.error(
513                                    "Unable to send multicast message " + clusterRequest, ce);
514    
515                            throw new SystemException(
516                                    "Unable to send multicast request", ce);
517                    }
518            }
519    
520            protected void sendUnicastRequest(
521                            ClusterRequest clusterRequest, List<Address> addresses)
522                    throws SystemException {
523    
524                    for (Address address : addresses) {
525                            org.jgroups.Address jGroupsAddress =
526                                    (org.jgroups.Address)address.getRealAddress();
527    
528                            try {
529                                    _controlChannel.send(jGroupsAddress, null, clusterRequest);
530                            }
531                            catch (ChannelException ce) {
532                                    _log.error(
533                                            "Unable to send unicast message " + clusterRequest, ce);
534    
535                                    throw new SystemException(
536                                            "Unable to send unicast request", ce);
537                            }
538                    }
539            }
540    
541            private static final String _DEFAULT_CLUSTER_NAME =
542                    "LIFERAY-CONTROL-CHANNEL";
543    
544            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
545    
546            public ScheduledExecutorService _scheduledExecutorService;
547            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
548                    new CopyOnWriteArrayList<ClusterEventListener>();
549            private Map<String, Address> _clusterNodeAddresses =
550                    new ConcurrentHashMap<String, Address>();
551            private ClusterRequestReceiver _clusterRequestReceiver;
552            private JChannel _controlChannel;
553            private Map<String, FutureClusterResponses> _futureClusterResponses =
554                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
555            private Map<ObjectValuePair<Address, ClusterNode>, Long> _liveInstances =
556                    new ConcurrentHashMap<ObjectValuePair<Address, ClusterNode>, Long>();
557            private Address _localAddress;
558            private ClusterNode _localClusterNode;
559            private boolean _shortcutLocalMethod;
560    
561            private class HeartbeatTask implements Runnable {
562    
563                    public void run() {
564                            try {
565                                    ClusterRequest clusterNotifyRequest =
566                                            ClusterRequest.createClusterNotifyRequest(
567                                                    _localClusterNode);
568    
569                                    sendMulticastRequest(clusterNotifyRequest);
570                            }
571                            catch (Exception e) {
572                                    if (_log.isDebugEnabled()) {
573                                            _log.debug("Unable to send check in request", e);
574                                    }
575    
576                                    _scheduledExecutorService.scheduleWithFixedDelay(
577                                            new HeartbeatTask(), 0,
578                                            PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
579                                            TimeUnit.MILLISECONDS);
580                            }
581                    }
582    
583            }
584    
585    }