001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.util.InetAddressUtil;
031    import com.liferay.portal.kernel.util.MethodInvoker;
032    import com.liferay.portal.kernel.util.MethodWrapper;
033    import com.liferay.portal.kernel.util.PropsKeys;
034    import com.liferay.portal.kernel.util.PropsUtil;
035    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
036    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
037    import com.liferay.portal.util.PortalPortEventListener;
038    import com.liferay.portal.util.PortalUtil;
039    import com.liferay.portal.util.PropsValues;
040    
041    import java.io.Serializable;
042    
043    import java.net.InetAddress;
044    
045    import java.util.ArrayList;
046    import java.util.Collection;
047    import java.util.Collections;
048    import java.util.List;
049    import java.util.Map;
050    import java.util.Properties;
051    import java.util.concurrent.ConcurrentHashMap;
052    import java.util.concurrent.CopyOnWriteArrayList;
053    
054    import org.jgroups.ChannelException;
055    import org.jgroups.JChannel;
056    
057    /**
058     * @author Tina Tian
059     * @author Shuyang Zhou
060     */
061    public class ClusterExecutorImpl
062            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
063    
064            public void addClusterEventListener(
065                    ClusterEventListener clusterEventListener) {
066                    if (!isEnabled()) {
067                            return;
068                    }
069    
070                    _clusterEventListeners.addIfAbsent(clusterEventListener);
071            }
072    
073            public void afterPropertiesSet() {
074                    super.afterPropertiesSet();
075    
076                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
077                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
078                    }
079            }
080    
081            public void destroy() {
082                    if (!isEnabled()) {
083                            return;
084                    }
085    
086                    _controlChannel.close();
087            }
088    
089            public FutureClusterResponses execute(ClusterRequest clusterRequest)
090                    throws SystemException {
091    
092                    if (!isEnabled()) {
093                            return null;
094                    }
095    
096                    List<Address> addresses = prepareAddresses(clusterRequest);
097    
098                    FutureClusterResponses futureClusterResponses =
099                            new FutureClusterResponses(addresses);
100    
101                    if (!clusterRequest.isFireAndForget()) {
102                            String uuid = clusterRequest.getUuid();
103    
104                            _executionResultMap.put(uuid, futureClusterResponses);
105                    }
106    
107                    if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
108                            addresses.remove(getLocalControlAddress())) {
109    
110                            ClusterNodeResponse clusterNodeResponse = runLocalMethod(
111                                    clusterRequest.getMethodWrapper());
112    
113                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
114                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
115    
116                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
117                    }
118    
119                    if (clusterRequest.isMulticast()) {
120                            sendMulticastRequest(clusterRequest);
121                    }
122                    else {
123                            sendUnicastRequest(clusterRequest, addresses);
124                    }
125    
126                    return futureClusterResponses;
127            }
128    
129            public List<ClusterEventListener> getClusterEventListeners() {
130                    if (!isEnabled()) {
131                            return Collections.EMPTY_LIST;
132                    }
133    
134                    return Collections.unmodifiableList(_clusterEventListeners);
135            }
136    
137            public List<ClusterNode> getClusterNodes() {
138                    if (!isEnabled()) {
139                            return Collections.EMPTY_LIST;
140                    }
141    
142                    return new ArrayList<ClusterNode>(_addressMap.values());
143            }
144    
145            public ClusterNode getLocalClusterNode() throws SystemException {
146                    if (!isEnabled()) {
147                            return null;
148                    }
149    
150                    ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
151    
152                    if (clusterNode == null) {
153                            _localClusterNodeId = PortalUUIDUtil.generate();
154    
155                            clusterNode = new ClusterNode(_localClusterNodeId);
156    
157                            clusterNode.setPort(PortalUtil.getPortalPort());
158    
159                            try {
160                                    InetAddress inetAddress = bindInetAddress;
161    
162                                    if (inetAddress == null) {
163                                            inetAddress = InetAddressUtil.getLocalInetAddress();
164                                    }
165    
166                                    clusterNode.setInetAddress(inetAddress);
167    
168                                    clusterNode.setHostName(inetAddress.getHostName());
169                            }
170                            catch (Exception e) {
171                                    throw new SystemException(
172                                            "Unable to determine local network address", e);
173                            }
174                    }
175    
176                    return clusterNode;
177            }
178    
179            public void initialize() {
180                    if (!isEnabled()) {
181                            return;
182                    }
183    
184                    try {
185                            PortalUtil.addPortalPortEventListener(this);
186    
187                            ClusterNode clusterNode = getLocalClusterNode();
188    
189                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
190                                    ClusterMessageType.NOTIFY, clusterNode);
191    
192                            _controlChannel.send(null, null, clusterRequest);
193                    }
194                    catch (ChannelException ce) {
195                            _log.error("Unable to send multicast message ", ce);
196                    }
197                    catch (SystemException se) {
198                            _log.error("Unable to determine local network address", se);
199                    }
200            }
201    
202            public boolean isClusterNodeAlive(String clusterNodeId) {
203                    if (!isEnabled()) {
204                            return false;
205                    }
206    
207                    return _clusterNodeIdMap.containsKey(clusterNodeId);
208            }
209    
210            public boolean isEnabled() {
211                    return PropsValues.CLUSTER_LINK_ENABLED;
212            }
213    
214            public void portalPortConfigured(int port) {
215                    if (!isEnabled()) {
216                            return;
217                    }
218    
219                    try {
220                            ClusterNode clusterNode = getLocalClusterNode();
221    
222                            clusterNode.setPort(port);
223    
224                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
225                                    ClusterMessageType.UPDATE, clusterNode);
226    
227                            _controlChannel.send(null, null, clusterRequest);
228                    }
229                    catch (Exception e) {
230                            if (_log.isErrorEnabled()) {
231                                    _log.error("Unable to determine configure node port", e);
232                            }
233                    }
234            }
235    
236            public void removeClusterEventListener(
237                    ClusterEventListener clusterEventListener) {
238    
239                    if (!isEnabled()) {
240                            return;
241                    }
242    
243                    _clusterEventListeners.remove(clusterEventListener);
244            }
245    
246            public void setClusterEventListeners(
247                    List<ClusterEventListener> clusterEventListeners) {
248    
249                    if (!isEnabled()) {
250                            return;
251                    }
252    
253                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
254            }
255    
256            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
257                    if (!isEnabled()) {
258                            return;
259                    }
260    
261                    _shortcutLocalMethod = shortcutLocalMethod;
262            }
263    
264            protected void fireClusterEvent(ClusterEvent clusterEvent) {
265                    for (ClusterEventListener listener : _clusterEventListeners) {
266                            listener.processClusterEvent(clusterEvent);
267                    }
268            }
269    
270            protected JChannel getControlChannel() {
271                    return _controlChannel;
272            }
273    
274            protected FutureClusterResponses getExecutionResults(String uuid) {
275                    return _executionResultMap.get(uuid);
276            }
277    
278            protected Address getLocalControlAddress() {
279                    return new AddressImpl(_controlChannel.getLocalAddress());
280            }
281    
282            protected void initChannels() {
283                    Properties controlProperties = PropsUtil.getProperties(
284                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
285    
286                    String controlProperty = controlProperties.getProperty(
287                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
288    
289                    ClusterRequestReceiver clusterInvokeReceiver =
290                            new ClusterRequestReceiver(this);
291    
292                    try {
293                            _controlChannel = createJChannel(
294                                    controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
295                    }
296                    catch (ChannelException ce) {
297                            _log.error(ce, ce);
298                    }
299                    catch (Exception e) {
300                            _log.error(e, e);
301                    }
302            }
303    
304            protected boolean isShortcutLocalMethod() {
305                    return _shortcutLocalMethod;
306            }
307    
308            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
309                    _addressMap.put(joinAddress, clusterNode);
310    
311                    Address previousAddress = _clusterNodeIdMap.put(
312                            clusterNode.getClusterNodeId(), joinAddress);
313    
314                    if ((previousAddress == null) &&
315                            !getLocalControlAddress().equals(joinAddress)) {
316    
317                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
318    
319                            fireClusterEvent(clusterEvent);
320                    }
321            }
322    
323            protected void memberRemoved(List<Address> departAddresses) {
324                    List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
325    
326                    for (Address departAddress : departAddresses) {
327                            ClusterNode departingClusterNode = _addressMap.remove(
328                                    departAddress);
329                            if (departingClusterNode != null) {
330                                    departingClusterNodes.add(departingClusterNode);
331    
332                                    _clusterNodeIdMap.remove(
333                                            departingClusterNode.getClusterNodeId());
334                            }
335                    }
336    
337                    if (departingClusterNodes.isEmpty()) {
338                            return;
339                    }
340    
341                    ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
342    
343                    fireClusterEvent(clusterEvent);
344            }
345    
346            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
347                    boolean isMulticast = clusterRequest.isMulticast();
348    
349                    List<Address> addresses = null;
350    
351                    if (isMulticast) {
352                            addresses = getAddresses(_controlChannel);
353                    }
354                    else {
355                            Collection<String> clusterNodeIds =
356                                    clusterRequest.getTargetClusterNodeIds();
357    
358                            addresses = new ArrayList<Address>(clusterNodeIds.size());
359    
360                            for (String clusterNodeId : clusterNodeIds) {
361                                    Address address = _clusterNodeIdMap.get(clusterNodeId);
362    
363                                    addresses.add(address);
364                            }
365                    }
366    
367                    return addresses;
368            }
369    
370            protected ClusterNodeResponse runLocalMethod(MethodWrapper methodWrapper)
371                    throws SystemException {
372    
373                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
374    
375                    ClusterNode localClusterNode = getLocalClusterNode();
376    
377                    clusterNodeResponse.setClusterNode(localClusterNode);
378                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
379    
380                    if (methodWrapper == null) {
381                            clusterNodeResponse.setException(
382                                    new ClusterException(
383                                            "Payload is not of type " + MethodWrapper.class.getName()));
384    
385                            return clusterNodeResponse;
386                    }
387    
388                    try {
389                            Object returnValue = MethodInvoker.invoke(methodWrapper);
390    
391                            if (returnValue instanceof Serializable) {
392                                    clusterNodeResponse.setResult(returnValue);
393                            }
394                            else if (returnValue != null) {
395                                    clusterNodeResponse.setException(
396                                            new ClusterException("Return value is not serializable"));
397                            }
398                    }
399                    catch (Exception e) {
400                            clusterNodeResponse.setException(e);
401                    }
402    
403                    return clusterNodeResponse;
404            }
405    
406            protected void sendMulticastRequest(ClusterRequest clusterRequest)
407                    throws SystemException {
408    
409                    try {
410                            _controlChannel.send(null, null, clusterRequest);
411                    }
412                    catch (ChannelException ce) {
413                            _log.error(
414                                    "Unable to send multicast message " + clusterRequest, ce);
415    
416                            throw new SystemException(
417                                    "Unable to send multicast request", ce);
418                    }
419            }
420    
421            protected void sendUnicastRequest(
422                            ClusterRequest clusterRequest, List<Address> addresses)
423                    throws SystemException {
424    
425                    for (Address address : addresses) {
426                            org.jgroups.Address jGroupsAddress =
427                                    (org.jgroups.Address)address.getRealAddress();
428    
429                            try {
430                                    _controlChannel.send(jGroupsAddress, null, clusterRequest);
431                            }
432                            catch (ChannelException ce) {
433                                    _log.error(
434                                            "Unable to send unicast message " + clusterRequest, ce);
435    
436                                    throw new SystemException(
437                                            "Unable to send unicast request", ce);
438                            }
439                    }
440            }
441    
442            private static final String _DEFAULT_CLUSTER_NAME =
443                    "LIFERAY-CONTROL-CHANNEL";
444    
445            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
446    
447            private Map<Address, ClusterNode> _addressMap =
448                    new ConcurrentHashMap<Address, ClusterNode>();
449            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
450                    new CopyOnWriteArrayList<ClusterEventListener>();
451            private Map<String, Address> _clusterNodeIdMap =
452                    new ConcurrentHashMap<String, Address>();
453            private JChannel _controlChannel;
454            private Map<String, FutureClusterResponses> _executionResultMap =
455                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
456            private String _localClusterNodeId;
457            private boolean _shortcutLocalMethod;
458    
459    }