001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.concurrent.ConcurrentReferenceValueHashMap;
030    import com.liferay.portal.kernel.exception.SystemException;
031    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.memory.FinalizeManager;
035    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
036    import com.liferay.portal.kernel.util.CharPool;
037    import com.liferay.portal.kernel.util.GetterUtil;
038    import com.liferay.portal.kernel.util.Http;
039    import com.liferay.portal.kernel.util.MethodHandler;
040    import com.liferay.portal.kernel.util.PropsKeys;
041    import com.liferay.portal.kernel.util.StringUtil;
042    import com.liferay.portal.kernel.util.Validator;
043    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
044    import com.liferay.portal.util.PortalInetSocketAddressEventListener;
045    import com.liferay.portal.util.PortalUtil;
046    import com.liferay.portal.util.PropsUtil;
047    import com.liferay.portal.util.PropsValues;
048    
049    import java.io.Serializable;
050    
051    import java.net.InetAddress;
052    import java.net.InetSocketAddress;
053    
054    import java.util.ArrayList;
055    import java.util.Collection;
056    import java.util.Collections;
057    import java.util.List;
058    import java.util.Map;
059    import java.util.Properties;
060    import java.util.concurrent.BlockingQueue;
061    import java.util.concurrent.ConcurrentHashMap;
062    import java.util.concurrent.CopyOnWriteArrayList;
063    import java.util.concurrent.ExecutorService;
064    import java.util.concurrent.TimeUnit;
065    import java.util.concurrent.TimeoutException;
066    
067    import org.jgroups.JChannel;
068    
069    /**
070     * @author Tina Tian
071     * @author Shuyang Zhou
072     */
073    @DoPrivileged
074    public class ClusterExecutorImpl
075            extends ClusterBase
076            implements ClusterExecutor, PortalInetSocketAddressEventListener {
077    
078            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
079                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
080    
081            @Override
082            public void addClusterEventListener(
083                    ClusterEventListener clusterEventListener) {
084    
085                    if (!isEnabled()) {
086                            return;
087                    }
088    
089                    _clusterEventListeners.addIfAbsent(clusterEventListener);
090            }
091    
092            @Override
093            public void afterPropertiesSet() {
094                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
095                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
096                    }
097    
098                    if (PropsValues.LIVE_USERS_ENABLED) {
099                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
100                    }
101    
102                    _secure = StringUtil.equalsIgnoreCase(
103                            Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
104    
105                    super.afterPropertiesSet();
106            }
107    
108            @Override
109            public void destroy() {
110                    if (!isEnabled()) {
111                            return;
112                    }
113    
114                    PortalExecutorManagerUtil.shutdown(
115                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
116    
117                    _controlJChannel.setReceiver(null);
118    
119                    _controlJChannel.close();
120    
121                    _clusterEventListeners.clear();
122                    _clusterNodeAddresses.clear();
123                    _futureClusterResponses.clear();
124                    _liveInstances.clear();
125                    _localAddress = null;
126                    _localClusterNode = null;
127            }
128    
129            @Override
130            public FutureClusterResponses execute(ClusterRequest clusterRequest) {
131                    if (!isEnabled()) {
132                            return null;
133                    }
134    
135                    List<Address> addresses = prepareAddresses(clusterRequest);
136    
137                    FutureClusterResponses futureClusterResponses =
138                            new FutureClusterResponses(addresses);
139    
140                    if (!clusterRequest.isFireAndForget()) {
141                            String uuid = clusterRequest.getUuid();
142    
143                            _futureClusterResponses.put(uuid, futureClusterResponses);
144                    }
145    
146                    if (_shortcutLocalMethod &&
147                            addresses.remove(getLocalClusterNodeAddress())) {
148    
149                            runLocalMethod(clusterRequest, futureClusterResponses);
150                    }
151    
152                    if (clusterRequest.isMulticast()) {
153                            try {
154                                    _controlJChannel.send(null, clusterRequest);
155                            }
156                            catch (Exception e) {
157                                    throw new SystemException(
158                                            "Unable to send multicast request", e);
159                            }
160                    }
161                    else {
162                            for (Address address : addresses) {
163                                    org.jgroups.Address jGroupsAddress =
164                                            (org.jgroups.Address)address.getRealAddress();
165    
166                                    try {
167                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
168                                    }
169                                    catch (Exception e) {
170                                            throw new SystemException(
171                                                    "Unable to send unicast request", e);
172                                    }
173                            }
174                    }
175    
176                    return futureClusterResponses;
177            }
178    
179            @Override
180            public void execute(
181                    ClusterRequest clusterRequest,
182                    ClusterResponseCallback clusterResponseCallback) {
183    
184                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
185    
186                    ClusterResponseCallbackJob clusterResponseCallbackJob =
187                            new ClusterResponseCallbackJob(
188                                    clusterResponseCallback, futureClusterResponses);
189    
190                    _executorService.execute(clusterResponseCallbackJob);
191            }
192    
193            @Override
194            public void execute(
195                    ClusterRequest clusterRequest,
196                    ClusterResponseCallback clusterResponseCallback, long timeout,
197                    TimeUnit timeUnit) {
198    
199                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
200    
201                    ClusterResponseCallbackJob clusterResponseCallbackJob =
202                            new ClusterResponseCallbackJob(
203                                    clusterResponseCallback, futureClusterResponses, timeout,
204                                    timeUnit);
205    
206                    _executorService.execute(clusterResponseCallbackJob);
207            }
208    
209            @Override
210            public List<ClusterEventListener> getClusterEventListeners() {
211                    if (!isEnabled()) {
212                            return Collections.emptyList();
213                    }
214    
215                    return Collections.unmodifiableList(_clusterEventListeners);
216            }
217    
218            @Override
219            public List<Address> getClusterNodeAddresses() {
220                    if (!isEnabled()) {
221                            return Collections.emptyList();
222                    }
223    
224                    return getAddresses(_controlJChannel);
225            }
226    
227            @Override
228            public List<ClusterNode> getClusterNodes() {
229                    if (!isEnabled()) {
230                            return Collections.emptyList();
231                    }
232    
233                    return new ArrayList<ClusterNode>(_liveInstances.values());
234            }
235    
236            @Override
237            public ClusterNode getLocalClusterNode() {
238                    if (!isEnabled()) {
239                            return null;
240                    }
241    
242                    return _localClusterNode;
243            }
244    
245            @Override
246            public Address getLocalClusterNodeAddress() {
247                    if (!isEnabled()) {
248                            return null;
249                    }
250    
251                    return _localAddress;
252            }
253    
254            @Override
255            public void initialize() {
256                    if (!isEnabled()) {
257                            return;
258                    }
259    
260                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
261                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
262    
263                    PortalUtil.addPortalInetSocketAddressEventListener(this);
264    
265                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
266    
267                    initLocalClusterNode();
268    
269                    memberJoined(_localAddress, _localClusterNode);
270    
271                    sendNotifyRequest();
272    
273                    ClusterRequestReceiver clusterRequestReceiver =
274                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
275    
276                    clusterRequestReceiver.openLatch();
277            }
278    
279            @Override
280            public boolean isClusterNodeAlive(Address address) {
281                    if (!isEnabled()) {
282                            return false;
283                    }
284    
285                    List<Address> addresses = getAddresses(_controlJChannel);
286    
287                    return addresses.contains(address);
288            }
289    
290            @Override
291            public boolean isClusterNodeAlive(String clusterNodeId) {
292                    if (!isEnabled()) {
293                            return false;
294                    }
295    
296                    return _clusterNodeAddresses.containsKey(clusterNodeId);
297            }
298    
299            @Override
300            public void portalLocalInetSockAddressConfigured(
301                    InetSocketAddress inetSocketAddress) {
302    
303                    if (!isEnabled() ||
304                            (_localClusterNode.getPortalInetSocketAddress() != null)) {
305    
306                            return;
307                    }
308    
309                    try {
310                            _localClusterNode.setPortalInetSocketAddress(inetSocketAddress);
311    
312                            memberJoined(_localAddress, _localClusterNode);
313    
314                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
315                                    ClusterMessageType.UPDATE, _localClusterNode);
316    
317                            _controlJChannel.send(null, clusterRequest);
318                    }
319                    catch (Exception e) {
320                            _log.error("Unable to determine configure node port", e);
321                    }
322            }
323    
324            @Override
325            public void portalServerInetSocketAddressConfigured(
326                    InetSocketAddress inetSocketAddress) {
327            }
328    
329            @Override
330            public void removeClusterEventListener(
331                    ClusterEventListener clusterEventListener) {
332    
333                    if (!isEnabled()) {
334                            return;
335                    }
336    
337                    _clusterEventListeners.remove(clusterEventListener);
338            }
339    
340            public void setClusterEventListeners(
341                    List<ClusterEventListener> clusterEventListeners) {
342    
343                    if (!isEnabled()) {
344                            return;
345                    }
346    
347                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
348            }
349    
350            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
351                    if (!isEnabled()) {
352                            return;
353                    }
354    
355                    _shortcutLocalMethod = shortcutLocalMethod;
356            }
357    
358            protected void fireClusterEvent(ClusterEvent clusterEvent) {
359                    for (ClusterEventListener listener : _clusterEventListeners) {
360                            listener.processClusterEvent(clusterEvent);
361                    }
362            }
363    
364            protected ClusterNodeResponse generateClusterNodeResponse(
365                    ClusterRequest clusterRequest, Object returnValue,
366                    Exception exception) {
367    
368                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
369    
370                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
371                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
372                    clusterNodeResponse.setClusterMessageType(
373                            clusterRequest.getClusterMessageType());
374                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
375                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
376    
377                    if (exception != null) {
378                            clusterNodeResponse.setException(exception);
379                    }
380                    else {
381                            if (returnValue instanceof Serializable) {
382                                    clusterNodeResponse.setResult(returnValue);
383                            }
384                            else if (returnValue != null) {
385                                    clusterNodeResponse.setException(
386                                            new ClusterException("Return value is not serializable"));
387                            }
388                    }
389    
390                    return clusterNodeResponse;
391            }
392    
393            protected InetSocketAddress getConfiguredPortalInetSockAddress(
394                    boolean secure) {
395    
396                    InetSocketAddress inetSocketAddress = null;
397    
398                    String portalInetSocketAddressValue = null;
399    
400                    if (secure) {
401                            portalInetSocketAddressValue =
402                                    PropsValues.PORTAL_INSTANCE_HTTPS_INET_SOCKET_ADDRESS;
403                    }
404                    else {
405                            portalInetSocketAddressValue =
406                                    PropsValues.PORTAL_INSTANCE_HTTP_INET_SOCKET_ADDRESS;
407                    }
408    
409                    if (Validator.isNotNull(portalInetSocketAddressValue)) {
410                            String[] parts = StringUtil.split(
411                                    portalInetSocketAddressValue, CharPool.COLON);
412    
413                            if (parts.length == 2) {
414                                    try {
415                                            inetSocketAddress = new InetSocketAddress(
416                                                    InetAddress.getByName(parts[0]),
417                                                    GetterUtil.getIntegerStrict(parts[1]));
418                                    }
419                                    catch (Exception e) {
420                                            _log.error(
421                                                    "Unable to parse portal InetSocketAddress from " +
422                                                            portalInetSocketAddressValue,
423                                                    e);
424                                    }
425                            }
426                    }
427    
428                    return inetSocketAddress;
429            }
430    
431            protected JChannel getControlChannel() {
432                    return _controlJChannel;
433            }
434    
435            protected FutureClusterResponses getExecutionResults(String uuid) {
436                    return _futureClusterResponses.get(uuid);
437            }
438    
439            @Override
440            protected void initChannels() throws Exception {
441                    Properties controlProperties = PropsUtil.getProperties(
442                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
443    
444                    String controlProperty = controlProperties.getProperty(
445                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
446    
447                    ClusterRequestReceiver clusterRequestReceiver =
448                            new ClusterRequestReceiver(this);
449    
450                    _controlJChannel = createJChannel(
451                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
452            }
453    
454            protected void initLocalClusterNode() {
455                    InetAddress inetAddress = getBindInetAddress(_controlJChannel);
456    
457                    ClusterNode clusterNode = new ClusterNode(
458                            PortalUUIDUtil.generate(), inetAddress);
459    
460                    InetSocketAddress inetSocketAddress =
461                            getConfiguredPortalInetSockAddress(_secure);
462    
463                    if (inetSocketAddress != null) {
464                            clusterNode.setPortalInetSocketAddress(inetSocketAddress);
465                    }
466    
467                    _localClusterNode = clusterNode;
468            }
469    
470            protected boolean isShortcutLocalMethod() {
471                    return _shortcutLocalMethod;
472            }
473    
474            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
475                    _liveInstances.put(joinAddress, clusterNode);
476    
477                    Address previousAddress = _clusterNodeAddresses.put(
478                            clusterNode.getClusterNodeId(), joinAddress);
479    
480                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
481                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
482    
483                            // PLACEHOLDER
484    
485                            fireClusterEvent(clusterEvent);
486                    }
487            }
488    
489            protected void memberRemoved(List<Address> departAddresses) {
490                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
491    
492                    for (Address departAddress : departAddresses) {
493                            ClusterNode departClusterNode = _liveInstances.remove(
494                                    departAddress);
495    
496                            if (departClusterNode == null) {
497                                    continue;
498                            }
499    
500                            departClusterNodes.add(departClusterNode);
501    
502                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
503                    }
504    
505                    if (departClusterNodes.isEmpty()) {
506                            return;
507                    }
508    
509                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
510    
511                    fireClusterEvent(clusterEvent);
512            }
513    
514            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
515                    boolean isMulticast = clusterRequest.isMulticast();
516    
517                    List<Address> addresses = null;
518    
519                    if (isMulticast) {
520                            addresses = getAddresses(_controlJChannel);
521                    }
522                    else {
523                            addresses = new ArrayList<Address>();
524    
525                            Collection<Address> clusterNodeAddresses =
526                                    clusterRequest.getTargetClusterNodeAddresses();
527    
528                            if (clusterNodeAddresses != null) {
529                                    addresses.addAll(clusterNodeAddresses);
530                            }
531    
532                            Collection<String> clusterNodeIds =
533                                    clusterRequest.getTargetClusterNodeIds();
534    
535                            if (clusterNodeIds != null) {
536                                    for (String clusterNodeId : clusterNodeIds) {
537                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
538    
539                                            addresses.add(address);
540                                    }
541                            }
542                    }
543    
544                    if (clusterRequest.isSkipLocal()) {
545                            addresses.remove(getLocalClusterNodeAddress());
546                    }
547    
548                    return addresses;
549            }
550    
551            protected void runLocalMethod(
552                    ClusterRequest clusterRequest,
553                    FutureClusterResponses futureClusterResponses) {
554    
555                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
556    
557                    Object returnValue = null;
558                    Exception exception = null;
559    
560                    if (methodHandler == null) {
561                            exception = new ClusterException(
562                                    "Payload is not of type " + MethodHandler.class.getName());
563                    }
564                    else {
565                            try {
566                                    returnValue = methodHandler.invoke();
567                            }
568                            catch (Exception e) {
569                                    exception = e;
570                            }
571                    }
572    
573                    if (!clusterRequest.isFireAndForget()) {
574                            ClusterNodeResponse clusterNodeResponse =
575                                    generateClusterNodeResponse(
576                                            clusterRequest, returnValue, exception);
577    
578                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
579                    }
580            }
581    
582            protected void sendNotifyRequest() {
583                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
584                            ClusterMessageType.NOTIFY, _localClusterNode);
585    
586                    try {
587                            _controlJChannel.send(null, clusterRequest);
588                    }
589                    catch (Exception e) {
590                            _log.error("Unable to send notify message", e);
591                    }
592            }
593    
594            private static final String _DEFAULT_CLUSTER_NAME =
595                    "LIFERAY-CONTROL-CHANNEL";
596    
597            private static final Log _log = LogFactoryUtil.getLog(
598                    ClusterExecutorImpl.class);
599    
600            private final CopyOnWriteArrayList<ClusterEventListener>
601                    _clusterEventListeners =
602                            new CopyOnWriteArrayList<ClusterEventListener>();
603            private final Map<String, Address> _clusterNodeAddresses =
604                    new ConcurrentHashMap<String, Address>();
605            private JChannel _controlJChannel;
606            private ExecutorService _executorService;
607            private Map<String, FutureClusterResponses> _futureClusterResponses =
608                    new ConcurrentReferenceValueHashMap<String, FutureClusterResponses>(
609                            FinalizeManager.WEAK_REFERENCE_FACTORY);
610            private final Map<Address, ClusterNode> _liveInstances =
611                    new ConcurrentHashMap<Address, ClusterNode>();
612            private Address _localAddress;
613            private ClusterNode _localClusterNode;
614            private boolean _secure;
615            private boolean _shortcutLocalMethod;
616    
617            private class ClusterResponseCallbackJob implements Runnable {
618    
619                    public ClusterResponseCallbackJob(
620                            ClusterResponseCallback clusterResponseCallback,
621                            FutureClusterResponses futureClusterResponses) {
622    
623                            _clusterResponseCallback = clusterResponseCallback;
624                            _futureClusterResponses = futureClusterResponses;
625                            _timeout = -1;
626                            _timeoutGet = false;
627                            _timeUnit = TimeUnit.SECONDS;
628                    }
629    
630                    public ClusterResponseCallbackJob(
631                            ClusterResponseCallback clusterResponseCallback,
632                            FutureClusterResponses futureClusterResponses, long timeout,
633                            TimeUnit timeUnit) {
634    
635                            _clusterResponseCallback = clusterResponseCallback;
636                            _futureClusterResponses = futureClusterResponses;
637                            _timeout = timeout;
638                            _timeoutGet = true;
639                            _timeUnit = timeUnit;
640                    }
641    
642                    @Override
643                    public void run() {
644                            BlockingQueue<ClusterNodeResponse> blockingQueue =
645                                    _futureClusterResponses.getPartialResults();
646    
647                            _clusterResponseCallback.callback(blockingQueue);
648    
649                            ClusterNodeResponses clusterNodeResponses = null;
650    
651                            try {
652                                    if (_timeoutGet) {
653                                            clusterNodeResponses = _futureClusterResponses.get(
654                                                    _timeout, _timeUnit);
655                                    }
656                                    else {
657                                            clusterNodeResponses = _futureClusterResponses.get();
658                                    }
659    
660                                    _clusterResponseCallback.callback(clusterNodeResponses);
661                            }
662                            catch (InterruptedException ie) {
663                                    _clusterResponseCallback.processInterruptedException(ie);
664                            }
665                            catch (TimeoutException te) {
666                                    _clusterResponseCallback.processTimeoutException(te);
667                            }
668                    }
669    
670                    private final ClusterResponseCallback _clusterResponseCallback;
671                    private final FutureClusterResponses _futureClusterResponses;
672                    private final long _timeout;
673                    private final boolean _timeoutGet;
674                    private final TimeUnit _timeUnit;
675    
676            }
677    
678    }