001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterException;
019    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
020    import com.liferay.portal.kernel.cluster.ClusterMessageType;
021    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022    import com.liferay.portal.kernel.cluster.ClusterRequest;
023    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024    import com.liferay.portal.kernel.log.Log;
025    import com.liferay.portal.kernel.log.LogFactoryUtil;
026    import com.liferay.portal.kernel.util.MethodHandler;
027    
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.List;
031    
032    import org.jgroups.Channel;
033    import org.jgroups.Message;
034    import org.jgroups.View;
035    
036    /**
037     * @author Michael C. Han
038     * @author Tina Tian
039     */
040    public class ClusterRequestReceiver extends BaseReceiver {
041    
042            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
043                    _clusterExecutorImpl = clusterExecutorImpl;
044            }
045    
046            @Override
047            protected void doReceive(Message message) {
048                    Object obj = message.getObject();
049    
050                    if (obj == null) {
051                            if (_log.isWarnEnabled()) {
052                                    _log.warn("Message content is null");
053                            }
054    
055                            return;
056                    }
057    
058                    Address sourceAddress = new AddressImpl(message.getSrc());
059    
060                    if (sourceAddress.equals(
061                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
062    
063                            boolean isProcessed = processLocalMessage(obj);
064    
065                            if (isProcessed) {
066                                    return;
067                            }
068                    }
069    
070                    if (obj instanceof ClusterRequest) {
071                            ClusterRequest clusterRequest = (ClusterRequest)obj;
072    
073                            processClusterRequest(clusterRequest, sourceAddress);
074                    }
075                    else if (obj instanceof ClusterNodeResponse) {
076                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
077    
078                            processClusterResponse(clusterNodeResponse, sourceAddress);
079                    }
080                    else if (_log.isWarnEnabled()) {
081                            _log.warn(
082                                    "Unable to process message content of type " + obj.getClass());
083                    }
084            }
085    
086            @Override
087            protected void doViewAccepted(View oldView, View newView) {
088                    List<Address> departAddresses = getDepartAddresses(oldView, newView);
089                    List<Address> newAddresses = getNewAddresses(oldView, newView);
090    
091                    if (!newAddresses.isEmpty()) {
092                            _clusterExecutorImpl.sendNotifyRequest();
093                    }
094    
095                    if (!departAddresses.isEmpty()) {
096                            _clusterExecutorImpl.memberRemoved(departAddresses);
097                    }
098            }
099    
100            protected List<Address> getDepartAddresses(View oldView, View newView) {
101                    List<org.jgroups.Address> currentJGroupsAddresses =
102                            newView.getMembers();
103                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
104    
105                    List<org.jgroups.Address> departJGroupsAddresses =
106                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
107    
108                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
109    
110                    if (departJGroupsAddresses.isEmpty()) {
111                            return Collections.emptyList();
112                    }
113    
114                    List<Address> departAddresses = new ArrayList<Address>(
115                            departJGroupsAddresses.size());
116    
117                    for (org.jgroups.Address departJGroupsAddress :
118                                    departJGroupsAddresses) {
119    
120                            Address departAddress = new AddressImpl(departJGroupsAddress);
121    
122                            departAddresses.add(departAddress);
123                    }
124    
125                    return departAddresses;
126            }
127    
128            protected List<Address> getNewAddresses(View oldView, View newView) {
129                    List<org.jgroups.Address> currentJGroupsAddresses =
130                            newView.getMembers();
131                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
132    
133                    List<org.jgroups.Address> newJGroupsAddresses =
134                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
135    
136                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
137    
138                    if (newJGroupsAddresses.isEmpty()) {
139                            return Collections.emptyList();
140                    }
141    
142                    List<Address> newAddresses = new ArrayList<Address>(
143                            newJGroupsAddresses.size());
144    
145                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
146                            Address newAddress = new AddressImpl(newJGroupsAddress);
147    
148                            newAddresses.add(newAddress);
149                    }
150    
151                    return newAddresses;
152            }
153    
154            protected void handleResponse(
155                    Address address, ClusterRequest clusterRequest, Object returnValue,
156                    Exception exception) {
157    
158                    ClusterNodeResponse clusterNodeResponse =
159                            _clusterExecutorImpl.generateClusterNodeResponse(
160                                    clusterRequest, returnValue, exception);
161    
162                    Channel channel = _clusterExecutorImpl.getControlChannel();
163    
164                    try {
165                            channel.send(
166                                    (org.jgroups.Address)address.getRealAddress(),
167                                    clusterNodeResponse);
168                    }
169                    catch (Exception e) {
170                            _log.error(
171                                    "Unable to send response message " + clusterNodeResponse, e);
172                    }
173                    catch (Throwable t) {
174                            _log.error(t, t);
175                    }
176            }
177    
178            protected void processClusterRequest(
179                    ClusterRequest clusterRequest, Address sourceAddress) {
180    
181                    ClusterMessageType clusterMessageType =
182                            clusterRequest.getClusterMessageType();
183    
184                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
185                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
186    
187                            _clusterExecutorImpl.memberJoined(
188                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
189    
190                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
191                                    handleResponse(sourceAddress, clusterRequest, null, null);
192                            }
193    
194                            return;
195                    }
196    
197                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
198    
199                    Object returnValue = null;
200                    Exception exception = null;
201    
202                    if (methodHandler != null) {
203                            try {
204                                    ClusterInvokeThreadLocal.setEnabled(false);
205    
206                                    returnValue = methodHandler.invoke(true);
207                            }
208                            catch (Exception e) {
209                                    exception = e;
210    
211                                    _log.error("Unable to invoke method " + methodHandler, e);
212                            }
213                            finally {
214                                    ClusterInvokeThreadLocal.setEnabled(true);
215                            }
216                    }
217                    else {
218                            exception = new ClusterException(
219                                    "Payload is not of type " + MethodHandler.class.getName());
220                    }
221    
222                    if (!clusterRequest.isFireAndForget()) {
223                            handleResponse(
224                                    sourceAddress, clusterRequest, returnValue, exception);
225                    }
226            }
227    
228            protected void processClusterResponse(
229                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
230    
231                    ClusterMessageType clusterMessageType =
232                            clusterNodeResponse.getClusterMessageType();
233    
234                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
235                            _clusterExecutorImpl.memberJoined(
236                                    sourceAddress, clusterNodeResponse.getClusterNode());
237    
238                            return;
239                    }
240    
241                    String uuid = clusterNodeResponse.getUuid();
242    
243                    FutureClusterResponses futureClusterResponses =
244                            _clusterExecutorImpl.getExecutionResults(uuid);
245    
246                    if (futureClusterResponses == null) {
247                            if (_log.isInfoEnabled()) {
248                                    _log.info("Unable to find response container for " + uuid);
249                            }
250    
251                            return;
252                    }
253    
254                    if (futureClusterResponses.expectsReply(sourceAddress)) {
255                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
256                    }
257                    else {
258                            if (_log.isWarnEnabled()) {
259                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
260                            }
261                    }
262            }
263    
264            protected boolean processLocalMessage(Object message) {
265                    if (message instanceof ClusterRequest) {
266                            ClusterRequest clusterRequest = (ClusterRequest)message;
267    
268                            if (clusterRequest.isSkipLocal()) {
269                                    return true;
270                            }
271                    }
272    
273                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
274                            return true;
275                    }
276    
277                    return false;
278            }
279    
280            private static Log _log = LogFactoryUtil.getLog(
281                    ClusterRequestReceiver.class);
282    
283            private ClusterExecutorImpl _clusterExecutorImpl;
284    
285    }