001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.memory.FinalizeAction;
031    import com.liferay.portal.kernel.memory.FinalizeManager;
032    import com.liferay.portal.kernel.util.InetAddressUtil;
033    import com.liferay.portal.kernel.util.MethodInvoker;
034    import com.liferay.portal.kernel.util.MethodWrapper;
035    import com.liferay.portal.kernel.util.PropsKeys;
036    import com.liferay.portal.kernel.util.PropsUtil;
037    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038    import com.liferay.portal.util.PortalPortEventListener;
039    import com.liferay.portal.util.PortalUtil;
040    import com.liferay.portal.util.PropsValues;
041    
042    import java.io.Serializable;
043    
044    import java.lang.ref.Reference;
045    
046    import java.net.InetAddress;
047    
048    import java.util.ArrayList;
049    import java.util.Collection;
050    import java.util.Collections;
051    import java.util.List;
052    import java.util.Map;
053    import java.util.Properties;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    
057    import org.jgroups.ChannelException;
058    import org.jgroups.JChannel;
059    
060    /**
061     * @author Tina Tian
062     * @author Shuyang Zhou
063     */
064    public class ClusterExecutorImpl
065            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
066    
067            public void addClusterEventListener(
068                    ClusterEventListener clusterEventListener) {
069                    if (!isEnabled()) {
070                            return;
071                    }
072    
073                    _clusterEventListeners.addIfAbsent(clusterEventListener);
074            }
075    
076            public void afterPropertiesSet() {
077                    super.afterPropertiesSet();
078    
079                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
080                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
081                    }
082            }
083    
084            public void destroy() {
085                    if (!isEnabled()) {
086                            return;
087                    }
088    
089                    _controlChannel.close();
090            }
091    
092            public FutureClusterResponses execute(ClusterRequest clusterRequest)
093                    throws SystemException {
094    
095                    if (!isEnabled()) {
096                            return null;
097                    }
098    
099                    List<Address> addresses = prepareAddresses(clusterRequest);
100    
101                    FutureClusterResponses futureClusterResponses =
102                            new FutureClusterResponses(addresses);
103    
104                    if (!clusterRequest.isFireAndForget()) {
105                            String uuid = clusterRequest.getUuid();
106    
107                            Reference<FutureClusterResponses> reference =
108                                    FinalizeManager.register(
109                                            futureClusterResponses,
110                                            new RemoveResultKeyFinalizeAction(uuid));
111    
112                            _executionResultMap.put(uuid, reference);
113                    }
114    
115                    if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
116                            addresses.remove(getLocalControlAddress())) {
117    
118                            ClusterNodeResponse clusterNodeResponse = runLocalMethod(
119                                    clusterRequest.getMethodWrapper());
120    
121                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
122                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
123    
124                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
125                    }
126    
127                    if (clusterRequest.isMulticast()) {
128                            sendMulticastRequest(clusterRequest);
129                    }
130                    else {
131                            sendUnicastRequest(clusterRequest, addresses);
132                    }
133    
134                    return futureClusterResponses;
135            }
136    
137            public List<ClusterEventListener> getClusterEventListeners() {
138                    if (!isEnabled()) {
139                            return Collections.EMPTY_LIST;
140                    }
141    
142                    return Collections.unmodifiableList(_clusterEventListeners);
143            }
144    
145            public List<ClusterNode> getClusterNodes() {
146                    if (!isEnabled()) {
147                            return Collections.EMPTY_LIST;
148                    }
149    
150                    return new ArrayList<ClusterNode>(_addressMap.values());
151            }
152    
153            public ClusterNode getLocalClusterNode() throws SystemException {
154                    if (!isEnabled()) {
155                            return null;
156                    }
157    
158                    ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
159    
160                    if (clusterNode == null) {
161                            _localClusterNodeId = PortalUUIDUtil.generate();
162    
163                            clusterNode = new ClusterNode(_localClusterNodeId);
164    
165                            clusterNode.setPort(PortalUtil.getPortalPort());
166    
167                            try {
168                                    InetAddress inetAddress = bindInetAddress;
169    
170                                    if (inetAddress == null) {
171                                            inetAddress = InetAddressUtil.getLocalInetAddress();
172                                    }
173    
174                                    clusterNode.setInetAddress(inetAddress);
175    
176                                    clusterNode.setHostName(inetAddress.getHostName());
177                            }
178                            catch (Exception e) {
179                                    throw new SystemException(
180                                            "Unable to determine local network address", e);
181                            }
182                    }
183    
184                    return clusterNode;
185            }
186    
187            public void initialize() {
188                    if (!isEnabled()) {
189                            return;
190                    }
191    
192                    try {
193                            PortalUtil.addPortalPortEventListener(this);
194    
195                            ClusterNode clusterNode = getLocalClusterNode();
196    
197                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
198                                    ClusterMessageType.NOTIFY, clusterNode);
199    
200                            _controlChannel.send(null, null, clusterRequest);
201                    }
202                    catch (ChannelException ce) {
203                            _log.error("Unable to send multicast message ", ce);
204                    }
205                    catch (SystemException se) {
206                            _log.error("Unable to determine local network address", se);
207                    }
208            }
209    
210            public boolean isClusterNodeAlive(String clusterNodeId) {
211                    if (!isEnabled()) {
212                            return false;
213                    }
214    
215                    return _clusterNodeIdMap.containsKey(clusterNodeId);
216            }
217    
218            public boolean isEnabled() {
219                    return PropsValues.CLUSTER_LINK_ENABLED;
220            }
221    
222            public void portalPortConfigured(int port) {
223                    if (!isEnabled()) {
224                            return;
225                    }
226    
227                    try {
228                            ClusterNode clusterNode = getLocalClusterNode();
229    
230                            clusterNode.setPort(port);
231    
232                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
233                                    ClusterMessageType.UPDATE, clusterNode);
234    
235                            _controlChannel.send(null, null, clusterRequest);
236                    }
237                    catch (Exception e) {
238                            if (_log.isErrorEnabled()) {
239                                    _log.error("Unable to determine configure node port", e);
240                            }
241                    }
242            }
243    
244            public void removeClusterEventListener(
245                    ClusterEventListener clusterEventListener) {
246    
247                    if (!isEnabled()) {
248                            return;
249                    }
250    
251                    _clusterEventListeners.remove(clusterEventListener);
252            }
253    
254            public void setClusterEventListeners(
255                    List<ClusterEventListener> clusterEventListeners) {
256    
257                    if (!isEnabled()) {
258                            return;
259                    }
260    
261                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
262            }
263    
264            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
265                    if (!isEnabled()) {
266                            return;
267                    }
268    
269                    _shortcutLocalMethod = shortcutLocalMethod;
270            }
271    
272            protected void fireClusterEvent(ClusterEvent clusterEvent) {
273                    for (ClusterEventListener listener : _clusterEventListeners) {
274                            listener.processClusterEvent(clusterEvent);
275                    }
276            }
277    
278            protected JChannel getControlChannel() {
279                    return _controlChannel;
280            }
281    
282            protected FutureClusterResponses getExecutionResults(String uuid) {
283                    Reference<FutureClusterResponses> reference = _executionResultMap.get(
284                            uuid);
285    
286                    if (reference != null) {
287                            return reference.get();
288                    }
289                    else {
290                            return null;
291                    }
292            }
293    
294            protected Address getLocalControlAddress() {
295                    return new AddressImpl(_controlChannel.getLocalAddress());
296            }
297    
298            protected void initChannels() {
299                    Properties controlProperties = PropsUtil.getProperties(
300                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
301    
302                    String controlProperty = controlProperties.getProperty(
303                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
304    
305                    ClusterRequestReceiver clusterInvokeReceiver =
306                            new ClusterRequestReceiver(this);
307    
308                    try {
309                            _controlChannel = createChannel(
310                                    controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
311                    }
312                    catch (ChannelException ce) {
313                            _log.error(ce, ce);
314                    }
315                    catch (Exception e) {
316                            _log.error(e, e);
317                    }
318            }
319    
320            protected boolean isShortcutLocalMethod() {
321                    return _shortcutLocalMethod;
322            }
323    
324            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
325                    _addressMap.put(joinAddress, clusterNode);
326    
327                    Address previousAddress = _clusterNodeIdMap.put(
328                            clusterNode.getClusterNodeId(), joinAddress);
329    
330                    if ((previousAddress == null) &&
331                            !getLocalControlAddress().equals(joinAddress)) {
332    
333                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
334    
335                            fireClusterEvent(clusterEvent);
336                    }
337            }
338    
339            protected void memberRemoved(List<Address> departAddresses) {
340                    List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
341    
342                    for (Address departAddress : departAddresses) {
343                            ClusterNode departingClusterNode = _addressMap.remove(
344                                    departAddress);
345                            if (departingClusterNode != null) {
346                                    departingClusterNodes.add(departingClusterNode);
347    
348                                    _clusterNodeIdMap.remove(
349                                            departingClusterNode.getClusterNodeId());
350                            }
351                    }
352    
353                    if (departingClusterNodes.isEmpty()) {
354                            return;
355                    }
356    
357                    ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
358    
359                    fireClusterEvent(clusterEvent);
360            }
361    
362            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
363                    boolean isMulticast = clusterRequest.isMulticast();
364    
365                    List<Address> addresses = null;
366    
367                    if (isMulticast) {
368                            addresses = getAddresses(_controlChannel);
369                    }
370                    else {
371                            Collection<String> clusterNodeIds =
372                                    clusterRequest.getTargetClusterNodeIds();
373    
374                            addresses = new ArrayList<Address>(clusterNodeIds.size());
375    
376                            for (String clusterNodeId : clusterNodeIds) {
377                                    Address address = _clusterNodeIdMap.get(clusterNodeId);
378    
379                                    addresses.add(address);
380                            }
381                    }
382    
383                    return addresses;
384            }
385    
386            protected ClusterNodeResponse runLocalMethod(MethodWrapper methodWrapper)
387                    throws SystemException {
388    
389                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
390    
391                    ClusterNode localClusterNode = getLocalClusterNode();
392    
393                    clusterNodeResponse.setClusterNode(localClusterNode);
394                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
395    
396                    if (methodWrapper == null) {
397                            clusterNodeResponse.setException(
398                                    new ClusterException(
399                                            "Payload is not of type " + MethodWrapper.class.getName()));
400    
401                            return clusterNodeResponse;
402                    }
403    
404                    try {
405                            Object returnValue = MethodInvoker.invoke(methodWrapper);
406    
407                            if (returnValue instanceof Serializable) {
408                                    clusterNodeResponse.setResult(returnValue);
409                            }
410                            else if (returnValue != null) {
411                                    clusterNodeResponse.setException(
412                                            new ClusterException("Return value is not serializable"));
413                            }
414                    }
415                    catch (Exception e) {
416                            clusterNodeResponse.setException(e);
417                    }
418    
419                    return clusterNodeResponse;
420            }
421    
422            protected void sendMulticastRequest(ClusterRequest clusterRequest)
423                    throws SystemException {
424    
425                    try {
426                            _controlChannel.send(null, null, clusterRequest);
427                    }
428                    catch (ChannelException ce) {
429                            _log.error(
430                                    "Unable to send multicast message " + clusterRequest, ce);
431    
432                            throw new SystemException(
433                                    "Unable to send multicast request", ce);
434                    }
435            }
436    
437            protected void sendUnicastRequest(
438                            ClusterRequest clusterRequest, List<Address> addresses)
439                    throws SystemException {
440    
441                    for (Address address : addresses) {
442                            org.jgroups.Address jGroupsAddress =
443                                    (org.jgroups.Address)address.getRealAddress();
444    
445                            try {
446                                    _controlChannel.send(jGroupsAddress, null, clusterRequest);
447                            }
448                            catch (ChannelException ce) {
449                                    _log.error(
450                                            "Unable to send unicast message " + clusterRequest, ce);
451    
452                                    throw new SystemException(
453                                            "Unable to send unicast request", ce);
454                            }
455                    }
456            }
457    
458            private static final String _DEFAULT_CLUSTER_NAME =
459                    "LIFERAY-CONTROL-CHANNEL";
460    
461            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
462    
463            private Map<Address, ClusterNode> _addressMap =
464                    new ConcurrentHashMap<Address, ClusterNode>();
465            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
466                    new CopyOnWriteArrayList<ClusterEventListener>();
467            private Map<String, Address> _clusterNodeIdMap =
468                    new ConcurrentHashMap<String, Address>();
469            private JChannel _controlChannel;
470            private Map<String, Reference<FutureClusterResponses>> _executionResultMap =
471                    new ConcurrentHashMap<String, Reference<FutureClusterResponses>>();
472            private String _localClusterNodeId;
473            private boolean _shortcutLocalMethod;
474    
475            private class RemoveResultKeyFinalizeAction implements FinalizeAction {
476    
477                    private String _uuid;
478    
479                    public RemoveResultKeyFinalizeAction(String uuid) {
480                            _uuid = uuid;
481                    }
482    
483                    public void finalize() {
484                            _executionResultMap.remove(_uuid);
485                    }
486    
487            }
488    
489    }