001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.InetAddressUtil;
035    import com.liferay.portal.kernel.util.MethodHandler;
036    import com.liferay.portal.kernel.util.PropsKeys;
037    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039    import com.liferay.portal.util.PortalPortEventListener;
040    import com.liferay.portal.util.PortalUtil;
041    import com.liferay.portal.util.PropsUtil;
042    import com.liferay.portal.util.PropsValues;
043    
044    import java.io.Serializable;
045    
046    import java.net.InetAddress;
047    
048    import java.util.ArrayList;
049    import java.util.Collection;
050    import java.util.Collections;
051    import java.util.List;
052    import java.util.Map;
053    import java.util.Properties;
054    import java.util.concurrent.BlockingQueue;
055    import java.util.concurrent.ConcurrentHashMap;
056    import java.util.concurrent.CopyOnWriteArrayList;
057    import java.util.concurrent.ExecutorService;
058    import java.util.concurrent.TimeUnit;
059    import java.util.concurrent.TimeoutException;
060    
061    import org.jgroups.JChannel;
062    
063    /**
064     * @author Tina Tian
065     * @author Shuyang Zhou
066     */
067    @DoPrivileged
068    public class ClusterExecutorImpl
069            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
070    
071            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
072                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
073    
074            @Override
075            public void addClusterEventListener(
076                    ClusterEventListener clusterEventListener) {
077    
078                    if (!isEnabled()) {
079                            return;
080                    }
081    
082                    _clusterEventListeners.addIfAbsent(clusterEventListener);
083            }
084    
085            @Override
086            public void afterPropertiesSet() {
087                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
088                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
089                    }
090    
091                    if (PropsValues.LIVE_USERS_ENABLED) {
092                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
093                    }
094    
095                    super.afterPropertiesSet();
096            }
097    
098            @Override
099            public void destroy() {
100                    if (!isEnabled()) {
101                            return;
102                    }
103    
104                    PortalExecutorManagerUtil.shutdown(
105                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
106    
107                    _controlJChannel.setReceiver(null);
108    
109                    _controlJChannel.close();
110    
111                    _clusterEventListeners.clear();
112                    _clusterNodeAddresses.clear();
113                    _futureClusterResponses.clear();
114                    _liveInstances.clear();
115                    _localAddress = null;
116                    _localClusterNode = null;
117            }
118    
119            @Override
120            public FutureClusterResponses execute(ClusterRequest clusterRequest)
121                    throws SystemException {
122    
123                    if (!isEnabled()) {
124                            return null;
125                    }
126    
127                    List<Address> addresses = prepareAddresses(clusterRequest);
128    
129                    FutureClusterResponses futureClusterResponses =
130                            new FutureClusterResponses(addresses);
131    
132                    if (!clusterRequest.isFireAndForget()) {
133                            String uuid = clusterRequest.getUuid();
134    
135                            _futureClusterResponses.put(uuid, futureClusterResponses);
136                    }
137    
138                    if (_shortcutLocalMethod &&
139                            addresses.remove(getLocalClusterNodeAddress())) {
140    
141                            runLocalMethod(clusterRequest, futureClusterResponses);
142                    }
143    
144                    if (clusterRequest.isMulticast()) {
145                            try {
146                                    _controlJChannel.send(null, clusterRequest);
147                            }
148                            catch (Exception e) {
149                                    throw new SystemException(
150                                            "Unable to send multicast request", e);
151                            }
152                    }
153                    else {
154                            for (Address address : addresses) {
155                                    org.jgroups.Address jGroupsAddress =
156                                            (org.jgroups.Address)address.getRealAddress();
157    
158                                    try {
159                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
160                                    }
161                                    catch (Exception e) {
162                                            throw new SystemException(
163                                                    "Unable to send unicast request", e);
164                                    }
165                            }
166                    }
167    
168                    return futureClusterResponses;
169            }
170    
171            @Override
172            public void execute(
173                            ClusterRequest clusterRequest,
174                            ClusterResponseCallback clusterResponseCallback)
175                    throws SystemException {
176    
177                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
178    
179                    ClusterResponseCallbackJob clusterResponseCallbackJob =
180                            new ClusterResponseCallbackJob(
181                                    clusterResponseCallback, futureClusterResponses);
182    
183                    _executorService.execute(clusterResponseCallbackJob);
184            }
185    
186            @Override
187            public void execute(
188                            ClusterRequest clusterRequest,
189                            ClusterResponseCallback clusterResponseCallback, long timeout,
190                            TimeUnit timeUnit)
191                    throws SystemException {
192    
193                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
194    
195                    ClusterResponseCallbackJob clusterResponseCallbackJob =
196                            new ClusterResponseCallbackJob(
197                                    clusterResponseCallback, futureClusterResponses, timeout,
198                                    timeUnit);
199    
200                    _executorService.execute(clusterResponseCallbackJob);
201            }
202    
203            @Override
204            public List<ClusterEventListener> getClusterEventListeners() {
205                    if (!isEnabled()) {
206                            return Collections.emptyList();
207                    }
208    
209                    return Collections.unmodifiableList(_clusterEventListeners);
210            }
211    
212            @Override
213            public List<Address> getClusterNodeAddresses() {
214                    if (!isEnabled()) {
215                            return Collections.emptyList();
216                    }
217    
218                    return getAddresses(_controlJChannel);
219            }
220    
221            @Override
222            public List<ClusterNode> getClusterNodes() {
223                    if (!isEnabled()) {
224                            return Collections.emptyList();
225                    }
226    
227                    return new ArrayList<ClusterNode>(_liveInstances.values());
228            }
229    
230            @Override
231            public ClusterNode getLocalClusterNode() {
232                    if (!isEnabled()) {
233                            return null;
234                    }
235    
236                    return _localClusterNode;
237            }
238    
239            @Override
240            public Address getLocalClusterNodeAddress() {
241                    if (!isEnabled()) {
242                            return null;
243                    }
244    
245                    return _localAddress;
246            }
247    
248            @Override
249            public void initialize() {
250                    if (!isEnabled()) {
251                            return;
252                    }
253    
254                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
255                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
256    
257                    PortalUtil.addPortalPortEventListener(this);
258    
259                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
260    
261                    try {
262                            initLocalClusterNode();
263    
264                            memberJoined(_localAddress, _localClusterNode);
265    
266                            sendNotifyRequest();
267                    }
268                    catch (Exception e) {
269                            _log.error("Unable to determine local network address", e);
270                    }
271    
272                    ClusterRequestReceiver clusterRequestReceiver =
273                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
274    
275                    clusterRequestReceiver.openLatch();
276            }
277    
278            @Override
279            public boolean isClusterNodeAlive(Address address) {
280                    if (!isEnabled()) {
281                            return false;
282                    }
283    
284                    List<Address> addresses = getAddresses(_controlJChannel);
285    
286                    return addresses.contains(address);
287            }
288    
289            @Override
290            public boolean isClusterNodeAlive(String clusterNodeId) {
291                    if (!isEnabled()) {
292                            return false;
293                    }
294    
295                    return _clusterNodeAddresses.containsKey(clusterNodeId);
296            }
297    
298            @Override
299            public void portalPortConfigured(int port) {
300                    if (!isEnabled() ||
301                            (_localClusterNode.getPort() ==
302                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
303    
304                            return;
305                    }
306    
307                    try {
308                            _localClusterNode.setPort(port);
309    
310                            memberJoined(_localAddress, _localClusterNode);
311    
312                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
313                                    ClusterMessageType.UPDATE, _localClusterNode);
314    
315                            _controlJChannel.send(null, clusterRequest);
316                    }
317                    catch (Exception e) {
318                            _log.error("Unable to determine configure node port", e);
319                    }
320            }
321    
322            @Override
323            public void removeClusterEventListener(
324                    ClusterEventListener clusterEventListener) {
325    
326                    if (!isEnabled()) {
327                            return;
328                    }
329    
330                    _clusterEventListeners.remove(clusterEventListener);
331            }
332    
333            public void setClusterEventListeners(
334                    List<ClusterEventListener> clusterEventListeners) {
335    
336                    if (!isEnabled()) {
337                            return;
338                    }
339    
340                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
341            }
342    
343            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
344                    if (!isEnabled()) {
345                            return;
346                    }
347    
348                    _shortcutLocalMethod = shortcutLocalMethod;
349            }
350    
351            protected void fireClusterEvent(ClusterEvent clusterEvent) {
352                    for (ClusterEventListener listener : _clusterEventListeners) {
353                            listener.processClusterEvent(clusterEvent);
354                    }
355            }
356    
357            protected ClusterNodeResponse generateClusterNodeResponse(
358                    ClusterRequest clusterRequest, Object returnValue,
359                    Exception exception) {
360    
361                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
362    
363                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
364                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
365                    clusterNodeResponse.setClusterMessageType(
366                            clusterRequest.getClusterMessageType());
367                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
368                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
369    
370                    if (exception != null) {
371                            clusterNodeResponse.setException(exception);
372                    }
373                    else {
374                            if (returnValue instanceof Serializable) {
375                                    clusterNodeResponse.setResult(returnValue);
376                            }
377                            else if (returnValue != null) {
378                                    clusterNodeResponse.setException(
379                                            new ClusterException("Return value is not serializable"));
380                            }
381                    }
382    
383                    return clusterNodeResponse;
384            }
385    
386            protected JChannel getControlChannel() {
387                    return _controlJChannel;
388            }
389    
390            protected FutureClusterResponses getExecutionResults(String uuid) {
391                    return _futureClusterResponses.get(uuid);
392            }
393    
394            @Override
395            protected void initChannels() throws Exception {
396                    Properties controlProperties = PropsUtil.getProperties(
397                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
398    
399                    String controlProperty = controlProperties.getProperty(
400                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
401    
402                    ClusterRequestReceiver clusterRequestReceiver =
403                            new ClusterRequestReceiver(this);
404    
405                    _controlJChannel = createJChannel(
406                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
407            }
408    
409            protected void initLocalClusterNode() throws Exception {
410                    InetAddress inetAddress = bindInetAddress;
411    
412                    if (inetAddress == null) {
413                            inetAddress = InetAddressUtil.getLocalInetAddress();
414                    }
415    
416                    ClusterNode localClusterNode = new ClusterNode(
417                            PortalUUIDUtil.generate(), inetAddress);
418    
419                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
420                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
421                    }
422                    else {
423                            localClusterNode.setPort(PortalUtil.getPortalPort(false));
424                    }
425    
426                    _localClusterNode = localClusterNode;
427            }
428    
429            protected boolean isShortcutLocalMethod() {
430                    return _shortcutLocalMethod;
431            }
432    
433            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
434                    _liveInstances.put(joinAddress, clusterNode);
435    
436                    Address previousAddress = _clusterNodeAddresses.put(
437                            clusterNode.getClusterNodeId(), joinAddress);
438    
439                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
440                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
441    
442                            // PLACEHOLDER
443    
444                            fireClusterEvent(clusterEvent);
445                    }
446            }
447    
448            protected void memberRemoved(List<Address> departAddresses) {
449                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
450    
451                    for (Address departAddress : departAddresses) {
452                            ClusterNode departClusterNode = _liveInstances.remove(
453                                    departAddress);
454    
455                            if (departClusterNode == null) {
456                                    continue;
457                            }
458    
459                            departClusterNodes.add(departClusterNode);
460    
461                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
462                    }
463    
464                    if (departClusterNodes.isEmpty()) {
465                            return;
466                    }
467    
468                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
469    
470                    fireClusterEvent(clusterEvent);
471            }
472    
473            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
474                    boolean isMulticast = clusterRequest.isMulticast();
475    
476                    List<Address> addresses = null;
477    
478                    if (isMulticast) {
479                            addresses = getAddresses(_controlJChannel);
480                    }
481                    else {
482                            addresses = new ArrayList<Address>();
483    
484                            Collection<Address> clusterNodeAddresses =
485                                    clusterRequest.getTargetClusterNodeAddresses();
486    
487                            if (clusterNodeAddresses != null) {
488                                    addresses.addAll(clusterNodeAddresses);
489                            }
490    
491                            Collection<String> clusterNodeIds =
492                                    clusterRequest.getTargetClusterNodeIds();
493    
494                            if (clusterNodeIds != null) {
495                                    for (String clusterNodeId : clusterNodeIds) {
496                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
497    
498                                            addresses.add(address);
499                                    }
500                            }
501                    }
502    
503                    if (clusterRequest.isSkipLocal()) {
504                            addresses.remove(getLocalClusterNodeAddress());
505                    }
506    
507                    return addresses;
508            }
509    
510            protected void runLocalMethod(
511                    ClusterRequest clusterRequest,
512                    FutureClusterResponses futureClusterResponses) {
513    
514                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
515    
516                    Object returnValue = null;
517                    Exception exception = null;
518    
519                    if (methodHandler == null) {
520                            exception = new ClusterException(
521                                    "Payload is not of type " + MethodHandler.class.getName());
522                    }
523                    else {
524                            try {
525                                    returnValue = methodHandler.invoke(true);
526                            }
527                            catch (Exception e) {
528                                    exception = e;
529                            }
530                    }
531    
532                    if (!clusterRequest.isFireAndForget()) {
533                            ClusterNodeResponse clusterNodeResponse =
534                                    generateClusterNodeResponse(
535                                            clusterRequest, returnValue, exception);
536    
537                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
538                    }
539            }
540    
541            protected void sendNotifyRequest() {
542                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
543                            ClusterMessageType.NOTIFY, _localClusterNode);
544    
545                    try {
546                            _controlJChannel.send(null, clusterRequest);
547                    }
548                    catch (Exception e) {
549                            _log.error("Unable to send notify message", e);
550                    }
551            }
552    
553            private static final String _DEFAULT_CLUSTER_NAME =
554                    "LIFERAY-CONTROL-CHANNEL";
555    
556            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
557    
558            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
559                    new CopyOnWriteArrayList<ClusterEventListener>();
560            private Map<String, Address> _clusterNodeAddresses =
561                    new ConcurrentHashMap<String, Address>();
562            private JChannel _controlJChannel;
563            private ExecutorService _executorService;
564            private Map<String, FutureClusterResponses> _futureClusterResponses =
565                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
566            private Map<Address, ClusterNode> _liveInstances =
567                    new ConcurrentHashMap<Address, ClusterNode>();
568            private Address _localAddress;
569            private ClusterNode _localClusterNode;
570            private boolean _shortcutLocalMethod;
571    
572            private class ClusterResponseCallbackJob implements Runnable {
573    
574                    public ClusterResponseCallbackJob(
575                            ClusterResponseCallback clusterResponseCallback,
576                            FutureClusterResponses futureClusterResponses) {
577    
578                            _clusterResponseCallback = clusterResponseCallback;
579                            _futureClusterResponses = futureClusterResponses;
580                            _timeout = -1;
581                            _timeoutGet = false;
582                            _timeUnit = TimeUnit.SECONDS;
583                    }
584    
585                    public ClusterResponseCallbackJob(
586                            ClusterResponseCallback clusterResponseCallback,
587                            FutureClusterResponses futureClusterResponses, long timeout,
588                            TimeUnit timeUnit) {
589    
590                            _clusterResponseCallback = clusterResponseCallback;
591                            _futureClusterResponses = futureClusterResponses;
592                            _timeout = timeout;
593                            _timeoutGet = true;
594                            _timeUnit = timeUnit;
595                    }
596    
597                    @Override
598                    public void run() {
599                            BlockingQueue<ClusterNodeResponse> blockingQueue =
600                                    _futureClusterResponses.getPartialResults();
601    
602                            _clusterResponseCallback.callback(blockingQueue);
603    
604                            ClusterNodeResponses clusterNodeResponses = null;
605    
606                            try {
607                                    if (_timeoutGet) {
608                                            clusterNodeResponses = _futureClusterResponses.get(
609                                                    _timeout, _timeUnit);
610                                    }
611                                    else {
612                                            clusterNodeResponses = _futureClusterResponses.get();
613                                    }
614    
615                                    _clusterResponseCallback.callback(clusterNodeResponses);
616                            }
617                            catch (InterruptedException ie) {
618                                    _clusterResponseCallback.processInterruptedException(ie);
619                            }
620                            catch (TimeoutException te) {
621                                    _clusterResponseCallback.processTimeoutException(te);
622                            }
623                    }
624    
625                    private final ClusterResponseCallback _clusterResponseCallback;
626                    private final FutureClusterResponses _futureClusterResponses;
627                    private final long _timeout;
628                    private final boolean _timeoutGet;
629                    private final TimeUnit _timeUnit;
630    
631            }
632    
633    }