001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.InetAddressUtil;
035    import com.liferay.portal.kernel.util.MethodHandler;
036    import com.liferay.portal.kernel.util.PropsKeys;
037    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039    import com.liferay.portal.util.PortalPortEventListener;
040    import com.liferay.portal.util.PortalUtil;
041    import com.liferay.portal.util.PropsUtil;
042    import com.liferay.portal.util.PropsValues;
043    
044    import java.io.Serializable;
045    
046    import java.net.InetAddress;
047    
048    import java.util.ArrayList;
049    import java.util.Collection;
050    import java.util.Collections;
051    import java.util.List;
052    import java.util.Map;
053    import java.util.Properties;
054    import java.util.concurrent.BlockingQueue;
055    import java.util.concurrent.ConcurrentHashMap;
056    import java.util.concurrent.CopyOnWriteArrayList;
057    import java.util.concurrent.ExecutorService;
058    import java.util.concurrent.TimeUnit;
059    import java.util.concurrent.TimeoutException;
060    
061    import org.jgroups.JChannel;
062    
063    /**
064     * @author Tina Tian
065     * @author Shuyang Zhou
066     */
067    @DoPrivileged
068    public class ClusterExecutorImpl
069            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
070    
               /**
                * Name of the portal executor used for cluster response callback jobs.
                * The executor is looked up under this name in {@link #initialize()}
                * and shut down under the same name in {@link #destroy()}.
                */
               public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
                       "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
