001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.Http;
035    import com.liferay.portal.kernel.util.InetAddressUtil;
036    import com.liferay.portal.kernel.util.MethodHandler;
037    import com.liferay.portal.kernel.util.PropsKeys;
038    import com.liferay.portal.kernel.util.StringBundler;
039    import com.liferay.portal.kernel.util.StringUtil;
040    import com.liferay.portal.kernel.util.Validator;
041    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
042    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043    import com.liferay.portal.util.PortalPortEventListener;
044    import com.liferay.portal.util.PortalPortProtocolEventListener;
045    import com.liferay.portal.util.PortalUtil;
046    import com.liferay.portal.util.PropsUtil;
047    import com.liferay.portal.util.PropsValues;
048    
049    import java.io.Serializable;
050    
051    import java.net.InetAddress;
052    
053    import java.util.ArrayList;
054    import java.util.Collection;
055    import java.util.Collections;
056    import java.util.List;
057    import java.util.Map;
058    import java.util.Properties;
059    import java.util.concurrent.BlockingQueue;
060    import java.util.concurrent.ConcurrentHashMap;
061    import java.util.concurrent.CopyOnWriteArrayList;
062    import java.util.concurrent.ExecutorService;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.TimeoutException;
065    
066    import org.jgroups.JChannel;
067    
068    /**
069     * @author Tina Tian
070     * @author Shuyang Zhou
071     */
072    @DoPrivileged
073    public class ClusterExecutorImpl
074            extends ClusterBase
075            implements ClusterExecutor, PortalPortEventListener,
076                               PortalPortProtocolEventListener {
077    
078            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
079                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
080    
081            @Override
082            public void addClusterEventListener(
083                    ClusterEventListener clusterEventListener) {
084    
085                    if (!isEnabled()) {
086                            return;
087                    }
088    
089                    _clusterEventListeners.addIfAbsent(clusterEventListener);
090            }
091    
092            @Override
093            public void afterPropertiesSet() {
094                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
095                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
096                    }
097    
098                    if (PropsValues.LIVE_USERS_ENABLED) {
099                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
100                    }
101    
102                    super.afterPropertiesSet();
103            }
104    
105            @Override
106            public void destroy() {
107                    if (!isEnabled()) {
108                            return;
109                    }
110    
111                    PortalExecutorManagerUtil.shutdown(
112                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
113    
114                    _controlJChannel.setReceiver(null);
115    
116                    _controlJChannel.close();
117    
118                    _clusterEventListeners.clear();
119                    _clusterNodeAddresses.clear();
120                    _futureClusterResponses.clear();
121                    _liveInstances.clear();
122                    _localAddress = null;
123                    _localClusterNode = null;
124            }
125    
126            @Override
127            public FutureClusterResponses execute(ClusterRequest clusterRequest)
128                    throws SystemException {
129    
130                    if (!isEnabled()) {
131                            return null;
132                    }
133    
134                    List<Address> addresses = prepareAddresses(clusterRequest);
135    
136                    FutureClusterResponses futureClusterResponses =
137                            new FutureClusterResponses(addresses);
138    
139                    if (!clusterRequest.isFireAndForget()) {
140                            String uuid = clusterRequest.getUuid();
141    
142                            _futureClusterResponses.put(uuid, futureClusterResponses);
143                    }
144    
145                    if (_shortcutLocalMethod &&
146                            addresses.remove(getLocalClusterNodeAddress())) {
147    
148                            runLocalMethod(clusterRequest, futureClusterResponses);
149                    }
150    
151                    if (clusterRequest.isMulticast()) {
152                            try {
153                                    _controlJChannel.send(null, clusterRequest);
154                            }
155                            catch (Exception e) {
156                                    throw new SystemException(
157                                            "Unable to send multicast request", e);
158                            }
159                    }
160                    else {
161                            for (Address address : addresses) {
162                                    org.jgroups.Address jGroupsAddress =
163                                            (org.jgroups.Address)address.getRealAddress();
164    
165                                    try {
166                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
167                                    }
168                                    catch (Exception e) {
169                                            throw new SystemException(
170                                                    "Unable to send unicast request", e);
171                                    }
172                            }
173                    }
174    
175                    return futureClusterResponses;
176            }
177    
178            @Override
179            public void execute(
180                            ClusterRequest clusterRequest,
181                            ClusterResponseCallback clusterResponseCallback)
182                    throws SystemException {
183    
184                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
185    
186                    ClusterResponseCallbackJob clusterResponseCallbackJob =
187                            new ClusterResponseCallbackJob(
188                                    clusterResponseCallback, futureClusterResponses);
189    
190                    _executorService.execute(clusterResponseCallbackJob);
191            }
192    
193            @Override
194            public void execute(
195                            ClusterRequest clusterRequest,
196                            ClusterResponseCallback clusterResponseCallback, long timeout,
197                            TimeUnit timeUnit)
198                    throws SystemException {
199    
200                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
201    
202                    ClusterResponseCallbackJob clusterResponseCallbackJob =
203                            new ClusterResponseCallbackJob(
204                                    clusterResponseCallback, futureClusterResponses, timeout,
205                                    timeUnit);
206    
207                    _executorService.execute(clusterResponseCallbackJob);
208            }
209    
210            @Override
211            public List<ClusterEventListener> getClusterEventListeners() {
212                    if (!isEnabled()) {
213                            return Collections.emptyList();
214                    }
215    
216                    return Collections.unmodifiableList(_clusterEventListeners);
217            }
218    
219            @Override
220            public List<Address> getClusterNodeAddresses() {
221                    if (!isEnabled()) {
222                            return Collections.emptyList();
223                    }
224    
225                    return getAddresses(_controlJChannel);
226            }
227    
228            @Override
229            public List<ClusterNode> getClusterNodes() {
230                    if (!isEnabled()) {
231                            return Collections.emptyList();
232                    }
233    
234                    return new ArrayList<ClusterNode>(_liveInstances.values());
235            }
236    
237            @Override
238            public ClusterNode getLocalClusterNode() {
239                    if (!isEnabled()) {
240                            return null;
241                    }
242    
243                    return _localClusterNode;
244            }
245    
246            @Override
247            public Address getLocalClusterNodeAddress() {
248                    if (!isEnabled()) {
249                            return null;
250                    }
251    
252                    return _localAddress;
253            }
254    
255            @Override
256            public void initialize() {
257                    if (!isEnabled()) {
258                            return;
259                    }
260    
261                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
262                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
263    
264                    PortalUtil.addPortalPortProtocolEventListener(this);
265    
266                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
267    
268                    try {
269                            initLocalClusterNode();
270    
271                            memberJoined(_localAddress, _localClusterNode);
272    
273                            sendNotifyRequest();
274                    }
275                    catch (Exception e) {
276                            _log.error("Unable to determine local network address", e);
277                    }
278    
279                    ClusterRequestReceiver clusterRequestReceiver =
280                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
281    
282                    clusterRequestReceiver.openLatch();
283            }
284    
285            @Override
286            public boolean isClusterNodeAlive(Address address) {
287                    if (!isEnabled()) {
288                            return false;
289                    }
290    
291                    List<Address> addresses = getAddresses(_controlJChannel);
292    
293                    return addresses.contains(address);
294            }
295    
296            @Override
297            public boolean isClusterNodeAlive(String clusterNodeId) {
298                    if (!isEnabled()) {
299                            return false;
300                    }
301    
302                    return _clusterNodeAddresses.containsKey(clusterNodeId);
303            }
304    
305            /**
306             * @deprecated As of 6.2.0, replaced by {@link
307             *             #portalPortProtocolConfigured(int, Boolean)}
308             */
309            @Override
310            public void portalPortConfigured(int port) {
311                    portalPortProtocolConfigured(port, null);
312            }
313    
314            @Override
315            public void portalPortProtocolConfigured(int port, Boolean secure) {
316                    if (!isEnabled() || (port <= 0) || (secure == null)) {
317                            return;
318                    }
319    
320                    if (Validator.isNotNull(_localClusterNode.getPortalProtocol())) {
321                            return;
322                    }
323    
324                    if (secure) {
325                            _localClusterNode.setPortalProtocol(Http.HTTPS);
326                    }
327                    else {
328                            _localClusterNode.setPortalProtocol(Http.HTTP);
329                    }
330    
331                    try {
332                            _localClusterNode.setPort(port);
333    
334                            memberJoined(_localAddress, _localClusterNode);
335    
336                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
337                                    ClusterMessageType.UPDATE, _localClusterNode);
338    
339                            _controlJChannel.send(null, clusterRequest);
340                    }
341                    catch (Exception e) {
342                            _log.error("Unable to determine configure node port", e);
343                    }
344            }
345    
346            @Override
347            public void removeClusterEventListener(
348                    ClusterEventListener clusterEventListener) {
349    
350                    if (!isEnabled()) {
351                            return;
352                    }
353    
354                    _clusterEventListeners.remove(clusterEventListener);
355            }
356    
357            public void setClusterEventListeners(
358                    List<ClusterEventListener> clusterEventListeners) {
359    
360                    if (!isEnabled()) {
361                            return;
362                    }
363    
364                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
365            }
366    
367            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
368                    if (!isEnabled()) {
369                            return;
370                    }
371    
372                    _shortcutLocalMethod = shortcutLocalMethod;
373            }
374    
375            protected void fireClusterEvent(ClusterEvent clusterEvent) {
376                    for (ClusterEventListener listener : _clusterEventListeners) {
377                            listener.processClusterEvent(clusterEvent);
378                    }
379            }
380    
381            protected ClusterNodeResponse generateClusterNodeResponse(
382                    ClusterRequest clusterRequest, Object returnValue,
383                    Exception exception) {
384    
385                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
386    
387                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
388                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
389                    clusterNodeResponse.setClusterMessageType(
390                            clusterRequest.getClusterMessageType());
391                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
392                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
393    
394                    if (exception != null) {
395                            clusterNodeResponse.setException(exception);
396                    }
397                    else {
398                            if (returnValue instanceof Serializable) {
399                                    clusterNodeResponse.setResult(returnValue);
400                            }
401                            else if (returnValue != null) {
402                                    clusterNodeResponse.setException(
403                                            new ClusterException("Return value is not serializable"));
404                            }
405                    }
406    
407                    return clusterNodeResponse;
408            }
409    
410            protected JChannel getControlChannel() {
411                    return _controlJChannel;
412            }
413    
414            protected FutureClusterResponses getExecutionResults(String uuid) {
415                    return _futureClusterResponses.get(uuid);
416            }
417    
418            @Override
419            protected void initChannels() throws Exception {
420                    Properties controlProperties = PropsUtil.getProperties(
421                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
422    
423                    String controlProperty = controlProperties.getProperty(
424                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
425    
426                    ClusterRequestReceiver clusterRequestReceiver =
427                            new ClusterRequestReceiver(this);
428    
429                    _controlJChannel = createJChannel(
430                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
431            }
432    
433            protected void initLocalClusterNode() throws Exception {
434                    InetAddress inetAddress = bindInetAddress;
435    
436                    if (inetAddress == null) {
437                            inetAddress = InetAddressUtil.getLocalInetAddress();
438                    }
439    
440                    ClusterNode localClusterNode = new ClusterNode(
441                            PortalUUIDUtil.generate(), inetAddress);
442    
443                    if (StringUtil.equalsIgnoreCase(
444                                    Http.HTTPS, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
445                            (PropsValues.PORTAL_INSTANCE_HTTPS_PORT > 0)) {
446    
447                            localClusterNode.setPortalProtocol(Http.HTTPS);
448                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTPS_PORT);
449                    }
450                    else if (StringUtil.equalsIgnoreCase(
451                                            Http.HTTP, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
452                                     (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0)) {
453    
454                            localClusterNode.setPortalProtocol(Http.HTTP);
455                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
456                    }
457                    else {
458                            if (_log.isWarnEnabled()) {
459                                    StringBundler sb = new StringBundler(8);
460    
461                                    sb.append("Unable to configure node protocol and port. ");
462                                    sb.append("This configuration will be inferred dynamically ");
463                                    sb.append("from the first request but static configuration ");
464                                    sb.append("is recomended to avoid comunications problems ");
465                                    sb.append("between nodes. Please set the right values for ");
466                                    sb.append("\"portal.instance.protocol\" and ");
467                                    sb.append("\"portal.instance.http.port\" or ");
468                                    sb.append("\"portal.instance.https.port\".");
469    
470                                    _log.warn(sb.toString());
471                            }
472                    }
473    
474                    _localClusterNode = localClusterNode;
475    
476                    if (_log.isDebugEnabled()) {
477                            _log.debug(
478                                    "Initialized cluster node: " + localClusterNode.toString());
479                    }
480            }
481    
482            protected boolean isShortcutLocalMethod() {
483                    return _shortcutLocalMethod;
484            }
485    
486            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
487                    _liveInstances.put(joinAddress, clusterNode);
488    
489                    Address previousAddress = _clusterNodeAddresses.put(
490                            clusterNode.getClusterNodeId(), joinAddress);
491    
492                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
493                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
494    
495                            // PLACEHOLDER
496    
497                            fireClusterEvent(clusterEvent);
498                    }
499            }
500    
501            protected void memberRemoved(List<Address> departAddresses) {
502                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
503    
504                    for (Address departAddress : departAddresses) {
505                            ClusterNode departClusterNode = _liveInstances.remove(
506                                    departAddress);
507    
508                            if (departClusterNode == null) {
509                                    continue;
510                            }
511    
512                            departClusterNodes.add(departClusterNode);
513    
514                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
515                    }
516    
517                    if (departClusterNodes.isEmpty()) {
518                            return;
519                    }
520    
521                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
522    
523                    fireClusterEvent(clusterEvent);
524            }
525    
526            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
527                    boolean isMulticast = clusterRequest.isMulticast();
528    
529                    List<Address> addresses = null;
530    
531                    if (isMulticast) {
532                            addresses = getAddresses(_controlJChannel);
533                    }
534                    else {
535                            addresses = new ArrayList<Address>();
536    
537                            Collection<Address> clusterNodeAddresses =
538                                    clusterRequest.getTargetClusterNodeAddresses();
539    
540                            if (clusterNodeAddresses != null) {
541                                    addresses.addAll(clusterNodeAddresses);
542                            }
543    
544                            Collection<String> clusterNodeIds =
545                                    clusterRequest.getTargetClusterNodeIds();
546    
547                            if (clusterNodeIds != null) {
548                                    for (String clusterNodeId : clusterNodeIds) {
549                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
550    
551                                            addresses.add(address);
552                                    }
553                            }
554                    }
555    
556                    if (clusterRequest.isSkipLocal()) {
557                            addresses.remove(getLocalClusterNodeAddress());
558                    }
559    
560                    return addresses;
561            }
562    
563            protected void runLocalMethod(
564                    ClusterRequest clusterRequest,
565                    FutureClusterResponses futureClusterResponses) {
566    
567                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
568    
569                    Object returnValue = null;
570                    Exception exception = null;
571    
572                    if (methodHandler == null) {
573                            exception = new ClusterException(
574                                    "Payload is not of type " + MethodHandler.class.getName());
575                    }
576                    else {
577                            try {
578                                    returnValue = methodHandler.invoke(true);
579                            }
580                            catch (Exception e) {
581                                    exception = e;
582                            }
583                    }
584    
585                    if (!clusterRequest.isFireAndForget()) {
586                            ClusterNodeResponse clusterNodeResponse =
587                                    generateClusterNodeResponse(
588                                            clusterRequest, returnValue, exception);
589    
590                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
591                    }
592            }
593    
594            protected void sendNotifyRequest() {
595                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
596                            ClusterMessageType.NOTIFY, _localClusterNode);
597    
598                    try {
599                            _controlJChannel.send(null, clusterRequest);
600                    }
601                    catch (Exception e) {
602                            _log.error("Unable to send notify message", e);
603                    }
604            }
605    
606            private static final String _DEFAULT_CLUSTER_NAME =
607                    "LIFERAY-CONTROL-CHANNEL";
608    
609            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
610    
611            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
612                    new CopyOnWriteArrayList<ClusterEventListener>();
613            private Map<String, Address> _clusterNodeAddresses =
614                    new ConcurrentHashMap<String, Address>();
615            private JChannel _controlJChannel;
616            private ExecutorService _executorService;
617            private Map<String, FutureClusterResponses> _futureClusterResponses =
618                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
619            private Map<Address, ClusterNode> _liveInstances =
620                    new ConcurrentHashMap<Address, ClusterNode>();
621            private Address _localAddress;
622            private ClusterNode _localClusterNode;
623            private boolean _shortcutLocalMethod;
624    
625            private class ClusterResponseCallbackJob implements Runnable {
626    
627                    public ClusterResponseCallbackJob(
628                            ClusterResponseCallback clusterResponseCallback,
629                            FutureClusterResponses futureClusterResponses) {
630    
631                            _clusterResponseCallback = clusterResponseCallback;
632                            _futureClusterResponses = futureClusterResponses;
633                            _timeout = -1;
634                            _timeoutGet = false;
635                            _timeUnit = TimeUnit.SECONDS;
636                    }
637    
638                    public ClusterResponseCallbackJob(
639                            ClusterResponseCallback clusterResponseCallback,
640                            FutureClusterResponses futureClusterResponses, long timeout,
641                            TimeUnit timeUnit) {
642    
643                            _clusterResponseCallback = clusterResponseCallback;
644                            _futureClusterResponses = futureClusterResponses;
645                            _timeout = timeout;
646                            _timeoutGet = true;
647                            _timeUnit = timeUnit;
648                    }
649    
650                    @Override
651                    public void run() {
652                            BlockingQueue<ClusterNodeResponse> blockingQueue =
653                                    _futureClusterResponses.getPartialResults();
654    
655                            _clusterResponseCallback.callback(blockingQueue);
656    
657                            ClusterNodeResponses clusterNodeResponses = null;
658    
659                            try {
660                                    if (_timeoutGet) {
661                                            clusterNodeResponses = _futureClusterResponses.get(
662                                                    _timeout, _timeUnit);
663                                    }
664                                    else {
665                                            clusterNodeResponses = _futureClusterResponses.get();
666                                    }
667    
668                                    _clusterResponseCallback.callback(clusterNodeResponses);
669                            }
670                            catch (InterruptedException ie) {
671                                    _clusterResponseCallback.processInterruptedException(ie);
672                            }
673                            catch (TimeoutException te) {
674                                    _clusterResponseCallback.processTimeoutException(te);
675                            }
676                    }
677    
678                    private final ClusterResponseCallback _clusterResponseCallback;
679                    private final FutureClusterResponses _futureClusterResponses;
680                    private final long _timeout;
681                    private final boolean _timeoutGet;
682                    private final TimeUnit _timeUnit;
683    
684            }
685    
686    }