001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cache.Lifecycle;
018    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    
031    import java.util.ArrayList;
032    import java.util.Collections;
033    import java.util.List;
034    
035    import org.jgroups.Channel;
036    import org.jgroups.Message;
037    import org.jgroups.View;
038    
039    /**
040     * @author Michael C. Han
041     * @author Tina Tian
042     */
043    public class ClusterRequestReceiver extends BaseReceiver {
044    
045            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
046                    _clusterExecutorImpl = clusterExecutorImpl;
047            }
048    
049            @Override
050            protected void doReceive(Message message) {
051                    Object obj = message.getObject();
052    
053                    if (obj == null) {
054                            if (_log.isWarnEnabled()) {
055                                    _log.warn("Message content is null");
056                            }
057    
058                            return;
059                    }
060    
061                    Address sourceAddress = new AddressImpl(message.getSrc());
062    
063                    if (sourceAddress.equals(
064                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
065    
066                            boolean isProcessed = processLocalMessage(obj);
067    
068                            if (isProcessed) {
069                                    return;
070                            }
071                    }
072    
073                    try {
074                            if (obj instanceof ClusterRequest) {
075                                    ClusterRequest clusterRequest = (ClusterRequest)obj;
076    
077                                    processClusterRequest(clusterRequest, sourceAddress);
078                            }
079                            else if (obj instanceof ClusterNodeResponse) {
080                                    ClusterNodeResponse clusterNodeResponse =
081                                            (ClusterNodeResponse)obj;
082    
083                                    processClusterResponse(clusterNodeResponse, sourceAddress);
084                            }
085                            else if (_log.isWarnEnabled()) {
086                                    _log.warn(
087                                            "Unable to process message content of type " +
088                                                    obj.getClass());
089                            }
090                    }
091                    finally {
092                            ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
093    
094                            CentralizedThreadLocal.clearShortLivedThreadLocals();
095                    }
096            }
097    
098            @Override
099            protected void doViewAccepted(View oldView, View newView) {
100                    List<Address> departAddresses = getDepartAddresses(oldView, newView);
101                    List<Address> newAddresses = getNewAddresses(oldView, newView);
102    
103                    if (!newAddresses.isEmpty()) {
104                            _clusterExecutorImpl.sendNotifyRequest();
105                    }
106    
107                    if (!departAddresses.isEmpty()) {
108                            _clusterExecutorImpl.memberRemoved(departAddresses);
109                    }
110            }
111    
112            protected List<Address> getDepartAddresses(View oldView, View newView) {
113                    List<org.jgroups.Address> currentJGroupsAddresses =
114                            newView.getMembers();
115                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
116    
117                    List<org.jgroups.Address> departJGroupsAddresses =
118                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
119    
120                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
121    
122                    if (departJGroupsAddresses.isEmpty()) {
123                            return Collections.emptyList();
124                    }
125    
126                    List<Address> departAddresses = new ArrayList<Address>(
127                            departJGroupsAddresses.size());
128    
129                    for (org.jgroups.Address departJGroupsAddress :
130                                    departJGroupsAddresses) {
131    
132                            Address departAddress = new AddressImpl(departJGroupsAddress);
133    
134                            departAddresses.add(departAddress);
135                    }
136    
137                    return departAddresses;
138            }
139    
140            protected List<Address> getNewAddresses(View oldView, View newView) {
141                    List<org.jgroups.Address> currentJGroupsAddresses =
142                            newView.getMembers();
143                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
144    
145                    List<org.jgroups.Address> newJGroupsAddresses =
146                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
147    
148                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
149    
150                    if (newJGroupsAddresses.isEmpty()) {
151                            return Collections.emptyList();
152                    }
153    
154                    List<Address> newAddresses = new ArrayList<Address>(
155                            newJGroupsAddresses.size());
156    
157                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
158                            Address newAddress = new AddressImpl(newJGroupsAddress);
159    
160                            newAddresses.add(newAddress);
161                    }
162    
163                    return newAddresses;
164            }
165    
166            protected void handleResponse(
167                    Address address, ClusterRequest clusterRequest, Object returnValue,
168                    Exception exception) {
169    
170                    ClusterNodeResponse clusterNodeResponse =
171                            _clusterExecutorImpl.generateClusterNodeResponse(
172                                    clusterRequest, returnValue, exception);
173    
174                    Channel channel = _clusterExecutorImpl.getControlChannel();
175    
176                    try {
177                            channel.send(
178                                    (org.jgroups.Address)address.getRealAddress(),
179                                    clusterNodeResponse);
180                    }
181                    catch (Exception e) {
182                            _log.error(
183                                    "Unable to send response message " + clusterNodeResponse, e);
184                    }
185                    catch (Throwable t) {
186                            _log.error(t, t);
187                    }
188            }
189    
190            protected void processClusterRequest(
191                    ClusterRequest clusterRequest, Address sourceAddress) {
192    
193                    ClusterMessageType clusterMessageType =
194                            clusterRequest.getClusterMessageType();
195    
196                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
197                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
198    
199                            _clusterExecutorImpl.memberJoined(
200                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
201    
202                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
203                                    handleResponse(sourceAddress, clusterRequest, null, null);
204                            }
205    
206                            return;
207                    }
208    
209                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
210    
211                    Object returnValue = null;
212                    Exception exception = null;
213    
214                    if (methodHandler != null) {
215                            try {
216                                    ClusterInvokeThreadLocal.setEnabled(false);
217    
218                                    returnValue = methodHandler.invoke();
219                            }
220                            catch (Exception e) {
221                                    exception = e;
222    
223                                    _log.error("Unable to invoke method " + methodHandler, e);
224                            }
225                            finally {
226                                    ClusterInvokeThreadLocal.setEnabled(true);
227                            }
228                    }
229                    else {
230                            exception = new ClusterException(
231                                    "Payload is not of type " + MethodHandler.class.getName());
232                    }
233    
234                    if (!clusterRequest.isFireAndForget()) {
235                            handleResponse(
236                                    sourceAddress, clusterRequest, returnValue, exception);
237                    }
238            }
239    
240            protected void processClusterResponse(
241                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
242    
243                    ClusterMessageType clusterMessageType =
244                            clusterNodeResponse.getClusterMessageType();
245    
246                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
247                            _clusterExecutorImpl.memberJoined(
248                                    sourceAddress, clusterNodeResponse.getClusterNode());
249    
250                            return;
251                    }
252    
253                    String uuid = clusterNodeResponse.getUuid();
254    
255                    FutureClusterResponses futureClusterResponses =
256                            _clusterExecutorImpl.getExecutionResults(uuid);
257    
258                    if (futureClusterResponses == null) {
259                            if (_log.isInfoEnabled()) {
260                                    _log.info("Unable to find response container for " + uuid);
261                            }
262    
263                            return;
264                    }
265    
266                    if (futureClusterResponses.expectsReply(sourceAddress)) {
267                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
268                    }
269                    else {
270                            if (_log.isWarnEnabled()) {
271                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
272                            }
273                    }
274            }
275    
276            protected boolean processLocalMessage(Object message) {
277                    if (message instanceof ClusterRequest) {
278                            ClusterRequest clusterRequest = (ClusterRequest)message;
279    
280                            if (clusterRequest.isSkipLocal()) {
281                                    return true;
282                            }
283                    }
284    
285                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
286                            return true;
287                    }
288    
289                    return false;
290            }
291    
292            private static final Log _log = LogFactoryUtil.getLog(
293                    ClusterRequestReceiver.class);
294    
295            private final ClusterExecutorImpl _clusterExecutorImpl;
296    
297    }