001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.Http;
035    import com.liferay.portal.kernel.util.InetAddressUtil;
036    import com.liferay.portal.kernel.util.MethodHandler;
037    import com.liferay.portal.kernel.util.PropsKeys;
038    import com.liferay.portal.kernel.util.StringUtil;
039    import com.liferay.portal.kernel.util.Validator;
040    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
041    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
042    import com.liferay.portal.util.PortalPortEventListener;
043    import com.liferay.portal.util.PortalPortProtocolEventListener;
044    import com.liferay.portal.util.PortalUtil;
045    import com.liferay.portal.util.PropsUtil;
046    import com.liferay.portal.util.PropsValues;
047    
048    import java.io.Serializable;
049    
050    import java.net.InetAddress;
051    
052    import java.util.ArrayList;
053    import java.util.Collection;
054    import java.util.Collections;
055    import java.util.List;
056    import java.util.Map;
057    import java.util.Properties;
058    import java.util.concurrent.BlockingQueue;
059    import java.util.concurrent.ConcurrentHashMap;
060    import java.util.concurrent.CopyOnWriteArrayList;
061    import java.util.concurrent.ExecutorService;
062    import java.util.concurrent.TimeUnit;
063    import java.util.concurrent.TimeoutException;
064    
065    import org.jgroups.JChannel;
066    
067    /**
068     * @author Tina Tian
069     * @author Shuyang Zhou
070     */
071    @DoPrivileged
072    public class ClusterExecutorImpl
073            extends ClusterBase
074            implements ClusterExecutor, PortalPortEventListener,
075                               PortalPortProtocolEventListener {
076    
077            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
078                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
079    
080            @Override
081            public void addClusterEventListener(
082                    ClusterEventListener clusterEventListener) {
083    
084                    if (!isEnabled()) {
085                            return;
086                    }
087    
088                    _clusterEventListeners.addIfAbsent(clusterEventListener);
089            }
090    
091            @Override
092            public void afterPropertiesSet() {
093                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
094                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
095                    }
096    
097                    if (PropsValues.LIVE_USERS_ENABLED) {
098                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
099                    }
100    
101                    _secure = StringUtil.equalsIgnoreCase(
102                            Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
103    
104                    if (Validator.isNotNull(PropsValues.PORTAL_INSTANCE_HTTPS_PORT)) {
105                            _port = PropsValues.PORTAL_INSTANCE_HTTPS_PORT;
106                    }
107                    else {
108                            _port = PropsValues.PORTAL_INSTANCE_HTTP_PORT;
109                    }
110    
111                    super.afterPropertiesSet();
112            }
113    
114            @Override
115            public void destroy() {
116                    if (!isEnabled()) {
117                            return;
118                    }
119    
120                    PortalExecutorManagerUtil.shutdown(
121                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
122    
123                    _controlJChannel.setReceiver(null);
124    
125                    _controlJChannel.close();
126    
127                    _clusterEventListeners.clear();
128                    _clusterNodeAddresses.clear();
129                    _futureClusterResponses.clear();
130                    _liveInstances.clear();
131                    _localAddress = null;
132                    _localClusterNode = null;
133            }
134    
135            @Override
136            public FutureClusterResponses execute(ClusterRequest clusterRequest)
137                    throws SystemException {
138    
139                    if (!isEnabled()) {
140                            return null;
141                    }
142    
143                    List<Address> addresses = prepareAddresses(clusterRequest);
144    
145                    FutureClusterResponses futureClusterResponses =
146                            new FutureClusterResponses(addresses);
147    
148                    if (!clusterRequest.isFireAndForget()) {
149                            String uuid = clusterRequest.getUuid();
150    
151                            _futureClusterResponses.put(uuid, futureClusterResponses);
152                    }
153    
154                    if (_shortcutLocalMethod &&
155                            addresses.remove(getLocalClusterNodeAddress())) {
156    
157                            runLocalMethod(clusterRequest, futureClusterResponses);
158                    }
159    
160                    if (clusterRequest.isMulticast()) {
161                            try {
162                                    _controlJChannel.send(null, clusterRequest);
163                            }
164                            catch (Exception e) {
165                                    throw new SystemException(
166                                            "Unable to send multicast request", e);
167                            }
168                    }
169                    else {
170                            for (Address address : addresses) {
171                                    org.jgroups.Address jGroupsAddress =
172                                            (org.jgroups.Address)address.getRealAddress();
173    
174                                    try {
175                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
176                                    }
177                                    catch (Exception e) {
178                                            throw new SystemException(
179                                                    "Unable to send unicast request", e);
180                                    }
181                            }
182                    }
183    
184                    return futureClusterResponses;
185            }
186    
187            @Override
188            public void execute(
189                            ClusterRequest clusterRequest,
190                            ClusterResponseCallback clusterResponseCallback)
191                    throws SystemException {
192    
193                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
194    
195                    ClusterResponseCallbackJob clusterResponseCallbackJob =
196                            new ClusterResponseCallbackJob(
197                                    clusterResponseCallback, futureClusterResponses);
198    
199                    _executorService.execute(clusterResponseCallbackJob);
200            }
201    
202            @Override
203            public void execute(
204                            ClusterRequest clusterRequest,
205                            ClusterResponseCallback clusterResponseCallback, long timeout,
206                            TimeUnit timeUnit)
207                    throws SystemException {
208    
209                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
210    
211                    ClusterResponseCallbackJob clusterResponseCallbackJob =
212                            new ClusterResponseCallbackJob(
213                                    clusterResponseCallback, futureClusterResponses, timeout,
214                                    timeUnit);
215    
216                    _executorService.execute(clusterResponseCallbackJob);
217            }
218    
219            @Override
220            public List<ClusterEventListener> getClusterEventListeners() {
221                    if (!isEnabled()) {
222                            return Collections.emptyList();
223                    }
224    
225                    return Collections.unmodifiableList(_clusterEventListeners);
226            }
227    
228            @Override
229            public List<Address> getClusterNodeAddresses() {
230                    if (!isEnabled()) {
231                            return Collections.emptyList();
232                    }
233    
234                    return getAddresses(_controlJChannel);
235            }
236    
237            @Override
238            public List<ClusterNode> getClusterNodes() {
239                    if (!isEnabled()) {
240                            return Collections.emptyList();
241                    }
242    
243                    return new ArrayList<ClusterNode>(_liveInstances.values());
244            }
245    
246            @Override
247            public ClusterNode getLocalClusterNode() {
248                    if (!isEnabled()) {
249                            return null;
250                    }
251    
252                    return _localClusterNode;
253            }
254    
255            @Override
256            public Address getLocalClusterNodeAddress() {
257                    if (!isEnabled()) {
258                            return null;
259                    }
260    
261                    return _localAddress;
262            }
263    
264            @Override
265            public void initialize() {
266                    if (!isEnabled()) {
267                            return;
268                    }
269    
270                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
271                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
272    
273                    PortalUtil.addPortalPortProtocolEventListener(this);
274    
275                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
276    
277                    try {
278                            initLocalClusterNode();
279    
280                            memberJoined(_localAddress, _localClusterNode);
281    
282                            sendNotifyRequest();
283                    }
284                    catch (Exception e) {
285                            _log.error("Unable to determine local network address", e);
286                    }
287    
288                    ClusterRequestReceiver clusterRequestReceiver =
289                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
290    
291                    clusterRequestReceiver.openLatch();
292            }
293    
294            @Override
295            public boolean isClusterNodeAlive(Address address) {
296                    if (!isEnabled()) {
297                            return false;
298                    }
299    
300                    List<Address> addresses = getAddresses(_controlJChannel);
301    
302                    return addresses.contains(address);
303            }
304    
305            @Override
306            public boolean isClusterNodeAlive(String clusterNodeId) {
307                    if (!isEnabled()) {
308                            return false;
309                    }
310    
311                    return _clusterNodeAddresses.containsKey(clusterNodeId);
312            }
313    
314            /**
315             * @deprecated As of 6.2.0, replaced by {@link
316             *             #portalPortProtocolConfigured(int, Boolean)}
317             */
318            @Override
319            public void portalPortConfigured(int port) {
320                    portalPortProtocolConfigured(port, null);
321            }
322    
323            @Override
324            public void portalPortProtocolConfigured(int port, Boolean secure) {
325                    if (!isEnabled()) {
326                            return;
327                    }
328    
329                    if (Validator.isNotNull(secure)) {
330                            String portalProtocol = _localClusterNode.getPortalProtocol();
331    
332                            if (((secure && portalProtocol.equals(Http.HTTPS)) ||
333                                     (!secure && portalProtocol.equals(Http.HTTP))) &&
334                                    (_localClusterNode.getPort() == _port)) {
335    
336                                    return;
337                            }
338    
339                            if (secure) {
340                                    _localClusterNode.setPortalProtocol(Http.HTTPS);
341                            }
342                            else {
343                                    _localClusterNode.setPortalProtocol(Http.HTTP);
344                            }
345                    }
346                    else if (_localClusterNode.getPort() == _port) {
347                            return;
348                    }
349    
350                    try {
351                            _localClusterNode.setPort(port);
352    
353                            memberJoined(_localAddress, _localClusterNode);
354    
355                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
356                                    ClusterMessageType.UPDATE, _localClusterNode);
357    
358                            _controlJChannel.send(null, clusterRequest);
359                    }
360                    catch (Exception e) {
361                            _log.error("Unable to determine configure node port", e);
362                    }
363            }
364    
365            @Override
366            public void removeClusterEventListener(
367                    ClusterEventListener clusterEventListener) {
368    
369                    if (!isEnabled()) {
370                            return;
371                    }
372    
373                    _clusterEventListeners.remove(clusterEventListener);
374            }
375    
376            public void setClusterEventListeners(
377                    List<ClusterEventListener> clusterEventListeners) {
378    
379                    if (!isEnabled()) {
380                            return;
381                    }
382    
383                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
384            }
385    
386            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
387                    if (!isEnabled()) {
388                            return;
389                    }
390    
391                    _shortcutLocalMethod = shortcutLocalMethod;
392            }
393    
394            protected void fireClusterEvent(ClusterEvent clusterEvent) {
395                    for (ClusterEventListener listener : _clusterEventListeners) {
396                            listener.processClusterEvent(clusterEvent);
397                    }
398            }
399    
400            protected ClusterNodeResponse generateClusterNodeResponse(
401                    ClusterRequest clusterRequest, Object returnValue,
402                    Exception exception) {
403    
404                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
405    
406                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
407                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
408                    clusterNodeResponse.setClusterMessageType(
409                            clusterRequest.getClusterMessageType());
410                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
411                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
412    
413                    if (exception != null) {
414                            clusterNodeResponse.setException(exception);
415                    }
416                    else {
417                            if (returnValue instanceof Serializable) {
418                                    clusterNodeResponse.setResult(returnValue);
419                            }
420                            else if (returnValue != null) {
421                                    clusterNodeResponse.setException(
422                                            new ClusterException("Return value is not serializable"));
423                            }
424                    }
425    
426                    return clusterNodeResponse;
427            }
428    
429            protected JChannel getControlChannel() {
430                    return _controlJChannel;
431            }
432    
433            protected FutureClusterResponses getExecutionResults(String uuid) {
434                    return _futureClusterResponses.get(uuid);
435            }
436    
437            @Override
438            protected void initChannels() throws Exception {
439                    Properties controlProperties = PropsUtil.getProperties(
440                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
441    
442                    String controlProperty = controlProperties.getProperty(
443                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
444    
445                    ClusterRequestReceiver clusterRequestReceiver =
446                            new ClusterRequestReceiver(this);
447    
448                    _controlJChannel = createJChannel(
449                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
450            }
451    
452            protected void initLocalClusterNode() throws Exception {
453                    InetAddress inetAddress = bindInetAddress;
454    
455                    if (inetAddress == null) {
456                            inetAddress = InetAddressUtil.getLocalInetAddress();
457                    }
458    
459                    ClusterNode localClusterNode = new ClusterNode(
460                            PortalUUIDUtil.generate(), inetAddress);
461    
462                    int port = _port;
463    
464                    if (port <= 0) {
465                            port = PortalUtil.getPortalPort(_secure);
466                    }
467    
468                    localClusterNode.setPort(port);
469    
470                    localClusterNode.setPortalProtocol(
471                            PropsValues.PORTAL_INSTANCE_PROTOCOL);
472    
473                    _localClusterNode = localClusterNode;
474            }
475    
476            protected boolean isShortcutLocalMethod() {
477                    return _shortcutLocalMethod;
478            }
479    
480            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
481                    _liveInstances.put(joinAddress, clusterNode);
482    
483                    Address previousAddress = _clusterNodeAddresses.put(
484                            clusterNode.getClusterNodeId(), joinAddress);
485    
486                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
487                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
488    
489                            // PLACEHOLDER
490    
491                            fireClusterEvent(clusterEvent);
492                    }
493            }
494    
495            protected void memberRemoved(List<Address> departAddresses) {
496                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
497    
498                    for (Address departAddress : departAddresses) {
499                            ClusterNode departClusterNode = _liveInstances.remove(
500                                    departAddress);
501    
502                            if (departClusterNode == null) {
503                                    continue;
504                            }
505    
506                            departClusterNodes.add(departClusterNode);
507    
508                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
509                    }
510    
511                    if (departClusterNodes.isEmpty()) {
512                            return;
513                    }
514    
515                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
516    
517                    fireClusterEvent(clusterEvent);
518            }
519    
520            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
521                    boolean isMulticast = clusterRequest.isMulticast();
522    
523                    List<Address> addresses = null;
524    
525                    if (isMulticast) {
526                            addresses = getAddresses(_controlJChannel);
527                    }
528                    else {
529                            addresses = new ArrayList<Address>();
530    
531                            Collection<Address> clusterNodeAddresses =
532                                    clusterRequest.getTargetClusterNodeAddresses();
533    
534                            if (clusterNodeAddresses != null) {
535                                    addresses.addAll(clusterNodeAddresses);
536                            }
537    
538                            Collection<String> clusterNodeIds =
539                                    clusterRequest.getTargetClusterNodeIds();
540    
541                            if (clusterNodeIds != null) {
542                                    for (String clusterNodeId : clusterNodeIds) {
543                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
544    
545                                            addresses.add(address);
546                                    }
547                            }
548                    }
549    
550                    if (clusterRequest.isSkipLocal()) {
551                            addresses.remove(getLocalClusterNodeAddress());
552                    }
553    
554                    return addresses;
555            }
556    
557            protected void runLocalMethod(
558                    ClusterRequest clusterRequest,
559                    FutureClusterResponses futureClusterResponses) {
560    
561                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
562    
563                    Object returnValue = null;
564                    Exception exception = null;
565    
566                    if (methodHandler == null) {
567                            exception = new ClusterException(
568                                    "Payload is not of type " + MethodHandler.class.getName());
569                    }
570                    else {
571                            try {
572                                    returnValue = methodHandler.invoke(true);
573                            }
574                            catch (Exception e) {
575                                    exception = e;
576                            }
577                    }
578    
579                    if (!clusterRequest.isFireAndForget()) {
580                            ClusterNodeResponse clusterNodeResponse =
581                                    generateClusterNodeResponse(
582                                            clusterRequest, returnValue, exception);
583    
584                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
585                    }
586            }
587    
588            protected void sendNotifyRequest() {
589                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
590                            ClusterMessageType.NOTIFY, _localClusterNode);
591    
592                    try {
593                            _controlJChannel.send(null, clusterRequest);
594                    }
595                    catch (Exception e) {
596                            _log.error("Unable to send notify message", e);
597                    }
598            }
599    
600            private static final String _DEFAULT_CLUSTER_NAME =
601                    "LIFERAY-CONTROL-CHANNEL";
602    
603            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
604    
605            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
606                    new CopyOnWriteArrayList<ClusterEventListener>();
607            private Map<String, Address> _clusterNodeAddresses =
608                    new ConcurrentHashMap<String, Address>();
609            private JChannel _controlJChannel;
610            private ExecutorService _executorService;
611            private Map<String, FutureClusterResponses> _futureClusterResponses =
612                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
613            private Map<Address, ClusterNode> _liveInstances =
614                    new ConcurrentHashMap<Address, ClusterNode>();
615            private Address _localAddress;
616            private ClusterNode _localClusterNode;
617            private int _port;
618            private boolean _secure;
619            private boolean _shortcutLocalMethod;
620    
621            private class ClusterResponseCallbackJob implements Runnable {
622    
623                    public ClusterResponseCallbackJob(
624                            ClusterResponseCallback clusterResponseCallback,
625                            FutureClusterResponses futureClusterResponses) {
626    
627                            _clusterResponseCallback = clusterResponseCallback;
628                            _futureClusterResponses = futureClusterResponses;
629                            _timeout = -1;
630                            _timeoutGet = false;
631                            _timeUnit = TimeUnit.SECONDS;
632                    }
633    
634                    public ClusterResponseCallbackJob(
635                            ClusterResponseCallback clusterResponseCallback,
636                            FutureClusterResponses futureClusterResponses, long timeout,
637                            TimeUnit timeUnit) {
638    
639                            _clusterResponseCallback = clusterResponseCallback;
640                            _futureClusterResponses = futureClusterResponses;
641                            _timeout = timeout;
642                            _timeoutGet = true;
643                            _timeUnit = timeUnit;
644                    }
645    
646                    @Override
647                    public void run() {
648                            BlockingQueue<ClusterNodeResponse> blockingQueue =
649                                    _futureClusterResponses.getPartialResults();
650    
651                            _clusterResponseCallback.callback(blockingQueue);
652    
653                            ClusterNodeResponses clusterNodeResponses = null;
654    
655                            try {
656                                    if (_timeoutGet) {
657                                            clusterNodeResponses = _futureClusterResponses.get(
658                                                    _timeout, _timeUnit);
659                                    }
660                                    else {
661                                            clusterNodeResponses = _futureClusterResponses.get();
662                                    }
663    
664                                    _clusterResponseCallback.callback(clusterNodeResponses);
665                            }
666                            catch (InterruptedException ie) {
667                                    _clusterResponseCallback.processInterruptedException(ie);
668                            }
669                            catch (TimeoutException te) {
670                                    _clusterResponseCallback.processTimeoutException(te);
671                            }
672                    }
673    
674                    private final ClusterResponseCallback _clusterResponseCallback;
675                    private final FutureClusterResponses _futureClusterResponses;
676                    private final long _timeout;
677                    private final boolean _timeoutGet;
678                    private final TimeUnit _timeUnit;
679    
680            }
681    
682    }