001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.Http;
035    import com.liferay.portal.kernel.util.InetAddressUtil;
036    import com.liferay.portal.kernel.util.MethodHandler;
037    import com.liferay.portal.kernel.util.PropsKeys;
038    import com.liferay.portal.kernel.util.StringUtil;
039    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
040    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
041    import com.liferay.portal.util.PortalPortEventListener;
042    import com.liferay.portal.util.PortalUtil;
043    import com.liferay.portal.util.PropsUtil;
044    import com.liferay.portal.util.PropsValues;
045    
046    import java.io.Serializable;
047    
048    import java.net.InetAddress;
049    
050    import java.util.ArrayList;
051    import java.util.Collection;
052    import java.util.Collections;
053    import java.util.List;
054    import java.util.Map;
055    import java.util.Properties;
056    import java.util.concurrent.BlockingQueue;
057    import java.util.concurrent.ConcurrentHashMap;
058    import java.util.concurrent.CopyOnWriteArrayList;
059    import java.util.concurrent.ExecutorService;
060    import java.util.concurrent.TimeUnit;
061    import java.util.concurrent.TimeoutException;
062    
063    import org.jgroups.JChannel;
064    
065    /**
066     * @author Tina Tian
067     * @author Shuyang Zhou
068     */
069    @DoPrivileged
070    public class ClusterExecutorImpl
071            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
072    
073            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
074                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
075    
076            @Override
077            public void addClusterEventListener(
078                    ClusterEventListener clusterEventListener) {
079    
080                    if (!isEnabled()) {
081                            return;
082                    }
083    
084                    _clusterEventListeners.addIfAbsent(clusterEventListener);
085            }
086    
087            @Override
088            public void afterPropertiesSet() {
089                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
090                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
091                    }
092    
093                    if (PropsValues.LIVE_USERS_ENABLED) {
094                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
095                    }
096    
097                    _secure = StringUtil.equalsIgnoreCase(
098                            Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
099    
100                    if (_secure) {
101                            _port = PropsValues.PORTAL_INSTANCE_HTTPS_PORT;
102                    }
103                    else {
104                            _port = PropsValues.PORTAL_INSTANCE_HTTP_PORT;
105                    }
106    
107                    super.afterPropertiesSet();
108            }
109    
110            @Override
111            public void destroy() {
112                    if (!isEnabled()) {
113                            return;
114                    }
115    
116                    PortalExecutorManagerUtil.shutdown(
117                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
118    
119                    _controlJChannel.setReceiver(null);
120    
121                    _controlJChannel.close();
122    
123                    _clusterEventListeners.clear();
124                    _clusterNodeAddresses.clear();
125                    _futureClusterResponses.clear();
126                    _liveInstances.clear();
127                    _localAddress = null;
128                    _localClusterNode = null;
129            }
130    
131            @Override
132            public FutureClusterResponses execute(ClusterRequest clusterRequest)
133                    throws SystemException {
134    
135                    if (!isEnabled()) {
136                            return null;
137                    }
138    
139                    List<Address> addresses = prepareAddresses(clusterRequest);
140    
141                    FutureClusterResponses futureClusterResponses =
142                            new FutureClusterResponses(addresses);
143    
144                    if (!clusterRequest.isFireAndForget()) {
145                            String uuid = clusterRequest.getUuid();
146    
147                            _futureClusterResponses.put(uuid, futureClusterResponses);
148                    }
149    
150                    if (_shortcutLocalMethod &&
151                            addresses.remove(getLocalClusterNodeAddress())) {
152    
153                            runLocalMethod(clusterRequest, futureClusterResponses);
154                    }
155    
156                    if (clusterRequest.isMulticast()) {
157                            try {
158                                    _controlJChannel.send(null, clusterRequest);
159                            }
160                            catch (Exception e) {
161                                    throw new SystemException(
162                                            "Unable to send multicast request", e);
163                            }
164                    }
165                    else {
166                            for (Address address : addresses) {
167                                    org.jgroups.Address jGroupsAddress =
168                                            (org.jgroups.Address)address.getRealAddress();
169    
170                                    try {
171                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
172                                    }
173                                    catch (Exception e) {
174                                            throw new SystemException(
175                                                    "Unable to send unicast request", e);
176                                    }
177                            }
178                    }
179    
180                    return futureClusterResponses;
181            }
182    
183            @Override
184            public void execute(
185                            ClusterRequest clusterRequest,
186                            ClusterResponseCallback clusterResponseCallback)
187                    throws SystemException {
188    
189                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
190    
191                    ClusterResponseCallbackJob clusterResponseCallbackJob =
192                            new ClusterResponseCallbackJob(
193                                    clusterResponseCallback, futureClusterResponses);
194    
195                    _executorService.execute(clusterResponseCallbackJob);
196            }
197    
198            @Override
199            public void execute(
200                            ClusterRequest clusterRequest,
201                            ClusterResponseCallback clusterResponseCallback, long timeout,
202                            TimeUnit timeUnit)
203                    throws SystemException {
204    
205                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
206    
207                    ClusterResponseCallbackJob clusterResponseCallbackJob =
208                            new ClusterResponseCallbackJob(
209                                    clusterResponseCallback, futureClusterResponses, timeout,
210                                    timeUnit);
211    
212                    _executorService.execute(clusterResponseCallbackJob);
213            }
214    
215            @Override
216            public List<ClusterEventListener> getClusterEventListeners() {
217                    if (!isEnabled()) {
218                            return Collections.emptyList();
219                    }
220    
221                    return Collections.unmodifiableList(_clusterEventListeners);
222            }
223    
224            @Override
225            public List<Address> getClusterNodeAddresses() {
226                    if (!isEnabled()) {
227                            return Collections.emptyList();
228                    }
229    
230                    return getAddresses(_controlJChannel);
231            }
232    
233            @Override
234            public List<ClusterNode> getClusterNodes() {
235                    if (!isEnabled()) {
236                            return Collections.emptyList();
237                    }
238    
239                    return new ArrayList<ClusterNode>(_liveInstances.values());
240            }
241    
242            @Override
243            public ClusterNode getLocalClusterNode() {
244                    if (!isEnabled()) {
245                            return null;
246                    }
247    
248                    return _localClusterNode;
249            }
250    
251            @Override
252            public Address getLocalClusterNodeAddress() {
253                    if (!isEnabled()) {
254                            return null;
255                    }
256    
257                    return _localAddress;
258            }
259    
260            @Override
261            public void initialize() {
262                    if (!isEnabled()) {
263                            return;
264                    }
265    
266                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
267                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
268    
269                    PortalUtil.addPortalPortEventListener(this);
270    
271                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
272    
273                    try {
274                            initLocalClusterNode();
275    
276                            memberJoined(_localAddress, _localClusterNode);
277    
278                            sendNotifyRequest();
279                    }
280                    catch (Exception e) {
281                            _log.error("Unable to determine local network address", e);
282                    }
283    
284                    ClusterRequestReceiver clusterRequestReceiver =
285                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
286    
287                    clusterRequestReceiver.openLatch();
288            }
289    
290            @Override
291            public boolean isClusterNodeAlive(Address address) {
292                    if (!isEnabled()) {
293                            return false;
294                    }
295    
296                    List<Address> addresses = getAddresses(_controlJChannel);
297    
298                    return addresses.contains(address);
299            }
300    
301            @Override
302            public boolean isClusterNodeAlive(String clusterNodeId) {
303                    if (!isEnabled()) {
304                            return false;
305                    }
306    
307                    return _clusterNodeAddresses.containsKey(clusterNodeId);
308            }
309    
310            @Override
311            public void portalPortConfigured(int port) {
312                    if (!isEnabled() || (_localClusterNode.getPort() == _port)) {
313                            return;
314                    }
315    
316                    try {
317                            _localClusterNode.setPort(port);
318    
319                            memberJoined(_localAddress, _localClusterNode);
320    
321                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
322                                    ClusterMessageType.UPDATE, _localClusterNode);
323    
324                            _controlJChannel.send(null, clusterRequest);
325                    }
326                    catch (Exception e) {
327                            _log.error("Unable to determine configure node port", e);
328                    }
329            }
330    
331            @Override
332            public void removeClusterEventListener(
333                    ClusterEventListener clusterEventListener) {
334    
335                    if (!isEnabled()) {
336                            return;
337                    }
338    
339                    _clusterEventListeners.remove(clusterEventListener);
340            }
341    
342            public void setClusterEventListeners(
343                    List<ClusterEventListener> clusterEventListeners) {
344    
345                    if (!isEnabled()) {
346                            return;
347                    }
348    
349                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
350            }
351    
352            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
353                    if (!isEnabled()) {
354                            return;
355                    }
356    
357                    _shortcutLocalMethod = shortcutLocalMethod;
358            }
359    
360            protected void fireClusterEvent(ClusterEvent clusterEvent) {
361                    for (ClusterEventListener listener : _clusterEventListeners) {
362                            listener.processClusterEvent(clusterEvent);
363                    }
364            }
365    
366            protected ClusterNodeResponse generateClusterNodeResponse(
367                    ClusterRequest clusterRequest, Object returnValue,
368                    Exception exception) {
369    
370                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
371    
372                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
373                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
374                    clusterNodeResponse.setClusterMessageType(
375                            clusterRequest.getClusterMessageType());
376                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
377                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
378    
379                    if (exception != null) {
380                            clusterNodeResponse.setException(exception);
381                    }
382                    else {
383                            if (returnValue instanceof Serializable) {
384                                    clusterNodeResponse.setResult(returnValue);
385                            }
386                            else if (returnValue != null) {
387                                    clusterNodeResponse.setException(
388                                            new ClusterException("Return value is not serializable"));
389                            }
390                    }
391    
392                    return clusterNodeResponse;
393            }
394    
395            protected JChannel getControlChannel() {
396                    return _controlJChannel;
397            }
398    
399            protected FutureClusterResponses getExecutionResults(String uuid) {
400                    return _futureClusterResponses.get(uuid);
401            }
402    
403            @Override
404            protected void initChannels() throws Exception {
405                    Properties controlProperties = PropsUtil.getProperties(
406                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
407    
408                    String controlProperty = controlProperties.getProperty(
409                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
410    
411                    ClusterRequestReceiver clusterRequestReceiver =
412                            new ClusterRequestReceiver(this);
413    
414                    _controlJChannel = createJChannel(
415                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
416            }
417    
418            protected void initLocalClusterNode() throws Exception {
419                    InetAddress inetAddress = bindInetAddress;
420    
421                    if (inetAddress == null) {
422                            inetAddress = InetAddressUtil.getLocalInetAddress();
423                    }
424    
425                    ClusterNode localClusterNode = new ClusterNode(
426                            PortalUUIDUtil.generate(), inetAddress);
427    
428                    int port = _port;
429    
430                    if (port <= 0) {
431                            port = PortalUtil.getPortalPort(_secure);
432                    }
433    
434                    localClusterNode.setPort(port);
435    
436                    _localClusterNode = localClusterNode;
437            }
438    
439            protected boolean isShortcutLocalMethod() {
440                    return _shortcutLocalMethod;
441            }
442    
443            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
444                    _liveInstances.put(joinAddress, clusterNode);
445    
446                    Address previousAddress = _clusterNodeAddresses.put(
447                            clusterNode.getClusterNodeId(), joinAddress);
448    
449                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
450                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
451    
452                            // PLACEHOLDER
453    
454                            fireClusterEvent(clusterEvent);
455                    }
456            }
457    
458            protected void memberRemoved(List<Address> departAddresses) {
459                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
460    
461                    for (Address departAddress : departAddresses) {
462                            ClusterNode departClusterNode = _liveInstances.remove(
463                                    departAddress);
464    
465                            if (departClusterNode == null) {
466                                    continue;
467                            }
468    
469                            departClusterNodes.add(departClusterNode);
470    
471                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
472                    }
473    
474                    if (departClusterNodes.isEmpty()) {
475                            return;
476                    }
477    
478                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
479    
480                    fireClusterEvent(clusterEvent);
481            }
482    
483            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
484                    boolean isMulticast = clusterRequest.isMulticast();
485    
486                    List<Address> addresses = null;
487    
488                    if (isMulticast) {
489                            addresses = getAddresses(_controlJChannel);
490                    }
491                    else {
492                            addresses = new ArrayList<Address>();
493    
494                            Collection<Address> clusterNodeAddresses =
495                                    clusterRequest.getTargetClusterNodeAddresses();
496    
497                            if (clusterNodeAddresses != null) {
498                                    addresses.addAll(clusterNodeAddresses);
499                            }
500    
501                            Collection<String> clusterNodeIds =
502                                    clusterRequest.getTargetClusterNodeIds();
503    
504                            if (clusterNodeIds != null) {
505                                    for (String clusterNodeId : clusterNodeIds) {
506                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
507    
508                                            addresses.add(address);
509                                    }
510                            }
511                    }
512    
513                    if (clusterRequest.isSkipLocal()) {
514                            addresses.remove(getLocalClusterNodeAddress());
515                    }
516    
517                    return addresses;
518            }
519    
520            protected void runLocalMethod(
521                    ClusterRequest clusterRequest,
522                    FutureClusterResponses futureClusterResponses) {
523    
524                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
525    
526                    Object returnValue = null;
527                    Exception exception = null;
528    
529                    if (methodHandler == null) {
530                            exception = new ClusterException(
531                                    "Payload is not of type " + MethodHandler.class.getName());
532                    }
533                    else {
534                            try {
535                                    returnValue = methodHandler.invoke(true);
536                            }
537                            catch (Exception e) {
538                                    exception = e;
539                            }
540                    }
541    
542                    if (!clusterRequest.isFireAndForget()) {
543                            ClusterNodeResponse clusterNodeResponse =
544                                    generateClusterNodeResponse(
545                                            clusterRequest, returnValue, exception);
546    
547                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
548                    }
549            }
550    
551            protected void sendNotifyRequest() {
552                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
553                            ClusterMessageType.NOTIFY, _localClusterNode);
554    
555                    try {
556                            _controlJChannel.send(null, clusterRequest);
557                    }
558                    catch (Exception e) {
559                            _log.error("Unable to send notify message", e);
560                    }
561            }
562    
563            private static final String _DEFAULT_CLUSTER_NAME =
564                    "LIFERAY-CONTROL-CHANNEL";
565    
566            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
567    
568            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
569                    new CopyOnWriteArrayList<ClusterEventListener>();
570            private Map<String, Address> _clusterNodeAddresses =
571                    new ConcurrentHashMap<String, Address>();
572            private JChannel _controlJChannel;
573            private ExecutorService _executorService;
574            private Map<String, FutureClusterResponses> _futureClusterResponses =
575                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
576            private Map<Address, ClusterNode> _liveInstances =
577                    new ConcurrentHashMap<Address, ClusterNode>();
578            private Address _localAddress;
579            private ClusterNode _localClusterNode;
580            private int _port;
581            private boolean _secure;
582            private boolean _shortcutLocalMethod;
583    
584            private class ClusterResponseCallbackJob implements Runnable {
585    
586                    public ClusterResponseCallbackJob(
587                            ClusterResponseCallback clusterResponseCallback,
588                            FutureClusterResponses futureClusterResponses) {
589    
590                            _clusterResponseCallback = clusterResponseCallback;
591                            _futureClusterResponses = futureClusterResponses;
592                            _timeout = -1;
593                            _timeoutGet = false;
594                            _timeUnit = TimeUnit.SECONDS;
595                    }
596    
597                    public ClusterResponseCallbackJob(
598                            ClusterResponseCallback clusterResponseCallback,
599                            FutureClusterResponses futureClusterResponses, long timeout,
600                            TimeUnit timeUnit) {
601    
602                            _clusterResponseCallback = clusterResponseCallback;
603                            _futureClusterResponses = futureClusterResponses;
604                            _timeout = timeout;
605                            _timeoutGet = true;
606                            _timeUnit = timeUnit;
607                    }
608    
609                    @Override
610                    public void run() {
611                            BlockingQueue<ClusterNodeResponse> blockingQueue =
612                                    _futureClusterResponses.getPartialResults();
613    
614                            _clusterResponseCallback.callback(blockingQueue);
615    
616                            ClusterNodeResponses clusterNodeResponses = null;
617    
618                            try {
619                                    if (_timeoutGet) {
620                                            clusterNodeResponses = _futureClusterResponses.get(
621                                                    _timeout, _timeUnit);
622                                    }
623                                    else {
624                                            clusterNodeResponses = _futureClusterResponses.get();
625                                    }
626    
627                                    _clusterResponseCallback.callback(clusterNodeResponses);
628                            }
629                            catch (InterruptedException ie) {
630                                    _clusterResponseCallback.processInterruptedException(ie);
631                            }
632                            catch (TimeoutException te) {
633                                    _clusterResponseCallback.processTimeoutException(te);
634                            }
635                    }
636    
637                    private final ClusterResponseCallback _clusterResponseCallback;
638                    private final FutureClusterResponses _futureClusterResponses;
639                    private final long _timeout;
640                    private final boolean _timeoutGet;
641                    private final TimeUnit _timeUnit;
642    
643            }
644    
645    }