001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.util.InetAddressUtil;
034    import com.liferay.portal.kernel.util.MethodHandler;
035    import com.liferay.portal.kernel.util.PropsKeys;
036    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
037    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038    import com.liferay.portal.util.PortalPortEventListener;
039    import com.liferay.portal.util.PortalUtil;
040    import com.liferay.portal.util.PropsUtil;
041    import com.liferay.portal.util.PropsValues;
042    
043    import java.io.Serializable;
044    
045    import java.net.InetAddress;
046    
047    import java.util.ArrayList;
048    import java.util.Collection;
049    import java.util.Collections;
050    import java.util.List;
051    import java.util.Map;
052    import java.util.Properties;
053    import java.util.concurrent.BlockingQueue;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    import java.util.concurrent.ExecutorService;
057    import java.util.concurrent.TimeUnit;
058    import java.util.concurrent.TimeoutException;
059    
060    import org.jgroups.JChannel;
061    
062    /**
063     * @author Tina Tian
064     * @author Shuyang Zhou
065     */
066    public class ClusterExecutorImpl
067            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
068    
069            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
070                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
071    
072            public void addClusterEventListener(
073                    ClusterEventListener clusterEventListener) {
074    
075                    if (!isEnabled()) {
076                            return;
077                    }
078    
079                    _clusterEventListeners.addIfAbsent(clusterEventListener);
080            }
081    
082            @Override
083            public void afterPropertiesSet() {
084                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
085                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
086                    }
087    
088                    if (PropsValues.LIVE_USERS_ENABLED) {
089                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
090                    }
091    
092                    super.afterPropertiesSet();
093            }
094    
095            @Override
096            public void destroy() {
097                    if (!isEnabled()) {
098                            return;
099                    }
100    
101                    PortalExecutorManagerUtil.shutdown(
102                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
103    
104                    _controlJChannel.setReceiver(null);
105    
106                    _controlJChannel.close();
107    
108                    _clusterEventListeners.clear();
109                    _clusterNodeAddresses.clear();
110                    _futureClusterResponses.clear();
111                    _liveInstances.clear();
112                    _localAddress = null;
113                    _localClusterNode = null;
114            }
115    
116            public FutureClusterResponses execute(ClusterRequest clusterRequest)
117                    throws SystemException {
118    
119                    if (!isEnabled()) {
120                            return null;
121                    }
122    
123                    List<Address> addresses = prepareAddresses(clusterRequest);
124    
125                    FutureClusterResponses futureClusterResponses =
126                            new FutureClusterResponses(addresses);
127    
128                    if (!clusterRequest.isFireAndForget()) {
129                            String uuid = clusterRequest.getUuid();
130    
131                            _futureClusterResponses.put(uuid, futureClusterResponses);
132                    }
133    
134                    if (_shortcutLocalMethod &&
135                            addresses.remove(getLocalClusterNodeAddress())) {
136    
137                            runLocalMethod(clusterRequest, futureClusterResponses);
138                    }
139    
140                    if (clusterRequest.isMulticast()) {
141                            try {
142                                    _controlJChannel.send(null, clusterRequest);
143                            }
144                            catch (Exception e) {
145                                    throw new SystemException(
146                                            "Unable to send multicast request", e);
147                            }
148                    }
149                    else {
150                            for (Address address : addresses) {
151                                    org.jgroups.Address jGroupsAddress =
152                                            (org.jgroups.Address)address.getRealAddress();
153    
154                                    try {
155                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
156                                    }
157                                    catch (Exception e) {
158                                            throw new SystemException(
159                                                    "Unable to send unicast request", e);
160                                    }
161                            }
162                    }
163    
164                    return futureClusterResponses;
165            }
166    
167            public void execute(
168                            ClusterRequest clusterRequest,
169                            ClusterResponseCallback clusterResponseCallback)
170                    throws SystemException {
171    
172                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
173    
174                    ClusterResponseCallbackJob clusterResponseCallbackJob =
175                            new ClusterResponseCallbackJob(
176                                    clusterResponseCallback, futureClusterResponses);
177    
178                    _executorService.execute(clusterResponseCallbackJob);
179            }
180    
181            public void execute(
182                            ClusterRequest clusterRequest,
183                            ClusterResponseCallback clusterResponseCallback, long timeout,
184                            TimeUnit timeUnit)
185                    throws SystemException {
186    
187                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
188    
189                    ClusterResponseCallbackJob clusterResponseCallbackJob =
190                            new ClusterResponseCallbackJob(
191                                    clusterResponseCallback, futureClusterResponses, timeout,
192                                    timeUnit);
193    
194                    _executorService.execute(clusterResponseCallbackJob);
195            }
196    
197            public List<ClusterEventListener> getClusterEventListeners() {
198                    if (!isEnabled()) {
199                            return Collections.emptyList();
200                    }
201    
202                    return Collections.unmodifiableList(_clusterEventListeners);
203            }
204    
205            public List<Address> getClusterNodeAddresses() {
206                    if (!isEnabled()) {
207                            return Collections.emptyList();
208                    }
209    
210                    return getAddresses(_controlJChannel);
211            }
212    
213            public List<ClusterNode> getClusterNodes() {
214                    if (!isEnabled()) {
215                            return Collections.emptyList();
216                    }
217    
218                    return new ArrayList<ClusterNode>(_liveInstances.values());
219            }
220    
221            public ClusterNode getLocalClusterNode() {
222                    if (!isEnabled()) {
223                            return null;
224                    }
225    
226                    return _localClusterNode;
227            }
228    
229            public Address getLocalClusterNodeAddress() {
230                    if (!isEnabled()) {
231                            return null;
232                    }
233    
234                    return _localAddress;
235            }
236    
237            public void initialize() {
238                    if (!isEnabled()) {
239                            return;
240                    }
241    
242                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
243                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
244    
245                    PortalUtil.addPortalPortEventListener(this);
246    
247                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
248    
249                    try {
250                            initLocalClusterNode();
251    
252                            memberJoined(_localAddress, _localClusterNode);
253    
254                            sendNotifyRequest();
255                    }
256                    catch (Exception e) {
257                            _log.error("Unable to determine local network address", e);
258                    }
259    
260                    ClusterRequestReceiver clusterRequestReceiver =
261                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
262    
263                    clusterRequestReceiver.openLatch();
264            }
265    
266            public boolean isClusterNodeAlive(Address address) {
267                    if (!isEnabled()) {
268                            return false;
269                    }
270    
271                    List<Address> addresses = getAddresses(_controlJChannel);
272    
273                    return addresses.contains(address);
274            }
275    
276            public boolean isClusterNodeAlive(String clusterNodeId) {
277                    if (!isEnabled()) {
278                            return false;
279                    }
280    
281                    return _clusterNodeAddresses.containsKey(clusterNodeId);
282            }
283    
284            public void portalPortConfigured(int port) {
285                    if (!isEnabled() ||
286                            (_localClusterNode.getPort() ==
287                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
288    
289                            return;
290                    }
291    
292                    try {
293                            _localClusterNode.setPort(port);
294    
295                            memberJoined(_localAddress, _localClusterNode);
296    
297                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
298                                    ClusterMessageType.UPDATE, _localClusterNode);
299    
300                            _controlJChannel.send(null, clusterRequest);
301                    }
302                    catch (Exception e) {
303                            _log.error("Unable to determine configure node port", e);
304                    }
305            }
306    
307            public void removeClusterEventListener(
308                    ClusterEventListener clusterEventListener) {
309    
310                    if (!isEnabled()) {
311                            return;
312                    }
313    
314                    _clusterEventListeners.remove(clusterEventListener);
315            }
316    
317            public void setClusterEventListeners(
318                    List<ClusterEventListener> clusterEventListeners) {
319    
320                    if (!isEnabled()) {
321                            return;
322                    }
323    
324                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
325            }
326    
327            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
328                    if (!isEnabled()) {
329                            return;
330                    }
331    
332                    _shortcutLocalMethod = shortcutLocalMethod;
333            }
334    
335            protected void fireClusterEvent(ClusterEvent clusterEvent) {
336                    for (ClusterEventListener listener : _clusterEventListeners) {
337                            listener.processClusterEvent(clusterEvent);
338                    }
339            }
340    
341            protected ClusterNodeResponse generateClusterNodeResponse(
342                    ClusterRequest clusterRequest, Object returnValue,
343                    Exception exception) {
344    
345                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
346    
347                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
348                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
349                    clusterNodeResponse.setClusterMessageType(
350                            clusterRequest.getClusterMessageType());
351                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
352                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
353    
354                    if (exception != null) {
355                            clusterNodeResponse.setException(exception);
356                    }
357                    else {
358                            if (returnValue instanceof Serializable) {
359                                    clusterNodeResponse.setResult(returnValue);
360                            }
361                            else if (returnValue != null) {
362                                    clusterNodeResponse.setException(
363                                            new ClusterException("Return value is not serializable"));
364                            }
365                    }
366    
367                    return clusterNodeResponse;
368            }
369    
370            protected JChannel getControlChannel() {
371                    return _controlJChannel;
372            }
373    
374            protected FutureClusterResponses getExecutionResults(String uuid) {
375                    return _futureClusterResponses.get(uuid);
376            }
377    
378            @Override
379            protected void initChannels() throws Exception {
380                    Properties controlProperties = PropsUtil.getProperties(
381                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
382    
383                    String controlProperty = controlProperties.getProperty(
384                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
385    
386                    ClusterRequestReceiver clusterRequestReceiver =
387                            new ClusterRequestReceiver(this);
388    
389                    _controlJChannel = createJChannel(
390                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
391            }
392    
393            protected void initLocalClusterNode() throws Exception {
394                    InetAddress inetAddress = bindInetAddress;
395    
396                    if (inetAddress == null) {
397                            inetAddress = InetAddressUtil.getLocalInetAddress();
398                    }
399    
400                    ClusterNode localClusterNode = new ClusterNode(
401                            PortalUUIDUtil.generate(), inetAddress);
402    
403                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
404                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
405                    }
406                    else {
407                            localClusterNode.setPort(PortalUtil.getPortalPort(false));
408                    }
409    
410                    _localClusterNode = localClusterNode;
411            }
412    
413            protected boolean isShortcutLocalMethod() {
414                    return _shortcutLocalMethod;
415            }
416    
417            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
418                    _liveInstances.put(joinAddress, clusterNode);
419    
420                    Address previousAddress = _clusterNodeAddresses.put(
421                            clusterNode.getClusterNodeId(), joinAddress);
422    
423                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
424                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
425    
426                            // PLACEHOLDER
427    
428                            fireClusterEvent(clusterEvent);
429                    }
430            }
431    
432            protected void memberRemoved(List<Address> departAddresses) {
433                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
434    
435                    for (Address departAddress : departAddresses) {
436                            ClusterNode departClusterNode = _liveInstances.remove(
437                                    departAddress);
438    
439                            if (departClusterNode == null) {
440                                    continue;
441                            }
442    
443                            departClusterNodes.add(departClusterNode);
444    
445                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
446                    }
447    
448                    if (departClusterNodes.isEmpty()) {
449                            return;
450                    }
451    
452                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
453    
454                    fireClusterEvent(clusterEvent);
455            }
456    
457            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
458                    boolean isMulticast = clusterRequest.isMulticast();
459    
460                    List<Address> addresses = null;
461    
462                    if (isMulticast) {
463                            addresses = getAddresses(_controlJChannel);
464                    }
465                    else {
466                            addresses = new ArrayList<Address>();
467    
468                            Collection<Address> clusterNodeAddresses =
469                                    clusterRequest.getTargetClusterNodeAddresses();
470    
471                            if (clusterNodeAddresses != null) {
472                                    addresses.addAll(clusterNodeAddresses);
473                            }
474    
475                            Collection<String> clusterNodeIds =
476                                    clusterRequest.getTargetClusterNodeIds();
477    
478                            if (clusterNodeIds != null) {
479                                    for (String clusterNodeId : clusterNodeIds) {
480                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
481    
482                                            addresses.add(address);
483                                    }
484                            }
485                    }
486    
487                    if (clusterRequest.isSkipLocal()) {
488                            addresses.remove(getLocalClusterNodeAddress());
489                    }
490    
491                    return addresses;
492            }
493    
494            protected void runLocalMethod(
495                    ClusterRequest clusterRequest,
496                    FutureClusterResponses futureClusterResponses) {
497    
498                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
499    
500                    Object returnValue = null;
501                    Exception exception = null;
502    
503                    if (methodHandler == null) {
504                            exception = new ClusterException(
505                                    "Payload is not of type " + MethodHandler.class.getName());
506                    }
507                    else {
508                            try {
509                                    returnValue = methodHandler.invoke(true);
510                            }
511                            catch (Exception e) {
512                                    exception = e;
513                            }
514                    }
515    
516                    if (!clusterRequest.isFireAndForget()) {
517                            ClusterNodeResponse clusterNodeResponse =
518                                    generateClusterNodeResponse(
519                                            clusterRequest, returnValue, exception);
520    
521                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
522                    }
523            }
524    
525            protected void sendNotifyRequest() {
526                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
527                            ClusterMessageType.NOTIFY, _localClusterNode);
528    
529                    try {
530                            _controlJChannel.send(null, clusterRequest);
531                    }
532                    catch (Exception e) {
533                            _log.error("Unable to send notify message", e);
534                    }
535            }
536    
537            private static final String _DEFAULT_CLUSTER_NAME =
538                    "LIFERAY-CONTROL-CHANNEL";
539    
540            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
541    
542            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
543                    new CopyOnWriteArrayList<ClusterEventListener>();
544            private Map<String, Address> _clusterNodeAddresses =
545                    new ConcurrentHashMap<String, Address>();
546            private JChannel _controlJChannel;
547            private ExecutorService _executorService;
548            private Map<String, FutureClusterResponses> _futureClusterResponses =
549                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
550            private Map<Address, ClusterNode> _liveInstances =
551                    new ConcurrentHashMap<Address, ClusterNode>();
552            private Address _localAddress;
553            private ClusterNode _localClusterNode;
554            private boolean _shortcutLocalMethod;
555    
556            private class ClusterResponseCallbackJob implements Runnable {
557    
558                    public ClusterResponseCallbackJob(
559                            ClusterResponseCallback clusterResponseCallback,
560                            FutureClusterResponses futureClusterResponses) {
561    
562                            _clusterResponseCallback = clusterResponseCallback;
563                            _futureClusterResponses = futureClusterResponses;
564                            _timeout = -1;
565                            _timeoutGet = false;
566                            _timeUnit = TimeUnit.SECONDS;
567                    }
568    
569                    public ClusterResponseCallbackJob(
570                            ClusterResponseCallback clusterResponseCallback,
571                            FutureClusterResponses futureClusterResponses, long timeout,
572                            TimeUnit timeUnit) {
573    
574                            _clusterResponseCallback = clusterResponseCallback;
575                            _futureClusterResponses = futureClusterResponses;
576                            _timeout = timeout;
577                            _timeoutGet = true;
578                            _timeUnit = timeUnit;
579                    }
580    
581                    public void run() {
582                            BlockingQueue<ClusterNodeResponse> blockingQueue =
583                                    _futureClusterResponses.getPartialResults();
584    
585                            _clusterResponseCallback.callback(blockingQueue);
586    
587                            ClusterNodeResponses clusterNodeResponses = null;
588    
589                            try {
590                                    if (_timeoutGet) {
591                                            clusterNodeResponses = _futureClusterResponses.get(
592                                                    _timeout, _timeUnit);
593                                    }
594                                    else {
595                                            clusterNodeResponses = _futureClusterResponses.get();
596                                    }
597    
598                                    _clusterResponseCallback.callback(clusterNodeResponses);
599                            }
600                            catch (InterruptedException ie) {
601                                    _clusterResponseCallback.processInterruptedException(ie);
602                            }
603                            catch (TimeoutException te) {
604                                    _clusterResponseCallback.processTimeoutException(te);
605                            }
606                    }
607    
608                    private final ClusterResponseCallback _clusterResponseCallback;
609                    private final FutureClusterResponses _futureClusterResponses;
610                    private final long _timeout;
611                    private final boolean _timeoutGet;
612                    private final TimeUnit _timeUnit;
613    
614            }
615    
616    }