001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventType;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.util.MethodHandler;
029    
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.List;
033    
034    import org.jgroups.Message;
035    import org.jgroups.View;
036    
037    /**
038     * @author Michael C. Han
039     * @author Tina Tian
040     */
041    public class ClusterRequestReceiver extends BaseReceiver {
042    
043            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
044                    _clusterExecutorImpl = clusterExecutorImpl;
045            }
046    
047            @Override
048            protected void doReceive(Message message) {
049                    Object obj = retrievePayload(message);
050    
051                    if (obj == null) {
052                            return;
053                    }
054    
055                    Address sourceAddress = new AddressImpl(message.getSrc());
056    
057                    if (sourceAddress.equals(
058                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
059    
060                            boolean isProcessed = processLocalMessage(obj);
061    
062                            if (isProcessed) {
063                                    return;
064                            }
065                    }
066    
067                    if (obj instanceof ClusterRequest) {
068                            ClusterRequest clusterRequest = (ClusterRequest)obj;
069    
070                            processClusterRequest(clusterRequest, sourceAddress);
071                    }
072                    else if (obj instanceof ClusterNodeResponse) {
073                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
074    
075                            processClusterResponse(clusterNodeResponse, sourceAddress);
076                    }
077                    else if (_log.isWarnEnabled()) {
078                            _log.warn(
079                                    "Unable to process message content of type " + obj.getClass());
080                    }
081            }
082    
083            @Override
084            protected void doViewAccepted(View oldView, View newView) {
085                    List<Address> departAddresses = getDepartAddresses(oldView, newView);
086                    List<Address> newAddresses = getNewAddresses(oldView, newView);
087    
088                    if (!newAddresses.isEmpty()) {
089                            _clusterExecutorImpl.sendNotifyRequest();
090                    }
091    
092                    if (!departAddresses.isEmpty()) {
093                            _clusterExecutorImpl.memberRemoved(departAddresses);
094                    }
095            }
096    
097            @Override
098            protected void doCoordinatorUpdated(org.jgroups.Address coordinator) {
099                    _clusterExecutorImpl.fireClusterEvent(
100                            new ClusterEvent(ClusterEventType.COORDINATOR_UPDATE));
101            }
102    
103            protected List<Address> getDepartAddresses(View oldView, View newView) {
104                    List<org.jgroups.Address> currentJGroupsAddresses =
105                            newView.getMembers();
106                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
107    
108                    List<org.jgroups.Address> departJGroupsAddresses =
109                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
110    
111                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
112    
113                    if (departJGroupsAddresses.isEmpty()) {
114                            return Collections.emptyList();
115                    }
116    
117                    List<Address> departAddresses = new ArrayList<Address>(
118                            departJGroupsAddresses.size());
119    
120                    for (org.jgroups.Address departJGroupsAddress :
121                                    departJGroupsAddresses) {
122    
123                            Address departAddress = new AddressImpl(departJGroupsAddress);
124    
125                            departAddresses.add(departAddress);
126                    }
127    
128                    return departAddresses;
129            }
130    
131            protected List<Address> getNewAddresses(View oldView, View newView) {
132                    List<org.jgroups.Address> currentJGroupsAddresses =
133                            newView.getMembers();
134                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
135    
136                    List<org.jgroups.Address> newJGroupsAddresses =
137                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
138    
139                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
140    
141                    if (newJGroupsAddresses.isEmpty()) {
142                            return Collections.emptyList();
143                    }
144    
145                    List<Address> newAddresses = new ArrayList<Address>(
146                            newJGroupsAddresses.size());
147    
148                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
149                            Address newAddress = new AddressImpl(newJGroupsAddress);
150    
151                            newAddresses.add(newAddress);
152                    }
153    
154                    return newAddresses;
155            }
156    
157            protected void handleResponse(
158                    Address address, ClusterRequest clusterRequest, Object returnValue,
159                    Exception exception) {
160    
161                    ClusterNodeResponse clusterNodeResponse =
162                            _clusterExecutorImpl.generateClusterNodeResponse(
163                                    clusterRequest, returnValue, exception);
164    
165                    try {
166                            _clusterExecutorImpl.sendJGroupsMessage(
167                                    _clusterExecutorImpl.getControlChannel(),
168                                    (org.jgroups.Address)address.getRealAddress(),
169                                    clusterNodeResponse);
170                    }
171                    catch (Exception e) {
172                            _log.error(
173                                    "Unable to send response message " + clusterNodeResponse, e);
174                    }
175                    catch (Throwable t) {
176                            _log.error(t, t);
177                    }
178            }
179    
180            protected void processClusterRequest(
181                    ClusterRequest clusterRequest, Address sourceAddress) {
182    
183                    ClusterMessageType clusterMessageType =
184                            clusterRequest.getClusterMessageType();
185    
186                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
187                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
188    
189                            _clusterExecutorImpl.memberJoined(
190                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
191    
192                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
193                                    handleResponse(sourceAddress, clusterRequest, null, null);
194                            }
195    
196                            return;
197                    }
198    
199                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
200    
201                    Object returnValue = null;
202                    Exception exception = null;
203    
204                    if (methodHandler != null) {
205                            try {
206                                    ClusterInvokeThreadLocal.setEnabled(false);
207    
208                                    returnValue = methodHandler.invoke(true);
209                            }
210                            catch (Exception e) {
211                                    exception = e;
212    
213                                    _log.error("Unable to invoke method " + methodHandler, e);
214                            }
215                            finally {
216                                    ClusterInvokeThreadLocal.setEnabled(true);
217                            }
218                    }
219                    else {
220                            exception = new ClusterException(
221                                    "Payload is not of type " + MethodHandler.class.getName());
222                    }
223    
224                    if (!clusterRequest.isFireAndForget()) {
225                            handleResponse(
226                                    sourceAddress, clusterRequest, returnValue, exception);
227                    }
228            }
229    
230            protected void processClusterResponse(
231                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
232    
233                    ClusterMessageType clusterMessageType =
234                            clusterNodeResponse.getClusterMessageType();
235    
236                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
237                            _clusterExecutorImpl.memberJoined(
238                                    sourceAddress, clusterNodeResponse.getClusterNode());
239    
240                            return;
241                    }
242    
243                    String uuid = clusterNodeResponse.getUuid();
244    
245                    FutureClusterResponses futureClusterResponses =
246                            _clusterExecutorImpl.getExecutionResults(uuid);
247    
248                    if (futureClusterResponses == null) {
249                            if (_log.isInfoEnabled()) {
250                                    _log.info("Unable to find response container for " + uuid);
251                            }
252    
253                            return;
254                    }
255    
256                    if (futureClusterResponses.expectsReply(sourceAddress)) {
257                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
258                    }
259                    else {
260                            if (_log.isWarnEnabled()) {
261                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
262                            }
263                    }
264            }
265    
266            protected boolean processLocalMessage(Object message) {
267                    if (message instanceof ClusterRequest) {
268                            ClusterRequest clusterRequest = (ClusterRequest)message;
269    
270                            if (clusterRequest.isSkipLocal()) {
271                                    return true;
272                            }
273                    }
274    
275                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
276                            return true;
277                    }
278    
279                    return false;
280            }
281    
282            private static Log _log = LogFactoryUtil.getLog(
283                    ClusterRequestReceiver.class);
284    
285            private ClusterExecutorImpl _clusterExecutorImpl;
286    
287    }