073    
074            public void addClusterEventListener(
075                    ClusterEventListener clusterEventListener) {
076    
077                    if (!isEnabled()) {
078                            return;
079                    }
080    
081                    _clusterEventListeners.addIfAbsent(clusterEventListener);
082            }
083    
084            @Override
085            public void afterPropertiesSet() {
086                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
087                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
088                    }
089    
090                    if (PropsValues.LIVE_USERS_ENABLED) {
091                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
092                    }
093    
094                    super.afterPropertiesSet();
095            }
096    
097            @Override
098            public void destroy() {
099                    if (!isEnabled()) {
100                            return;
101                    }
102    
103                    PortalExecutorManagerUtil.shutdown(
104                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
105    
106                    _controlJChannel.setReceiver(null);
107    
108                    _controlJChannel.close();
109    
110                    _clusterEventListeners.clear();
111                    _clusterNodeAddresses.clear();
112                    _futureClusterResponses.clear();
113                    _liveInstances.clear();
114                    _localAddress = null;
115                    _localClusterNode = null;
116            }
117    
118            public FutureClusterResponses execute(ClusterRequest clusterRequest)
119                    throws SystemException {
120    
121                    if (!isEnabled()) {
122                            return null;
123                    }
124    
125                    List<Address> addresses = prepareAddresses(clusterRequest);
126    
127                    FutureClusterResponses futureClusterResponses =
128                            new FutureClusterResponses(addresses);
129    
130                    if (!clusterRequest.isFireAndForget()) {
131                            String uuid = clusterRequest.getUuid();
132    
133                            _futureClusterResponses.put(uuid, futureClusterResponses);
134                    }
135    
136                    if (_shortcutLocalMethod &&
137                            addresses.remove(getLocalClusterNodeAddress())) {
138    
139                            runLocalMethod(clusterRequest, futureClusterResponses);
140                    }
141    
142                    if (clusterRequest.isMulticast()) {
143                            try {
144                                    _controlJChannel.send(null, clusterRequest);
145                            }
146                            catch (Exception e) {
147                                    throw new SystemException(
148                                            "Unable to send multicast request", e);
149                            }
150                    }
151                    else {
152                            for (Address address : addresses) {
153                                    org.jgroups.Address jGroupsAddress =
154                                            (org.jgroups.Address)address.getRealAddress();
155    
156                                    try {
157                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
158                                    }
159                                    catch (Exception e) {
160                                            throw new SystemException(
161                                                    "Unable to send unicast request", e);
162                                    }
163                            }
164                    }
165    
166                    return futureClusterResponses;
167            }
168    
169            public void execute(
170                            ClusterRequest clusterRequest,
171                            ClusterResponseCallback clusterResponseCallback)
172                    throws SystemException {
173    
174                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
175    
176                    ClusterResponseCallbackJob clusterResponseCallbackJob =
177                            new ClusterResponseCallbackJob(
178                                    clusterResponseCallback, futureClusterResponses);
179    
180                    _executorService.execute(clusterResponseCallbackJob);
181            }
182    
183            public void execute(
184                            ClusterRequest clusterRequest,
185                            ClusterResponseCallback clusterResponseCallback, long timeout,
186                            TimeUnit timeUnit)
187                    throws SystemException {
188    
189                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
190    
191                    ClusterResponseCallbackJob clusterResponseCallbackJob =
192                            new ClusterResponseCallbackJob(
193                                    clusterResponseCallback, futureClusterResponses, timeout,
194                                    timeUnit);
195    
196                    _executorService.execute(clusterResponseCallbackJob);
197            }
198    
199            public List<ClusterEventListener> getClusterEventListeners() {
200                    if (!isEnabled()) {
201                            return Collections.emptyList();
202                    }
203    
204                    return Collections.unmodifiableList(_clusterEventListeners);
205            }
206    
207            public List<Address> getClusterNodeAddresses() {
208                    if (!isEnabled()) {
209                            return Collections.emptyList();
210                    }
211    
212                    return getAddresses(_controlJChannel);
213            }
214    
215            public List<ClusterNode> getClusterNodes() {
216                    if (!isEnabled()) {
217                            return Collections.emptyList();
218                    }
219    
220                    return new ArrayList<ClusterNode>(_liveInstances.values());
221            }
222    
223            public ClusterNode getLocalClusterNode() {
224                    if (!isEnabled()) {
225                            return null;
226                    }
227    
228                    return _localClusterNode;
229            }
230    
231            public Address getLocalClusterNodeAddress() {
232                    if (!isEnabled()) {
233                            return null;
234                    }
235    
236                    return _localAddress;
237            }
238    
239            public void initialize() {
240                    if (!isEnabled()) {
241                            return;
242                    }
243    
244                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
245                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
246    
247                    PortalUtil.addPortalPortEventListener(this);
248    
249                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
250    
251                    try {
252                            initLocalClusterNode();
253    
254                            memberJoined(_localAddress, _localClusterNode);
255    
256                            sendNotifyRequest();
257                    }
258                    catch (Exception e) {
259                            _log.error("Unable to determine local network address", e);
260                    }
261    
262                    ClusterRequestReceiver clusterRequestReceiver =
263                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
264    
265                    clusterRequestReceiver.openLatch();
266            }
267    
268            public boolean isClusterNodeAlive(Address address) {
269                    if (!isEnabled()) {
270                            return false;
271                    }
272    
273                    List<Address> addresses = getAddresses(_controlJChannel);
274    
275                    return addresses.contains(address);
276            }
277    
278            public boolean isClusterNodeAlive(String clusterNodeId) {
279                    if (!isEnabled()) {
280                            return false;
281                    }
282    
283                    return _clusterNodeAddresses.containsKey(clusterNodeId);
284            }
285    
286            public void portalPortConfigured(int port) {
287                    if (!isEnabled() ||
288                            (_localClusterNode.getPort() ==
289                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
290    
291                            return;
292                    }
293    
294                    try {
295                            _localClusterNode.setPort(port);
296    
297                            memberJoined(_localAddress, _localClusterNode);
298    
299                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
300                                    ClusterMessageType.UPDATE, _localClusterNode);
301    
302                            _controlJChannel.send(null, clusterRequest);
303                    }
304                    catch (Exception e) {
305                            _log.error("Unable to determine configure node port", e);
306                    }
307            }
308    
309            public void removeClusterEventListener(
310                    ClusterEventListener clusterEventListener) {
311    
312                    if (!isEnabled()) {
313                            return;
314                    }
315    
316                    _clusterEventListeners.remove(clusterEventListener);
317            }
318    
319            public void setClusterEventListeners(
320                    List<ClusterEventListener> clusterEventListeners) {
321    
322                    if (!isEnabled()) {
323                            return;
324                    }
325    
326                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
327            }
328    
329            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
330                    if (!isEnabled()) {
331                            return;
332                    }
333    
334                    _shortcutLocalMethod = shortcutLocalMethod;
335            }
336    
337            protected void fireClusterEvent(ClusterEvent clusterEvent) {
338                    for (ClusterEventListener listener : _clusterEventListeners) {
339                            listener.processClusterEvent(clusterEvent);
340                    }
341            }
342    
343            protected ClusterNodeResponse generateClusterNodeResponse(
344                    ClusterRequest clusterRequest, Object returnValue,
345                    Exception exception) {
346    
347                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
348    
349                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
350                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
351                    clusterNodeResponse.setClusterMessageType(
352                            clusterRequest.getClusterMessageType());
353                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
354                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
355    
356                    if (exception != null) {
357                            clusterNodeResponse.setException(exception);
358                    }
359                    else {
360                            if (returnValue instanceof Serializable) {
361                                    clusterNodeResponse.setResult(returnValue);
362                            }
363                            else if (returnValue != null) {
364                                    clusterNodeResponse.setException(
365                                            new ClusterException("Return value is not serializable"));
366                            }
367                    }
368    
369                    return clusterNodeResponse;
370            }
371    
               /**
                * Returns the JGroups channel assigned in {@link #initChannels()}.
                *
                * @return the control channel, or {@code null} if none has been
                *         assigned yet
                */
               protected JChannel getControlChannel() {
                       return _controlJChannel;
               }
