001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterException;
019    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
020    import com.liferay.portal.kernel.cluster.ClusterMessageType;
021    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022    import com.liferay.portal.kernel.cluster.ClusterRequest;
023    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024    import com.liferay.portal.kernel.log.Log;
025    import com.liferay.portal.kernel.log.LogFactoryUtil;
026    import com.liferay.portal.kernel.util.MethodHandler;
027    
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.List;
031    import java.util.concurrent.CountDownLatch;
032    
033    import org.jgroups.Channel;
034    import org.jgroups.Message;
035    import org.jgroups.View;
036    
037    /**
038     * @author Michael C. Han
039     * @author Tina Tian
040     */
041    public class ClusterRequestReceiver extends BaseReceiver {
042    
043            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
044                    _countDownLatch = new CountDownLatch(1);
045                    _clusterExecutorImpl = clusterExecutorImpl;
046            }
047    
048            public void openLatch() {
049                    _countDownLatch.countDown();
050            }
051    
052            @Override
053            public void receive(Message message) {
054                    try {
055                            _countDownLatch.await();
056                    }
057                    catch (InterruptedException ie) {
058                            _log.error(
059                                    "Latch opened prematurely by interruption. Dependence may " +
060                                            "not be ready.");
061                    }
062    
063                    Object obj = message.getObject();
064    
065                    if (obj == null) {
066                            if (_log.isWarnEnabled()) {
067                                    _log.warn("Message content is null");
068                            }
069    
070                            return;
071                    }
072    
073                    Address sourceAddress = new AddressImpl(message.getSrc());
074    
075                    if (sourceAddress.equals(
076                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
077    
078                            boolean isProcessed = processLocalMessage(obj);
079    
080                            if (isProcessed) {
081                                    return;
082                            }
083                    }
084    
085                    if (obj instanceof ClusterRequest) {
086                            ClusterRequest clusterRequest = (ClusterRequest)obj;
087    
088                            processClusterRequest(clusterRequest, sourceAddress);
089                    }
090                    else if (obj instanceof ClusterNodeResponse) {
091                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
092    
093                            processClusterResponse(clusterNodeResponse, sourceAddress);
094                    }
095                    else if (_log.isWarnEnabled()) {
096                            _log.warn(
097                                    "Unable to process message content of type " + obj.getClass());
098                    }
099            }
100    
101            @Override
102            public void viewAccepted(View view) {
103                    super.viewAccepted(view);
104    
105                    if (_lastView == null) {
106                            _lastView = view;
107    
108                            return;
109                    }
110    
111                    List<Address> departAddresses = getDepartAddresses(view);
112                    List<Address> newAddresses = getNewAddresses(view);
113    
114                    _lastView = view;
115    
116                    try {
117                            _countDownLatch.await();
118                    }
119                    catch (InterruptedException ie) {
120                            _log.error(
121                                    "Latch opened prematurely by interruption. Dependence may " +
122                                            "not be ready.");
123                    }
124    
125                    if (!newAddresses.isEmpty()) {
126                            _clusterExecutorImpl.sendNotifyRequest();
127                    }
128    
129                    if (!departAddresses.isEmpty()) {
130                            _clusterExecutorImpl.memberRemoved(departAddresses);
131                    }
132            }
133    
134            protected List<Address> getDepartAddresses(View view) {
135                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
136                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
137    
138                    List<org.jgroups.Address> departJGroupsAddresses =
139                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
140    
141                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
142    
143                    if (departJGroupsAddresses.isEmpty()) {
144                            return Collections.emptyList();
145                    }
146    
147                    List<Address> departAddresses = new ArrayList<Address>(
148                            departJGroupsAddresses.size());
149    
150                    for (org.jgroups.Address departJGroupsAddress :
151                                    departJGroupsAddresses) {
152    
153                            Address departAddress = new AddressImpl(departJGroupsAddress);
154    
155                            departAddresses.add(departAddress);
156                    }
157    
158                    return departAddresses;
159            }
160    
161            protected List<Address> getNewAddresses(View view) {
162                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
163                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
164    
165                    List<org.jgroups.Address> newJGroupsAddresses =
166                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
167    
168                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
169    
170                    if (newJGroupsAddresses.isEmpty()) {
171                            return Collections.emptyList();
172                    }
173    
174                    List<Address> newAddresses = new ArrayList<Address>(
175                            newJGroupsAddresses.size());
176    
177                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
178                            Address newAddress = new AddressImpl(newJGroupsAddress);
179    
180                            newAddresses.add(newAddress);
181                    }
182    
183                    return newAddresses;
184            }
185    
186            protected void handleResponse(
187                    Address address, ClusterRequest clusterRequest, Object returnValue,
188                    Exception exception) {
189    
190                    ClusterNodeResponse clusterNodeResponse =
191                            _clusterExecutorImpl.generateClusterNodeResponse(
192                                    clusterRequest, returnValue, exception);
193    
194                    Channel channel = _clusterExecutorImpl.getControlChannel();
195    
196                    try {
197                            channel.send(
198                                    (org.jgroups.Address)address.getRealAddress(),
199                                    clusterNodeResponse);
200                    }
201                    catch (Exception e) {
202                            _log.error(
203                                    "Unable to send response message " + clusterNodeResponse, e);
204                    }
205                    catch (Throwable t) {
206                            _log.error(t, t);
207                    }
208            }
209    
210            protected void processClusterRequest(
211                    ClusterRequest clusterRequest, Address sourceAddress) {
212    
213                    ClusterMessageType clusterMessageType =
214                            clusterRequest.getClusterMessageType();
215    
216                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
217                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
218    
219                            _clusterExecutorImpl.memberJoined(
220                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
221    
222                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
223                                    handleResponse(sourceAddress, clusterRequest, null, null);
224                            }
225    
226                            return;
227                    }
228    
229                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
230    
231                    Object returnValue = null;
232                    Exception exception = null;
233    
234                    if (methodHandler != null) {
235                            try {
236                                    ClusterInvokeThreadLocal.setEnabled(false);
237    
238                                    returnValue = methodHandler.invoke(true);
239                            }
240                            catch (Exception e) {
241                                    exception = e;
242    
243                                    _log.error("Unable to invoke method " + methodHandler, e);
244                            }
245                            finally {
246                                    ClusterInvokeThreadLocal.setEnabled(true);
247                            }
248                    }
249                    else {
250                            exception = new ClusterException(
251                                    "Payload is not of type " + MethodHandler.class.getName());
252                    }
253    
254                    if (!clusterRequest.isFireAndForget()) {
255                            handleResponse(
256                                    sourceAddress, clusterRequest, returnValue, exception);
257                    }
258            }
259    
260            protected void processClusterResponse(
261                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
262    
263                    ClusterMessageType clusterMessageType =
264                            clusterNodeResponse.getClusterMessageType();
265    
266                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
267                            _clusterExecutorImpl.memberJoined(
268                                    sourceAddress, clusterNodeResponse.getClusterNode());
269    
270                            return;
271                    }
272    
273                    String uuid = clusterNodeResponse.getUuid();
274    
275                    FutureClusterResponses futureClusterResponses =
276                            _clusterExecutorImpl.getExecutionResults(uuid);
277    
278                    if (futureClusterResponses == null) {
279                            if (_log.isInfoEnabled()) {
280                                    _log.info("Unable to find response container for " + uuid);
281                            }
282    
283                            return;
284                    }
285    
286                    if (futureClusterResponses.expectsReply(sourceAddress)) {
287                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
288                    }
289                    else {
290                            if (_log.isWarnEnabled()) {
291                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
292                            }
293                    }
294            }
295    
296            protected boolean processLocalMessage(Object message) {
297                    if (message instanceof ClusterRequest) {
298                            ClusterRequest clusterRequest = (ClusterRequest)message;
299    
300                            if (clusterRequest.isSkipLocal()) {
301                                    return true;
302                            }
303                    }
304    
305                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
306                            return true;
307                    }
308    
309                    return false;
310            }
311    
312            private static Log _log = LogFactoryUtil.getLog(
313                    ClusterRequestReceiver.class);
314    
315            private ClusterExecutorImpl _clusterExecutorImpl;
316            private CountDownLatch _countDownLatch;
317            private volatile View _lastView;
318    
319    }