001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.util.InetAddressUtil;
034    import com.liferay.portal.kernel.util.MethodHandler;
035    import com.liferay.portal.kernel.util.PropsKeys;
036    import com.liferay.portal.kernel.util.PropsUtil;
037    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039    import com.liferay.portal.util.PortalPortEventListener;
040    import com.liferay.portal.util.PortalUtil;
041    import com.liferay.portal.util.PropsValues;
042    
043    import java.io.Serializable;
044    
045    import java.net.InetAddress;
046    
047    import java.util.ArrayList;
048    import java.util.Collection;
049    import java.util.Collections;
050    import java.util.List;
051    import java.util.Map;
052    import java.util.Properties;
053    import java.util.concurrent.BlockingQueue;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    import java.util.concurrent.ExecutorService;
057    import java.util.concurrent.TimeUnit;
058    import java.util.concurrent.TimeoutException;
059    
060    import org.jgroups.ChannelException;
061    import org.jgroups.JChannel;
062    
063    /**
064     * @author Tina Tian
065     * @author Shuyang Zhou
066     */
067    public class ClusterExecutorImpl
068            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
069    
070            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
071                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
072    
073            public void addClusterEventListener(
074                    ClusterEventListener clusterEventListener) {
075    
076                    if (!isEnabled()) {
077                            return;
078                    }
079    
080                    _clusterEventListeners.addIfAbsent(clusterEventListener);
081            }
082    
083            @Override
084            public void afterPropertiesSet() {
085                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
086                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
087                    }
088    
089                    if (PropsValues.LIVE_USERS_ENABLED) {
090                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
091                    }
092    
093                    super.afterPropertiesSet();
094            }
095    
096            @Override
097            public void destroy() {
098                    if (!isEnabled()) {
099                            return;
100                    }
101    
102                    PortalExecutorManagerUtil.shutdown(
103                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
104    
105                    _controlJChannel.close();
106            }
107    
108            public FutureClusterResponses execute(ClusterRequest clusterRequest)
109                    throws SystemException {
110    
111                    if (!isEnabled()) {
112                            return null;
113                    }
114    
115                    List<Address> addresses = prepareAddresses(clusterRequest);
116    
117                    FutureClusterResponses futureClusterResponses =
118                            new FutureClusterResponses(addresses);
119    
120                    if (!clusterRequest.isFireAndForget()) {
121                            String uuid = clusterRequest.getUuid();
122    
123                            _futureClusterResponses.put(uuid, futureClusterResponses);
124                    }
125    
126                    if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
127                            addresses.remove(getLocalClusterNodeAddress())) {
128    
129                            ClusterNodeResponse clusterNodeResponse = runLocalMethod(
130                                    clusterRequest.getMethodHandler());
131    
132                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
133                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
134    
135                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
136                    }
137    
138                    if (clusterRequest.isMulticast()) {
139                            sendMulticastRequest(clusterRequest);
140                    }
141                    else {
142                            sendUnicastRequest(clusterRequest, addresses);
143                    }
144    
145                    return futureClusterResponses;
146            }
147    
148            public void execute(
149                            ClusterRequest clusterRequest,
150                            ClusterResponseCallback clusterResponseCallback)
151                    throws SystemException {
152    
153                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
154    
155                    ClusterResponseCallbackJob clusterResponseCallbackJob =
156                            new ClusterResponseCallbackJob(
157                                    clusterResponseCallback, futureClusterResponses);
158    
159                    _executorService.execute(clusterResponseCallbackJob);
160            }
161    
162            public void execute(
163                            ClusterRequest clusterRequest,
164                            ClusterResponseCallback clusterResponseCallback, long timeout,
165                            TimeUnit timeUnit)
166                    throws SystemException {
167    
168                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
169    
170                    ClusterResponseCallbackJob clusterResponseCallbackJob =
171                            new ClusterResponseCallbackJob(
172                                    clusterResponseCallback, futureClusterResponses, timeout,
173                                    timeUnit);
174    
175                    _executorService.execute(clusterResponseCallbackJob);
176            }
177    
178            public List<ClusterEventListener> getClusterEventListeners() {
179                    if (!isEnabled()) {
180                            return Collections.emptyList();
181                    }
182    
183                    return Collections.unmodifiableList(_clusterEventListeners);
184            }
185    
186            public List<Address> getClusterNodeAddresses() {
187                    if (!isEnabled()) {
188                            return Collections.emptyList();
189                    }
190    
191                    return getAddresses(_controlJChannel);
192            }
193    
194            public List<ClusterNode> getClusterNodes() {
195                    if (!isEnabled()) {
196                            return Collections.emptyList();
197                    }
198    
199                    return new ArrayList<ClusterNode>(_liveInstances.values());
200            }
201    
202            public ClusterNode getLocalClusterNode() {
203                    if (!isEnabled()) {
204                            return null;
205                    }
206    
207                    return _localClusterNode;
208            }
209    
210            public Address getLocalClusterNodeAddress() {
211                    if (!isEnabled()) {
212                            return null;
213                    }
214    
215                    return _localAddress;
216            }
217    
218            public void initialize() {
219                    if (!isEnabled()) {
220                            return;
221                    }
222    
223                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
224                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
225    
226                    PortalUtil.addPortalPortEventListener(this);
227    
228                    _localAddress = new AddressImpl(_controlJChannel.getLocalAddress());
229    
230                    try {
231                            initLocalClusterNode();
232                    }
233                    catch (SystemException se) {
234                            _log.error("Unable to determine local network address", se);
235                    }
236    
237                    memberJoined(_localAddress, _localClusterNode);
238    
239                    sendNotifyRequest();
240    
241                    ClusterRequestReceiver clusterRequestReceiver =
242                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
243    
244                    clusterRequestReceiver.openLatch();
245            }
246    
247            public boolean isClusterNodeAlive(Address address) {
248                    if (!isEnabled()) {
249                            return false;
250                    }
251    
252                    List<Address> addresses = getAddresses(_controlJChannel);
253    
254                    return addresses.contains(address);
255            }
256    
257            public boolean isClusterNodeAlive(String clusterNodeId) {
258                    if (!isEnabled()) {
259                            return false;
260                    }
261    
262                    return _clusterNodeAddresses.containsKey(clusterNodeId);
263            }
264    
265            @Override
266            public boolean isEnabled() {
267                    return PropsValues.CLUSTER_LINK_ENABLED;
268            }
269    
270            public void portalPortConfigured(int port) {
271                    if (!isEnabled() ||
272                            (_localClusterNode.getPort() ==
273                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
274    
275                            return;
276                    }
277    
278                    try {
279                            _localClusterNode.setPort(port);
280    
281                            memberJoined(_localAddress, _localClusterNode);
282    
283                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
284                                    ClusterMessageType.UPDATE, _localClusterNode);
285    
286                            _controlJChannel.send(null, null, clusterRequest);
287                    }
288                    catch (Exception e) {
289                            _log.error("Unable to determine configure node port", e);
290                    }
291            }
292    
293            public void removeClusterEventListener(
294                    ClusterEventListener clusterEventListener) {
295    
296                    if (!isEnabled()) {
297                            return;
298                    }
299    
300                    _clusterEventListeners.remove(clusterEventListener);
301            }
302    
303            public void setClusterEventListeners(
304                    List<ClusterEventListener> clusterEventListeners) {
305    
306                    if (!isEnabled()) {
307                            return;
308                    }
309    
310                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
311            }
312    
313            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
314                    if (!isEnabled()) {
315                            return;
316                    }
317    
318                    _shortcutLocalMethod = shortcutLocalMethod;
319            }
320    
321            protected void fireClusterEvent(ClusterEvent clusterEvent) {
322                    for (ClusterEventListener listener : _clusterEventListeners) {
323                            listener.processClusterEvent(clusterEvent);
324                    }
325            }
326    
327            protected JChannel getControlChannel() {
328                    return _controlJChannel;
329            }
330    
331            protected FutureClusterResponses getExecutionResults(String uuid) {
332                    return _futureClusterResponses.get(uuid);
333            }
334    
335            @Override
336            protected void initChannels() {
337                    Properties controlProperties = PropsUtil.getProperties(
338                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
339    
340                    String controlProperty = controlProperties.getProperty(
341                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
342    
343                    ClusterRequestReceiver clusterRequestReceiver =
344                            new ClusterRequestReceiver(this);
345    
346                    try {
347                            _controlJChannel = createJChannel(
348                                    controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
349                    }
350                    catch (ChannelException ce) {
351                            _log.error(ce, ce);
352                    }
353                    catch (Exception e) {
354                            _log.error(e, e);
355                    }
356            }
357    
358            protected void initLocalClusterNode() throws SystemException {
359                    _localClusterNode = new ClusterNode(PortalUUIDUtil.generate());
360    
361                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
362                            _localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
363                    }
364                    else {
365                            _localClusterNode.setPort(PortalUtil.getPortalPort(false));
366                    }
367    
368                    try {
369                            InetAddress inetAddress = bindInetAddress;
370    
371                            if (inetAddress == null) {
372                                    inetAddress = InetAddressUtil.getLocalInetAddress();
373                            }
374    
375                            _localClusterNode.setInetAddress(inetAddress);
376    
377                            _localClusterNode.setHostName(inetAddress.getHostName());
378                    }
379                    catch (Exception e) {
380                            throw new SystemException(
381                                    "Unable to determine local network address", e);
382                    }
383            }
384    
385            protected boolean isShortcutLocalMethod() {
386                    return _shortcutLocalMethod;
387            }
388    
389            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
390                    _liveInstances.put(joinAddress, clusterNode);
391    
392                    Address previousAddress = _clusterNodeAddresses.put(
393                            clusterNode.getClusterNodeId(), joinAddress);
394    
395                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
396                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
397    
398                            fireClusterEvent(clusterEvent);
399                    }
400            }
401    
402            protected void memberRemoved(List<Address> departAddresses) {
403                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
404    
405                    for (Address departAddress : departAddresses) {
406                            ClusterNode departClusterNode = _liveInstances.remove(
407                                    departAddress);
408    
409                            if (departClusterNode == null) {
410                                    continue;
411                            }
412    
413                            departClusterNodes.add(departClusterNode);
414    
415                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
416                    }
417    
418                    if (departClusterNodes.isEmpty()) {
419                            return;
420                    }
421    
422                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
423    
424                    fireClusterEvent(clusterEvent);
425            }
426    
427            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
428                    boolean isMulticast = clusterRequest.isMulticast();
429    
430                    List<Address> addresses = null;
431    
432                    if (isMulticast) {
433                            addresses = getAddresses(_controlJChannel);
434                    }
435                    else {
436                            addresses = new ArrayList<Address>();
437    
438                            Collection<Address> clusterNodeAddresses =
439                                    clusterRequest.getTargetClusterNodeAddresses();
440    
441                            if (clusterNodeAddresses != null) {
442                                    addresses.addAll(clusterNodeAddresses);
443                            }
444    
445                            Collection<String> clusterNodeIds =
446                                    clusterRequest.getTargetClusterNodeIds();
447    
448                            if (clusterNodeIds != null) {
449                                    for (String clusterNodeId : clusterNodeIds) {
450                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
451    
452                                            addresses.add(address);
453                                    }
454                            }
455                    }
456    
457                    return addresses;
458            }
459    
460            protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler) {
461                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
462    
463                    ClusterNode localClusterNode = getLocalClusterNode();
464    
465                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
466                    clusterNodeResponse.setClusterNode(localClusterNode);
467                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
468    
469                    if (methodHandler == null) {
470                            clusterNodeResponse.setException(
471                                    new ClusterException(
472                                            "Payload is not of type " + MethodHandler.class.getName()));
473    
474                            return clusterNodeResponse;
475                    }
476    
477                    try {
478                            Object returnValue = methodHandler.invoke(true);
479    
480                            if (returnValue instanceof Serializable) {
481                                    clusterNodeResponse.setResult(returnValue);
482                            }
483                            else if (returnValue != null) {
484                                    clusterNodeResponse.setException(
485                                            new ClusterException("Return value is not serializable"));
486                            }
487                    }
488                    catch (Exception e) {
489                            clusterNodeResponse.setException(e);
490                    }
491    
492                    return clusterNodeResponse;
493            }
494    
495            protected void sendMulticastRequest(ClusterRequest clusterRequest)
496                    throws SystemException {
497    
498                    try {
499                            _controlJChannel.send(null, null, clusterRequest);
500                    }
501                    catch (ChannelException ce) {
502                            _log.error(
503                                    "Unable to send multicast message " + clusterRequest, ce);
504    
505                            throw new SystemException("Unable to send multicast request", ce);
506                    }
507            }
508    
509            protected void sendNotifyRequest() {
510                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
511                            ClusterMessageType.NOTIFY, _localClusterNode);
512    
513                    try {
514                            _controlJChannel.send(null, null, clusterRequest);
515                    }
516                    catch (ChannelException ce) {
517                            _log.error("Unable to send multicast message", ce);
518                    }
519            }
520    
521            protected void sendUnicastRequest(
522                            ClusterRequest clusterRequest, List<Address> addresses)
523                    throws SystemException {
524    
525                    for (Address address : addresses) {
526                            org.jgroups.Address jGroupsAddress =
527                                    (org.jgroups.Address)address.getRealAddress();
528    
529                            try {
530                                    _controlJChannel.send(jGroupsAddress, null, clusterRequest);
531                            }
532                            catch (ChannelException ce) {
533                                    _log.error(
534                                            "Unable to send unicast message " + clusterRequest, ce);
535    
536                                    throw new SystemException("Unable to send unicast request", ce);
537                            }
538                    }
539            }
540    
541            private static final String _DEFAULT_CLUSTER_NAME =
542                    "LIFERAY-CONTROL-CHANNEL";
543    
544            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
545    
546            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
547                    new CopyOnWriteArrayList<ClusterEventListener>();
548            private Map<String, Address> _clusterNodeAddresses =
549                    new ConcurrentHashMap<String, Address>();
550            private JChannel _controlJChannel;
551            private ExecutorService _executorService;
552            private Map<String, FutureClusterResponses> _futureClusterResponses =
553                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
554            private Map<Address, ClusterNode> _liveInstances =
555                    new ConcurrentHashMap<Address, ClusterNode>();
556            private Address _localAddress;
557            private ClusterNode _localClusterNode;
558            private boolean _shortcutLocalMethod;
559    
560            private class ClusterResponseCallbackJob implements Runnable {
561    
562                    public ClusterResponseCallbackJob(
563                            ClusterResponseCallback clusterResponseCallback,
564                            FutureClusterResponses futureClusterResponses) {
565    
566                            _clusterResponseCallback = clusterResponseCallback;
567                            _futureClusterResponses = futureClusterResponses;
568                            _timeout = -1;
569                            _timeoutGet = false;
570                            _timeUnit = TimeUnit.SECONDS;
571                    }
572    
573                    public ClusterResponseCallbackJob(
574                            ClusterResponseCallback clusterResponseCallback,
575                            FutureClusterResponses futureClusterResponses, long timeout,
576                            TimeUnit timeUnit) {
577    
578                            _clusterResponseCallback = clusterResponseCallback;
579                            _futureClusterResponses = futureClusterResponses;
580                            _timeout = timeout;
581                            _timeoutGet = true;
582                            _timeUnit = timeUnit;
583                    }
584    
585                    public void run() {
586                            BlockingQueue<ClusterNodeResponse> blockingQueue =
587                                    _futureClusterResponses.getPartialResults();
588    
589                            _clusterResponseCallback.callback(blockingQueue);
590    
591                            ClusterNodeResponses clusterNodeResponses = null;
592    
593                            try {
594                                    if (_timeoutGet) {
595                                            clusterNodeResponses = _futureClusterResponses.get(
596                                                    _timeout, _timeUnit);
597                                    }
598                                    else {
599                                            clusterNodeResponses = _futureClusterResponses.get();
600                                    }
601    
602                                    _clusterResponseCallback.callback(clusterNodeResponses);
603                            }
604                            catch (InterruptedException ie) {
605                                    _clusterResponseCallback.processInterruptedException(ie);
606                            }
607                            catch (TimeoutException te) {
608                                    _clusterResponseCallback.processTimeoutException(te);
609                            }
610                    }
611    
612                    private final ClusterResponseCallback _clusterResponseCallback;
613                    private final FutureClusterResponses _futureClusterResponses;
614                    private final long _timeout;
615                    private final boolean _timeoutGet;
616                    private final TimeUnit _timeUnit;
617    
618            }
619    
620    }