375    
               /**
                * Returns the future stored for a request by
                * {@link #execute(ClusterRequest)}.
                *
                * @param  uuid the UUID of the request
                * @return the future stored under the UUID, or {@code null} if the map
                *         holds none
                */
               protected FutureClusterResponses getExecutionResults(String uuid) {
                       return _futureClusterResponses.get(uuid);
               }
379    
380            @Override
381            protected void initChannels() throws Exception {
382                    Properties controlProperties = PropsUtil.getProperties(
383                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
384    
385                    String controlProperty = controlProperties.getProperty(
386                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
387    
388                    ClusterRequestReceiver clusterRequestReceiver =
389                            new ClusterRequestReceiver(this);
390    
391                    _controlJChannel = createJChannel(
392                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
393            }
394    
395            protected void initLocalClusterNode() throws Exception {
396                    InetAddress inetAddress = bindInetAddress;
397    
398                    if (inetAddress == null) {
399                            inetAddress = InetAddressUtil.getLocalInetAddress();
400                    }
401    
402                    ClusterNode localClusterNode = new ClusterNode(
403                            PortalUUIDUtil.generate(), inetAddress);
404    
405                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
406                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
407                    }
408                    else {
409                            localClusterNode.setPort(PortalUtil.getPortalPort(false));
410                    }
411    
412                    _localClusterNode = localClusterNode;
413            }
414    
               /**
                * Returns the flag set through
                * {@link #setShortcutLocalMethod(boolean)}.
                *
                * @return {@code true} if requests targeting the local node are run
                *         directly in {@link #execute(ClusterRequest)}
                */
               protected boolean isShortcutLocalMethod() {
                       return _shortcutLocalMethod;
               }
418    
419            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
420                    _liveInstances.put(joinAddress, clusterNode);
421    
422                    Address previousAddress = _clusterNodeAddresses.put(
423                            clusterNode.getClusterNodeId(), joinAddress);
424    
425                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
426                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
427    
428                            // PLACEHOLDER
429    
430                            fireClusterEvent(clusterEvent);
431                    }
432            }
433    
434            protected void memberRemoved(List<Address> departAddresses) {
435                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
436    
437                    for (Address departAddress : departAddresses) {
438                            ClusterNode departClusterNode = _liveInstances.remove(
439                                    departAddress);
440    
441                            if (departClusterNode == null) {
442                                    continue;
443                            }
444    
445                            departClusterNodes.add(departClusterNode);
446    
447                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
448                    }
449    
450                    if (departClusterNodes.isEmpty()) {
451                            return;
452                    }
453    
454                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
455    
456                    fireClusterEvent(clusterEvent);
457            }
458    
459            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
460                    boolean isMulticast = clusterRequest.isMulticast();
461    
462                    List<Address> addresses = null;
463    
464                    if (isMulticast) {
465                            addresses = getAddresses(_controlJChannel);
466                    }
467                    else {
468                            addresses = new ArrayList<Address>();
469    
470                            Collection<Address> clusterNodeAddresses =
471                                    clusterRequest.getTargetClusterNodeAddresses();
472    
473                            if (clusterNodeAddresses != null) {
474                                    addresses.addAll(clusterNodeAddresses);
475                            }
476    
477                            Collection<String> clusterNodeIds =
478                                    clusterRequest.getTargetClusterNodeIds();
479    
480                            if (clusterNodeIds != null) {
481                                    for (String clusterNodeId : clusterNodeIds) {
482                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
483    
484                                            addresses.add(address);
485                                    }
486                            }
487                    }
488    
489                    if (clusterRequest.isSkipLocal()) {
490                            addresses.remove(getLocalClusterNodeAddress());
491                    }
492    
493                    return addresses;
494            }
495    
496            protected void runLocalMethod(
497                    ClusterRequest clusterRequest,
498                    FutureClusterResponses futureClusterResponses) {
499    
500                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
501    
502                    Object returnValue = null;
503                    Exception exception = null;
504    
505                    if (methodHandler == null) {
506                            exception = new ClusterException(
507                                    "Payload is not of type " + MethodHandler.class.getName());
508                    }
509                    else {
510                            try {
511                                    returnValue = methodHandler.invoke(true);
512                            }
513                            catch (Exception e) {
514                                    exception = e;
515                            }
516                    }
517    
518                    if (!clusterRequest.isFireAndForget()) {
519                            ClusterNodeResponse clusterNodeResponse =
520                                    generateClusterNodeResponse(
521                                            clusterRequest, returnValue, exception);
522    
523                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
524                    }
525            }
526    
527            protected void sendNotifyRequest() {
528                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
529                            ClusterMessageType.NOTIFY, _localClusterNode);
530    
531                    try {
532                            _controlJChannel.send(null, clusterRequest);
533                    }
534                    catch (Exception e) {
535                            _log.error("Unable to send notify message", e);
536                    }
537            }
538    
               // Name passed to createJChannel when the control channel is created

               private static final String _DEFAULT_CLUSTER_NAME =
                       "LIFERAY-CONTROL-CHANNEL";

               private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);

               // Listeners notified by fireClusterEvent

               private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
                       new CopyOnWriteArrayList<ClusterEventListener>();

               // Cluster node ID -> address, maintained by memberJoined and
               // memberRemoved

               private Map<String, Address> _clusterNodeAddresses =
                       new ConcurrentHashMap<String, Address>();

               // Channel created in initChannels; all requests are sent through it

               private JChannel _controlJChannel;

               // Runs the ClusterResponseCallbackJob instances; assigned in initialize

               private ExecutorService _executorService;

               // Request UUID -> future of its responses, filled by execute for
               // requests that are not fire-and-forget. NOTE(review): the weak value
               // map presumably drops an entry once its future is no longer
               // referenced elsewhere -- confirm against WeakValueConcurrentHashMap

               private Map<String, FutureClusterResponses> _futureClusterResponses =
                       new WeakValueConcurrentHashMap<String, FutureClusterResponses>();

               // Address -> cluster node, maintained by memberJoined and memberRemoved

               private Map<Address, ClusterNode> _liveInstances =
                       new ConcurrentHashMap<Address, ClusterNode>();

               // Address of this instance on the control channel; assigned in
               // initialize and reset to null in destroy

               private Address _localAddress;

               // Node representing this instance; assigned in initLocalClusterNode
               // and reset to null in destroy

               private ClusterNode _localClusterNode;

               // Whether execute runs requests targeting the local node directly

               private boolean _shortcutLocalMethod;
