001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.Http;
035    import com.liferay.portal.kernel.util.InetAddressUtil;
036    import com.liferay.portal.kernel.util.MethodHandler;
037    import com.liferay.portal.kernel.util.PropsKeys;
038    import com.liferay.portal.kernel.util.StringBundler;
039    import com.liferay.portal.kernel.util.StringUtil;
040    import com.liferay.portal.kernel.util.Validator;
041    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
042    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043    import com.liferay.portal.util.PortalPortEventListener;
044    import com.liferay.portal.util.PortalPortProtocolEventListener;
045    import com.liferay.portal.util.PortalUtil;
046    import com.liferay.portal.util.PropsUtil;
047    import com.liferay.portal.util.PropsValues;
048    
049    import java.io.Serializable;
050    
051    import java.net.InetAddress;
052    
053    import java.util.ArrayList;
054    import java.util.Collection;
055    import java.util.Collections;
056    import java.util.List;
057    import java.util.Map;
058    import java.util.concurrent.BlockingQueue;
059    import java.util.concurrent.ConcurrentHashMap;
060    import java.util.concurrent.CopyOnWriteArrayList;
061    import java.util.concurrent.ExecutorService;
062    import java.util.concurrent.TimeUnit;
063    import java.util.concurrent.TimeoutException;
064    
065    import org.jgroups.JChannel;
066    
067    /**
068     * @author Tina Tian
069     * @author Shuyang Zhou
070     */
071    @DoPrivileged
072    public class ClusterExecutorImpl
073            extends ClusterBase
074            implements ClusterExecutor, PortalPortEventListener,
075                               PortalPortProtocolEventListener {
076    
        /**
         * Name of the portal executor thread pool used for cluster response
         * callback jobs. The pool is looked up in {@link #initialize()} and shut
         * down in {@link #destroy()}.
         */
        public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
                "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
079    
080            public Address getCoordinatorAddress() {
081                    return new AddressImpl(_baseReceiver.getCoordinator());
082            }
083    
084            @Override
085            public void addClusterEventListener(
086                    ClusterEventListener clusterEventListener) {
087    
088                    if (!isEnabled()) {
089                            return;
090                    }
091    
092                    _clusterEventListeners.addIfAbsent(clusterEventListener);
093            }
094    
095            @Override
096            public void afterPropertiesSet() {
097                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
098                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
099                    }
100    
101                    if (PropsValues.LIVE_USERS_ENABLED) {
102                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
103                    }
104    
105                    super.afterPropertiesSet();
106            }
107    
108            @Override
109            public void destroy() {
110                    if (!isEnabled()) {
111                            return;
112                    }
113    
114                    PortalExecutorManagerUtil.shutdown(
115                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
116    
117                    _controlJChannel.setReceiver(null);
118    
119                    _controlJChannel.close();
120    
121                    _clusterEventListeners.clear();
122                    _clusterNodeAddresses.clear();
123                    _futureClusterResponses.clear();
124                    _liveInstances.clear();
125                    _localAddress = null;
126                    _localClusterNode = null;
127            }
128    
129            @Override
130            public FutureClusterResponses execute(ClusterRequest clusterRequest)
131                    throws SystemException {
132    
133                    if (!isEnabled()) {
134                            return null;
135                    }
136    
137                    List<Address> addresses = prepareAddresses(clusterRequest);
138    
139                    FutureClusterResponses futureClusterResponses =
140                            new FutureClusterResponses(addresses);
141    
142                    if (!clusterRequest.isFireAndForget()) {
143                            String uuid = clusterRequest.getUuid();
144    
145                            _futureClusterResponses.put(uuid, futureClusterResponses);
146                    }
147    
148                    if (_shortcutLocalMethod &&
149                            addresses.remove(getLocalClusterNodeAddress())) {
150    
151                            runLocalMethod(clusterRequest, futureClusterResponses);
152                    }
153    
154                    if (clusterRequest.isMulticast()) {
155                            try {
156                                    sendJGroupsMessage(_controlJChannel, null, clusterRequest);
157                            }
158                            catch (Exception e) {
159                                    throw new SystemException(
160                                            "Unable to send multicast request", e);
161                            }
162                    }
163                    else {
164                            for (Address address : addresses) {
165                                    org.jgroups.Address jGroupsAddress =
166                                            (org.jgroups.Address)address.getRealAddress();
167    
168                                    try {
169                                            sendJGroupsMessage(
170                                                    _controlJChannel, jGroupsAddress, clusterRequest);
171                                    }
172                                    catch (Exception e) {
173                                            throw new SystemException(
174                                                    "Unable to send unicast request", e);
175                                    }
176                            }
177                    }
178    
179                    return futureClusterResponses;
180            }
181    
182            @Override
183            public void execute(
184                            ClusterRequest clusterRequest,
185                            ClusterResponseCallback clusterResponseCallback)
186                    throws SystemException {
187    
188                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
189    
190                    ClusterResponseCallbackJob clusterResponseCallbackJob =
191                            new ClusterResponseCallbackJob(
192                                    clusterResponseCallback, futureClusterResponses);
193    
194                    _executorService.execute(clusterResponseCallbackJob);
195            }
196    
197            @Override
198            public void execute(
199                            ClusterRequest clusterRequest,
200                            ClusterResponseCallback clusterResponseCallback, long timeout,
201                            TimeUnit timeUnit)
202                    throws SystemException {
203    
204                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
205    
206                    ClusterResponseCallbackJob clusterResponseCallbackJob =
207                            new ClusterResponseCallbackJob(
208                                    clusterResponseCallback, futureClusterResponses, timeout,
209                                    timeUnit);
210    
211                    _executorService.execute(clusterResponseCallbackJob);
212            }
213    
214            @Override
215            public List<ClusterEventListener> getClusterEventListeners() {
216                    if (!isEnabled()) {
217                            return Collections.emptyList();
218                    }
219    
220                    return Collections.unmodifiableList(_clusterEventListeners);
221            }
222    
223            @Override
224            public List<Address> getClusterNodeAddresses() {
225                    if (!isEnabled()) {
226                            return Collections.emptyList();
227                    }
228    
229                    return getAddresses(_controlJChannel);
230            }
231    
232            @Override
233            public List<ClusterNode> getClusterNodes() {
234                    if (!isEnabled()) {
235                            return Collections.emptyList();
236                    }
237    
238                    return new ArrayList<ClusterNode>(_liveInstances.values());
239            }
240    
241            @Override
242            public ClusterNode getLocalClusterNode() {
243                    if (!isEnabled()) {
244                            return null;
245                    }
246    
247                    return _localClusterNode;
248            }
249    
250            @Override
251            public Address getLocalClusterNodeAddress() {
252                    if (!isEnabled()) {
253                            return null;
254                    }
255    
256                    return _localAddress;
257            }
258    
259            @Override
260            public void initialize() {
261                    if (!isEnabled()) {
262                            return;
263                    }
264    
265                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
266                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
267    
268                    PortalUtil.addPortalPortProtocolEventListener(this);
269    
270                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
271    
272                    try {
273                            initLocalClusterNode();
274    
275                            memberJoined(_localAddress, _localClusterNode);
276    
277                            sendNotifyRequest();
278                    }
279                    catch (Exception e) {
280                            _log.error("Unable to determine local network address", e);
281                    }
282    
283                    _baseReceiver.openLatch();
284            }
285    
286            @Override
287            public boolean isClusterNodeAlive(Address address) {
288                    if (!isEnabled()) {
289                            return false;
290                    }
291    
292                    List<Address> addresses = getAddresses(_controlJChannel);
293    
294                    return addresses.contains(address);
295            }
296    
297            @Override
298            public boolean isClusterNodeAlive(String clusterNodeId) {
299                    if (!isEnabled()) {
300                            return false;
301                    }
302    
303                    return _clusterNodeAddresses.containsKey(clusterNodeId);
304            }
305    
306            /**
307             * @deprecated As of 6.2.0, replaced by {@link
308             *             #portalPortProtocolConfigured(int, Boolean)}
309             */
310            @Override
311            public void portalPortConfigured(int port) {
312                    portalPortProtocolConfigured(port, null);
313            }
314    
315            @Override
316            public void portalPortProtocolConfigured(int port, Boolean secure) {
317                    if (!isEnabled() || (port <= 0) || (secure == null)) {
318                            return;
319                    }
320    
321                    if (Validator.isNotNull(_localClusterNode.getPortalProtocol())) {
322                            return;
323                    }
324    
325                    if (secure) {
326                            _localClusterNode.setPortalProtocol(Http.HTTPS);
327                    }
328                    else {
329                            _localClusterNode.setPortalProtocol(Http.HTTP);
330                    }
331    
332                    try {
333                            _localClusterNode.setPort(port);
334    
335                            memberJoined(_localAddress, _localClusterNode);
336    
337                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
338                                    ClusterMessageType.UPDATE, _localClusterNode);
339    
340                            sendJGroupsMessage(_controlJChannel, null, clusterRequest);
341                    }
342                    catch (Exception e) {
343                            _log.error("Unable to determine configure node port", e);
344                    }
345            }
346    
347            @Override
348            public void removeClusterEventListener(
349                    ClusterEventListener clusterEventListener) {
350    
351                    if (!isEnabled()) {
352                            return;
353                    }
354    
355                    _clusterEventListeners.remove(clusterEventListener);
356            }
357    
358            public void setClusterEventListeners(
359                    List<ClusterEventListener> clusterEventListeners) {
360    
361                    if (!isEnabled()) {
362                            return;
363                    }
364    
365                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
366            }
367    
368            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
369                    if (!isEnabled()) {
370                            return;
371                    }
372    
373                    _shortcutLocalMethod = shortcutLocalMethod;
374            }
375    
376            protected void fireClusterEvent(ClusterEvent clusterEvent) {
377                    for (ClusterEventListener listener : _clusterEventListeners) {
378                            listener.processClusterEvent(clusterEvent);
379                    }
380            }
381    
382            protected ClusterNodeResponse generateClusterNodeResponse(
383                    ClusterRequest clusterRequest, Object returnValue,
384                    Exception exception) {
385    
386                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
387    
388                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
389                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
390                    clusterNodeResponse.setClusterMessageType(
391                            clusterRequest.getClusterMessageType());
392                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
393                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
394    
395                    if (exception != null) {
396                            clusterNodeResponse.setException(exception);
397                    }
398                    else {
399                            if (returnValue instanceof Serializable) {
400                                    clusterNodeResponse.setResult(returnValue);
401                            }
402                            else if (returnValue != null) {
403                                    clusterNodeResponse.setException(
404                                            new ClusterException("Return value is not serializable"));
405                            }
406                    }
407    
408                    return clusterNodeResponse;
409            }
410    
411            protected JChannel getControlChannel() {
412                    return _controlJChannel;
413            }
414    
415            protected FutureClusterResponses getExecutionResults(String uuid) {
416                    return _futureClusterResponses.get(uuid);
417            }
418    
419            @Override
420            protected void initChannels() throws Exception {
421                    String channelName = PropsUtil.get(
422                            PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL);
423    
424                    if (Validator.isNull(channelName)) {
425                            throw new IllegalStateException(
426                                    "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL +
427                                            "\"");
428                    }
429    
430                    String controlProperty = PropsUtil.get(
431                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
432    
433                    if (Validator.isNull(controlProperty)) {
434                            throw new IllegalStateException(
435                                    "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL +
436                                            "\"");
437                    }
438    
439                    _baseReceiver = new ClusterRequestReceiver(this);
440    
441                    _controlJChannel = createJChannel(
442                            controlProperty, _baseReceiver, channelName);
443            }
444    
445            protected void initLocalClusterNode() throws Exception {
446                    InetAddress inetAddress = bindInetAddress;
447    
448                    if (inetAddress == null) {
449                            inetAddress = InetAddressUtil.getLocalInetAddress();
450                    }
451    
452                    ClusterNode localClusterNode = new ClusterNode(
453                            PortalUUIDUtil.generate(), inetAddress);
454    
455                    if (StringUtil.equalsIgnoreCase(
456                                    Http.HTTPS, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
457                            (PropsValues.PORTAL_INSTANCE_HTTPS_PORT > 0)) {
458    
459                            localClusterNode.setPortalProtocol(Http.HTTPS);
460                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTPS_PORT);
461                    }
462                    else if (StringUtil.equalsIgnoreCase(
463                                            Http.HTTP, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
464                                     (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0)) {
465    
466                            localClusterNode.setPortalProtocol(Http.HTTP);
467                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
468                    }
469                    else {
470                            if (_log.isWarnEnabled()) {
471                                    StringBundler sb = new StringBundler(8);
472    
473                                    sb.append("Unable to configure node protocol and port. ");
474                                    sb.append("This configuration will be inferred dynamically ");
475                                    sb.append("from the first request but static configuration ");
476                                    sb.append("is recomended to avoid comunications problems ");
477                                    sb.append("between nodes. Please set the right values for ");
478                                    sb.append("\"portal.instance.protocol\" and ");
479                                    sb.append("\"portal.instance.http.port\" or ");
480                                    sb.append("\"portal.instance.https.port\".");
481    
482                                    _log.warn(sb.toString());
483                            }
484                    }
485    
486                    _localClusterNode = localClusterNode;
487    
488                    if (_log.isDebugEnabled()) {
489                            _log.debug(
490                                    "Initialized cluster node: " + localClusterNode.toString());
491                    }
492            }
493    
494            protected boolean isShortcutLocalMethod() {
495                    return _shortcutLocalMethod;
496            }
497    
498            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
499                    _liveInstances.put(joinAddress, clusterNode);
500    
501                    Address previousAddress = _clusterNodeAddresses.put(
502                            clusterNode.getClusterNodeId(), joinAddress);
503    
504                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
505                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
506    
507                            // PLACEHOLDER
508    
509                            fireClusterEvent(clusterEvent);
510                    }
511            }
512    
513            protected void memberRemoved(List<Address> departAddresses) {
514                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
515    
516                    for (Address departAddress : departAddresses) {
517                            ClusterNode departClusterNode = _liveInstances.remove(
518                                    departAddress);
519    
520                            if (departClusterNode == null) {
521                                    continue;
522                            }
523    
524                            departClusterNodes.add(departClusterNode);
525    
526                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
527                    }
528    
529                    if (departClusterNodes.isEmpty()) {
530                            return;
531                    }
532    
533                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
534    
535                    fireClusterEvent(clusterEvent);
536            }
537    
538            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
539                    boolean isMulticast = clusterRequest.isMulticast();
540    
541                    List<Address> addresses = null;
542    
543                    if (isMulticast) {
544                            addresses = getAddresses(_controlJChannel);
545                    }
546                    else {
547                            addresses = new ArrayList<Address>();
548    
549                            Collection<Address> clusterNodeAddresses =
550                                    clusterRequest.getTargetClusterNodeAddresses();
551    
552                            if (clusterNodeAddresses != null) {
553                                    addresses.addAll(clusterNodeAddresses);
554                            }
555    
556                            Collection<String> clusterNodeIds =
557                                    clusterRequest.getTargetClusterNodeIds();
558    
559                            if (clusterNodeIds != null) {
560                                    for (String clusterNodeId : clusterNodeIds) {
561                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
562    
563                                            addresses.add(address);
564                                    }
565                            }
566                    }
567    
568                    if (clusterRequest.isSkipLocal()) {
569                            addresses.remove(getLocalClusterNodeAddress());
570                    }
571    
572                    return addresses;
573            }
574    
575            protected void runLocalMethod(
576                    ClusterRequest clusterRequest,
577                    FutureClusterResponses futureClusterResponses) {
578    
579                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
580    
581                    Object returnValue = null;
582                    Exception exception = null;
583    
584                    if (methodHandler == null) {
585                            exception = new ClusterException(
586                                    "Payload is not of type " + MethodHandler.class.getName());
587                    }
588                    else {
589                            try {
590                                    returnValue = methodHandler.invoke(true);
591                            }
592                            catch (Exception e) {
593                                    exception = e;
594                            }
595                    }
596    
597                    if (!clusterRequest.isFireAndForget()) {
598                            ClusterNodeResponse clusterNodeResponse =
599                                    generateClusterNodeResponse(
600                                            clusterRequest, returnValue, exception);
601    
602                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
603                    }
604            }
605    
606            protected void sendNotifyRequest() {
607                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
608                            ClusterMessageType.NOTIFY, _localClusterNode);
609    
610                    try {
611                            sendJGroupsMessage(_controlJChannel, null, clusterRequest);
612                    }
613                    catch (Exception e) {
614                            _log.error("Unable to send notify message", e);
615                    }
616            }
617    
618            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
619    
620            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
621                    new CopyOnWriteArrayList<ClusterEventListener>();
622            private Map<String, Address> _clusterNodeAddresses =
623                    new ConcurrentHashMap<String, Address>();
624            private JChannel _controlJChannel;
625            private ExecutorService _executorService;
626            private Map<String, FutureClusterResponses> _futureClusterResponses =
627                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
628            private Map<Address, ClusterNode> _liveInstances =
629                    new ConcurrentHashMap<Address, ClusterNode>();
630            private Address _localAddress;
631            private ClusterNode _localClusterNode;
632            private boolean _shortcutLocalMethod;
633            private BaseReceiver _baseReceiver;
634    
635            private class ClusterResponseCallbackJob implements Runnable {
636    
637                    public ClusterResponseCallbackJob(
638                            ClusterResponseCallback clusterResponseCallback,
639                            FutureClusterResponses futureClusterResponses) {
640    
641                            _clusterResponseCallback = clusterResponseCallback;
642                            _futureClusterResponses = futureClusterResponses;
643                            _timeout = -1;
644                            _timeoutGet = false;
645                            _timeUnit = TimeUnit.SECONDS;
646                    }
647    
648                    public ClusterResponseCallbackJob(
649                            ClusterResponseCallback clusterResponseCallback,
650                            FutureClusterResponses futureClusterResponses, long timeout,
651                            TimeUnit timeUnit) {
652    
653                            _clusterResponseCallback = clusterResponseCallback;
654                            _futureClusterResponses = futureClusterResponses;
655                            _timeout = timeout;
656                            _timeoutGet = true;
657                            _timeUnit = timeUnit;
658                    }
659    
660                    @Override
661                    public void run() {
662                            BlockingQueue<ClusterNodeResponse> blockingQueue =
663                                    _futureClusterResponses.getPartialResults();
664    
665                            _clusterResponseCallback.callback(blockingQueue);
666    
667                            ClusterNodeResponses clusterNodeResponses = null;
668    
669                            try {
670                                    if (_timeoutGet) {
671                                            clusterNodeResponses = _futureClusterResponses.get(
672                                                    _timeout, _timeUnit);
673                                    }
674                                    else {
675                                            clusterNodeResponses = _futureClusterResponses.get();
676                                    }
677    
678                                    _clusterResponseCallback.callback(clusterNodeResponses);
679                            }
680                            catch (InterruptedException ie) {
681                                    _clusterResponseCallback.processInterruptedException(ie);
682                            }
683                            catch (TimeoutException te) {
684                                    _clusterResponseCallback.processTimeoutException(te);
685                            }
686                    }
687    
688                    private final ClusterResponseCallback _clusterResponseCallback;
689                    private final FutureClusterResponses _futureClusterResponses;
690                    private final long _timeout;
691                    private final boolean _timeoutGet;
692                    private final TimeUnit _timeUnit;
693    
694            }
695    
696    }