/**
 * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
 *
 * This library is free software; you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation; either version 2.1 of the License, or (at your option)
 * any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 * details.
 */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.concurrent.ConcurrentReferenceValueHashMap;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.memory.FinalizeManager;
034    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
035    import com.liferay.portal.kernel.util.CharPool;
036    import com.liferay.portal.kernel.util.GetterUtil;
037    import com.liferay.portal.kernel.util.Http;
038    import com.liferay.portal.kernel.util.MethodHandler;
039    import com.liferay.portal.kernel.util.PropsKeys;
040    import com.liferay.portal.kernel.util.StringUtil;
041    import com.liferay.portal.kernel.util.Validator;
042    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043    import com.liferay.portal.util.PortalInetSocketAddressEventListener;
044    import com.liferay.portal.util.PortalUtil;
045    import com.liferay.portal.util.PropsUtil;
046    import com.liferay.portal.util.PropsValues;
047    
048    import java.io.Serializable;
049    
050    import java.net.InetAddress;
051    import java.net.InetSocketAddress;
052    
053    import java.util.ArrayList;
054    import java.util.Collection;
055    import java.util.Collections;
056    import java.util.List;
057    import java.util.Map;
058    import java.util.Properties;
059    import java.util.concurrent.BlockingQueue;
060    import java.util.concurrent.ConcurrentHashMap;
061    import java.util.concurrent.CopyOnWriteArrayList;
062    import java.util.concurrent.ExecutorService;
063    
064    import org.jgroups.JChannel;
065    
066    /**
067     * @author Tina Tian
068     * @author Shuyang Zhou
069     */
070    @DoPrivileged
071    public class ClusterExecutorImpl
072            extends ClusterBase
073            implements ClusterExecutor, PortalInetSocketAddressEventListener {
074    
075            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
076                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
077    
078            @Override
079            public void addClusterEventListener(
080                    ClusterEventListener clusterEventListener) {
081    
082                    if (!isEnabled()) {
083                            return;
084                    }
085    
086                    _clusterEventListeners.addIfAbsent(clusterEventListener);
087            }
088    
089            @Override
090            public void destroy() {
091                    if (!isEnabled()) {
092                            return;
093                    }
094    
095                    PortalExecutorManagerUtil.shutdown(
096                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
097    
098                    _controlJChannel.setReceiver(null);
099    
100                    _controlJChannel.close();
101    
102                    _clusterEventListeners.clear();
103                    _clusterNodeAddresses.clear();
104                    _futureClusterResponses.clear();
105                    _liveInstances.clear();
106                    _localAddress = null;
107                    _localClusterNode = null;
108            }
109    
110            @Override
111            public FutureClusterResponses execute(ClusterRequest clusterRequest) {
112                    if (!isEnabled()) {
113                            return null;
114                    }
115    
116                    List<Address> addresses = prepareAddresses(clusterRequest);
117    
118                    FutureClusterResponses futureClusterResponses =
119                            new FutureClusterResponses(addresses);
120    
121                    if (!clusterRequest.isFireAndForget()) {
122                            String uuid = clusterRequest.getUuid();
123    
124                            _futureClusterResponses.put(uuid, futureClusterResponses);
125                    }
126    
127                    if (_shortcutLocalMethod &&
128                            addresses.remove(getLocalClusterNodeAddress())) {
129    
130                            runLocalMethod(clusterRequest, futureClusterResponses);
131                    }
132    
133                    if (clusterRequest.isMulticast()) {
134                            try {
135                                    _controlJChannel.send(null, clusterRequest);
136                            }
137                            catch (Exception e) {
138                                    throw new SystemException(
139                                            "Unable to send multicast request", e);
140                            }
141                    }
142                    else {
143                            for (Address address : addresses) {
144                                    org.jgroups.Address jGroupsAddress =
145                                            (org.jgroups.Address)address.getRealAddress();
146    
147                                    try {
148                                            _controlJChannel.send(jGroupsAddress, clusterRequest);
149                                    }
150                                    catch (Exception e) {
151                                            throw new SystemException(
152                                                    "Unable to send unicast request", e);
153                                    }
154                            }
155                    }
156    
157                    return futureClusterResponses;
158            }
159    
160            @Override
161            public FutureClusterResponses execute(
162                    ClusterRequest clusterRequest,
163                    ClusterResponseCallback clusterResponseCallback) {
164    
165                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
166    
167                    ClusterResponseCallbackJob clusterResponseCallbackJob =
168                            new ClusterResponseCallbackJob(
169                                    clusterResponseCallback, futureClusterResponses);
170    
171                    _executorService.execute(clusterResponseCallbackJob);
172    
173                    return futureClusterResponses;
174            }
175    
176            @Override
177            public List<ClusterEventListener> getClusterEventListeners() {
178                    if (!isEnabled()) {
179                            return Collections.emptyList();
180                    }
181    
182                    return Collections.unmodifiableList(_clusterEventListeners);
183            }
184    
185            @Override
186            public List<Address> getClusterNodeAddresses() {
187                    if (!isEnabled()) {
188                            return Collections.emptyList();
189                    }
190    
191                    return getAddresses(_controlJChannel);
192            }
193    
194            @Override
195            public List<ClusterNode> getClusterNodes() {
196                    if (!isEnabled()) {
197                            return Collections.emptyList();
198                    }
199    
200                    return new ArrayList<ClusterNode>(_liveInstances.values());
201            }
202    
203            @Override
204            public ClusterNode getLocalClusterNode() {
205                    if (!isEnabled()) {
206                            return null;
207                    }
208    
209                    return _localClusterNode;
210            }
211    
212            @Override
213            public Address getLocalClusterNodeAddress() {
214                    if (!isEnabled()) {
215                            return null;
216                    }
217    
218                    return _localAddress;
219            }
220    
221            @Override
222            public void initialize() {
223                    if (!isEnabled()) {
224                            return;
225                    }
226    
227                    _secure = StringUtil.equalsIgnoreCase(
228                            Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
229    
230                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
231                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
232    
233                    PortalUtil.addPortalInetSocketAddressEventListener(this);
234    
235                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
236                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
237                    }
238    
239                    if (PropsValues.LIVE_USERS_ENABLED) {
240                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
241                    }
242    
243                    try {
244                            initControlChannel();
245    
246                            _localAddress = new AddressImpl(_controlJChannel.getAddress());
247    
248                            initLocalClusterNode();
249    
250                            memberJoined(_localAddress, _localClusterNode);
251    
252                            sendNotifyRequest();
253    
254                            BaseReceiver baseReceiver =
255                                    (BaseReceiver)_controlJChannel.getReceiver();
256    
257                            baseReceiver.openLatch();
258                    }
259                    catch (Exception e) {
260                            if (_log.isErrorEnabled()) {
261                                    _log.error("Unable to initialize", e);
262                            }
263    
264                            throw new IllegalStateException(e);
265                    }
266            }
267    
268            @Override
269            public boolean isClusterNodeAlive(Address address) {
270                    if (!isEnabled()) {
271                            return false;
272                    }
273    
274                    List<Address> addresses = getAddresses(_controlJChannel);
275    
276                    return addresses.contains(address);
277            }
278    
279            @Override
280            public boolean isClusterNodeAlive(String clusterNodeId) {
281                    if (!isEnabled()) {
282                            return false;
283                    }
284    
285                    return _clusterNodeAddresses.containsKey(clusterNodeId);
286            }
287    
288            @Override
289            public void portalLocalInetSockAddressConfigured(
290                    InetSocketAddress inetSocketAddress) {
291    
292                    if (!isEnabled() ||
293                            (_localClusterNode.getPortalInetSocketAddress() != null)) {
294    
295                            return;
296                    }
297    
298                    try {
299                            _localClusterNode.setPortalInetSocketAddress(inetSocketAddress);
300    
301                            memberJoined(_localAddress, _localClusterNode);
302    
303                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
304                                    ClusterMessageType.UPDATE, _localClusterNode);
305    
306                            _controlJChannel.send(null, clusterRequest);
307                    }
308                    catch (Exception e) {
309                            _log.error("Unable to determine configure node port", e);
310                    }
311            }
312    
313            @Override
314            public void portalServerInetSocketAddressConfigured(
315                    InetSocketAddress inetSocketAddress) {
316            }
317    
318            @Override
319            public void removeClusterEventListener(
320                    ClusterEventListener clusterEventListener) {
321    
322                    if (!isEnabled()) {
323                            return;
324                    }
325    
326                    _clusterEventListeners.remove(clusterEventListener);
327            }
328    
329            public void setClusterEventListeners(
330                    List<ClusterEventListener> clusterEventListeners) {
331    
332                    if (!isEnabled()) {
333                            return;
334                    }
335    
336                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
337            }
338    
339            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
340                    if (!isEnabled()) {
341                            return;
342                    }
343    
344                    _shortcutLocalMethod = shortcutLocalMethod;
345            }
346    
347            protected void fireClusterEvent(ClusterEvent clusterEvent) {
348                    for (ClusterEventListener listener : _clusterEventListeners) {
349                            listener.processClusterEvent(clusterEvent);
350                    }
351            }
352    
353            protected ClusterNodeResponse generateClusterNodeResponse(
354                    ClusterRequest clusterRequest, Object returnValue,
355                    Exception exception) {
356    
357                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
358    
359                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
360                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
361                    clusterNodeResponse.setClusterMessageType(
362                            clusterRequest.getClusterMessageType());
363                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
364                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
365    
366                    if (exception != null) {
367                            clusterNodeResponse.setException(exception);
368                    }
369                    else {
370                            if (returnValue instanceof Serializable) {
371                                    clusterNodeResponse.setResult(returnValue);
372                            }
373                            else if (returnValue != null) {
374                                    clusterNodeResponse.setException(
375                                            new ClusterException("Return value is not serializable"));
376                            }
377                    }
378    
379                    return clusterNodeResponse;
380            }
381    
382            protected InetSocketAddress getConfiguredPortalInetSockAddress(
383                    boolean secure) {
384    
385                    InetSocketAddress inetSocketAddress = null;
386    
387                    String portalInetSocketAddressValue = null;
388    
389                    if (secure) {
390                            portalInetSocketAddressValue =
391                                    PropsValues.PORTAL_INSTANCE_HTTPS_INET_SOCKET_ADDRESS;
392                    }
393                    else {
394                            portalInetSocketAddressValue =
395                                    PropsValues.PORTAL_INSTANCE_HTTP_INET_SOCKET_ADDRESS;
396                    }
397    
398                    if (Validator.isNotNull(portalInetSocketAddressValue)) {
399                            String[] parts = StringUtil.split(
400                                    portalInetSocketAddressValue, CharPool.COLON);
401    
402                            if (parts.length == 2) {
403                                    try {
404                                            inetSocketAddress = new InetSocketAddress(
405                                                    InetAddress.getByName(parts[0]),
406                                                    GetterUtil.getIntegerStrict(parts[1]));
407                                    }
408                                    catch (Exception e) {
409                                            _log.error(
410                                                    "Unable to parse portal InetSocketAddress from " +
411                                                            portalInetSocketAddressValue,
412                                                    e);
413                                    }
414                            }
415                    }
416    
417                    return inetSocketAddress;
418            }
419    
420            protected JChannel getControlChannel() {
421                    return _controlJChannel;
422            }
423    
424            protected FutureClusterResponses getExecutionResults(String uuid) {
425                    return _futureClusterResponses.get(uuid);
426            }
427    
428            protected void initControlChannel() throws Exception {
429                    Properties controlProperties = PropsUtil.getProperties(
430                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
431    
432                    String controlProperty = controlProperties.getProperty(
433                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
434    
435                    ClusterRequestReceiver clusterRequestReceiver =
436                            new ClusterRequestReceiver(this);
437    
438                    _controlJChannel = createJChannel(
439                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
440            }
441    
442            protected void initLocalClusterNode() {
443                    InetAddress inetAddress = getBindInetAddress(_controlJChannel);
444    
445                    ClusterNode clusterNode = new ClusterNode(
446                            PortalUUIDUtil.generate(), inetAddress);
447    
448                    InetSocketAddress inetSocketAddress =
449                            getConfiguredPortalInetSockAddress(_secure);
450    
451                    if (inetSocketAddress != null) {
452                            clusterNode.setPortalInetSocketAddress(inetSocketAddress);
453                    }
454    
455                    _localClusterNode = clusterNode;
456            }
457    
458            protected boolean isShortcutLocalMethod() {
459                    return _shortcutLocalMethod;
460            }
461    
462            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
463                    _liveInstances.put(joinAddress, clusterNode);
464    
465                    Address previousAddress = _clusterNodeAddresses.put(
466                            clusterNode.getClusterNodeId(), joinAddress);
467    
468                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
469                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
470    
471                            // PLACEHOLDER
472    
473                            fireClusterEvent(clusterEvent);
474                    }
475            }
476    
477            protected void memberRemoved(List<Address> departAddresses) {
478                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
479    
480                    for (Address departAddress : departAddresses) {
481                            ClusterNode departClusterNode = _liveInstances.remove(
482                                    departAddress);
483    
484                            if (departClusterNode == null) {
485                                    continue;
486                            }
487    
488                            departClusterNodes.add(departClusterNode);
489    
490                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
491                    }
492    
493                    if (departClusterNodes.isEmpty()) {
494                            return;
495                    }
496    
497                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
498    
499                    fireClusterEvent(clusterEvent);
500            }
501    
502            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
503                    boolean isMulticast = clusterRequest.isMulticast();
504    
505                    List<Address> addresses = null;
506    
507                    if (isMulticast) {
508                            addresses = getAddresses(_controlJChannel);
509                    }
510                    else {
511                            addresses = new ArrayList<Address>();
512    
513                            Collection<Address> clusterNodeAddresses =
514                                    clusterRequest.getTargetClusterNodeAddresses();
515    
516                            if (clusterNodeAddresses != null) {
517                                    addresses.addAll(clusterNodeAddresses);
518                            }
519    
520                            Collection<String> clusterNodeIds =
521                                    clusterRequest.getTargetClusterNodeIds();
522    
523                            if (clusterNodeIds != null) {
524                                    for (String clusterNodeId : clusterNodeIds) {
525                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
526    
527                                            addresses.add(address);
528                                    }
529                            }
530                    }
531    
532                    if (clusterRequest.isSkipLocal()) {
533                            addresses.remove(getLocalClusterNodeAddress());
534                    }
535    
536                    return addresses;
537            }
538    
539            protected void runLocalMethod(
540                    ClusterRequest clusterRequest,
541                    FutureClusterResponses futureClusterResponses) {
542    
543                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
544    
545                    Object returnValue = null;
546                    Exception exception = null;
547    
548                    if (methodHandler == null) {
549                            exception = new ClusterException(
550                                    "Payload is not of type " + MethodHandler.class.getName());
551                    }
552                    else {
553                            try {
554                                    returnValue = methodHandler.invoke();
555                            }
556                            catch (Exception e) {
557                                    exception = e;
558                            }
559                    }
560    
561                    if (!clusterRequest.isFireAndForget()) {
562                            ClusterNodeResponse clusterNodeResponse =
563                                    generateClusterNodeResponse(
564                                            clusterRequest, returnValue, exception);
565    
566                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
567                    }
568            }
569    
570            protected void sendNotifyRequest() {
571                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
572                            ClusterMessageType.NOTIFY, _localClusterNode);
573    
574                    try {
575                            _controlJChannel.send(null, clusterRequest);
576                    }
577                    catch (Exception e) {
578                            _log.error("Unable to send notify message", e);
579                    }
580            }
581    
582            private static final String _DEFAULT_CLUSTER_NAME =
583                    "LIFERAY-CONTROL-CHANNEL";
584    
585            private static final Log _log = LogFactoryUtil.getLog(
586                    ClusterExecutorImpl.class);
587    
588            private final CopyOnWriteArrayList<ClusterEventListener>
589                    _clusterEventListeners =
590                            new CopyOnWriteArrayList<ClusterEventListener>();
591            private final Map<String, Address> _clusterNodeAddresses =
592                    new ConcurrentHashMap<String, Address>();
593            private JChannel _controlJChannel;
594            private ExecutorService _executorService;
595            private Map<String, FutureClusterResponses> _futureClusterResponses =
596                    new ConcurrentReferenceValueHashMap<String, FutureClusterResponses>(
597                            FinalizeManager.WEAK_REFERENCE_FACTORY);
598            private final Map<Address, ClusterNode> _liveInstances =
599                    new ConcurrentHashMap<Address, ClusterNode>();
600            private Address _localAddress;
601            private ClusterNode _localClusterNode;
602            private boolean _secure;
603            private boolean _shortcutLocalMethod;
604    
605            private class ClusterResponseCallbackJob implements Runnable {
606    
607                    public ClusterResponseCallbackJob(
608                            ClusterResponseCallback clusterResponseCallback,
609                            FutureClusterResponses futureClusterResponses) {
610    
611                            _clusterResponseCallback = clusterResponseCallback;
612                            _futureClusterResponses = futureClusterResponses;
613                    }
614    
615                    @Override
616                    public void run() {
617                            BlockingQueue<ClusterNodeResponse> blockingQueue =
618                                    _futureClusterResponses.getPartialResults();
619    
620                            _clusterResponseCallback.callback(blockingQueue);
621                    }
622    
623                    private final ClusterResponseCallback _clusterResponseCallback;
624                    private final FutureClusterResponses _futureClusterResponses;
625    
626            }
627    
628    }