557    
558            private class ClusterResponseCallbackJob implements Runnable {
559    
560                    public ClusterResponseCallbackJob(
561                            ClusterResponseCallback clusterResponseCallback,
562                            FutureClusterResponses futureClusterResponses) {
563    
564                            _clusterResponseCallback = clusterResponseCallback;
565                            _futureClusterResponses = futureClusterResponses;
566                            _timeout = -1;
567                            _timeoutGet = false;
568                            _timeUnit = TimeUnit.SECONDS;
569                    }
570    
571                    public ClusterResponseCallbackJob(
572                            ClusterResponseCallback clusterResponseCallback,
573                            FutureClusterResponses futureClusterResponses, long timeout,
574                            TimeUnit timeUnit) {
575    
576                            _clusterResponseCallback = clusterResponseCallback;
577                            _futureClusterResponses = futureClusterResponses;
578                            _timeout = timeout;
579                            _timeoutGet = true;
580                            _timeUnit = timeUnit;
581                    }
582    
583                    public void run() {
584                            BlockingQueue<ClusterNodeResponse> blockingQueue =
585                                    _futureClusterResponses.getPartialResults();
586    
587                            _clusterResponseCallback.callback(blockingQueue);
588    
589                            ClusterNodeResponses clusterNodeResponses = null;
590    
591                            try {
592                                    if (_timeoutGet) {
593                                            clusterNodeResponses = _futureClusterResponses.get(
594                                                    _timeout, _timeUnit);
595                                    }
596                                    else {
597                                            clusterNodeResponses = _futureClusterResponses.get();
598                                    }
599    
600                                    _clusterResponseCallback.callback(clusterNodeResponses);
601                            }
602                            catch (InterruptedException ie) {
603                                    _clusterResponseCallback.processInterruptedException(ie);
604                            }
605                            catch (TimeoutException te) {
606                                    _clusterResponseCallback.processTimeoutException(te);
607                            }
608                    }
609    
610                    private final ClusterResponseCallback _clusterResponseCallback;
611                    private final FutureClusterResponses _futureClusterResponses;
612                    private final long _timeout;
613                    private final boolean _timeoutGet;
614                    private final TimeUnit _timeUnit;
615    
616            }
617    
618    }