001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cache.Lifecycle;
018    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.log.Log;
027    import com.liferay.portal.kernel.log.LogFactoryUtil;
028    import com.liferay.portal.kernel.util.CentralizedThreadLocal;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    
031    import java.util.ArrayList;
032    import java.util.Collections;
033    import java.util.List;
034    import java.util.concurrent.CountDownLatch;
035    
036    import org.jgroups.Channel;
037    import org.jgroups.Message;
038    import org.jgroups.View;
039    
040    /**
041     * @author Michael C. Han
042     * @author Tina Tian
043     */
044    public class ClusterRequestReceiver extends BaseReceiver {
045    
046            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
047                    _countDownLatch = new CountDownLatch(1);
048                    _clusterExecutorImpl = clusterExecutorImpl;
049            }
050    
051            public void openLatch() {
052                    _countDownLatch.countDown();
053            }
054    
055            @Override
056            public void receive(Message message) {
057                    try {
058                            _countDownLatch.await();
059                    }
060                    catch (InterruptedException ie) {
061                            _log.error(
062                                    "Latch opened prematurely by interruption. Dependence may " +
063                                            "not be ready.");
064                    }
065    
066                    Object obj = message.getObject();
067    
068                    if (obj == null) {
069                            if (_log.isWarnEnabled()) {
070                                    _log.warn("Message content is null");
071                            }
072    
073                            return;
074                    }
075    
076                    Address sourceAddress = new AddressImpl(message.getSrc());
077    
078                    if (sourceAddress.equals(
079                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
080    
081                            boolean isProcessed = processLocalMessage(obj);
082    
083                            if (isProcessed) {
084                                    return;
085                            }
086                    }
087    
088                    try {
089                            if (obj instanceof ClusterRequest) {
090                                    ClusterRequest clusterRequest = (ClusterRequest)obj;
091    
092                                    processClusterRequest(clusterRequest, sourceAddress);
093                            }
094                            else if (obj instanceof ClusterNodeResponse) {
095                                    ClusterNodeResponse clusterNodeResponse =
096                                            (ClusterNodeResponse)obj;
097    
098                                    processClusterResponse(clusterNodeResponse, sourceAddress);
099                            }
100                            else if (_log.isWarnEnabled()) {
101                                    _log.warn(
102                                            "Unable to process message content of type " +
103                                                    obj.getClass());
104                            }
105                    }
106                    finally {
107                            ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
108    
109                            CentralizedThreadLocal.clearShortLivedThreadLocals();
110                    }
111            }
112    
113            @Override
114            public void viewAccepted(View view) {
115                    super.viewAccepted(view);
116    
117                    if (_lastView == null) {
118                            _lastView = view;
119    
120                            return;
121                    }
122    
123                    List<Address> departAddresses = getDepartAddresses(view);
124                    List<Address> newAddresses = getNewAddresses(view);
125    
126                    _lastView = view;
127    
128                    try {
129                            _countDownLatch.await();
130                    }
131                    catch (InterruptedException ie) {
132                            _log.error(
133                                    "Latch opened prematurely by interruption. Dependence may " +
134                                            "not be ready.");
135                    }
136    
137                    if (!newAddresses.isEmpty()) {
138                            _clusterExecutorImpl.sendNotifyRequest();
139                    }
140    
141                    if (!departAddresses.isEmpty()) {
142                            _clusterExecutorImpl.memberRemoved(departAddresses);
143                    }
144            }
145    
146            protected List<Address> getDepartAddresses(View view) {
147                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
148                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
149    
150                    List<org.jgroups.Address> departJGroupsAddresses =
151                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
152    
153                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
154    
155                    if (departJGroupsAddresses.isEmpty()) {
156                            return Collections.emptyList();
157                    }
158    
159                    List<Address> departAddresses = new ArrayList<Address>(
160                            departJGroupsAddresses.size());
161    
162                    for (org.jgroups.Address departJGroupsAddress :
163                                    departJGroupsAddresses) {
164    
165                            Address departAddress = new AddressImpl(departJGroupsAddress);
166    
167                            departAddresses.add(departAddress);
168                    }
169    
170                    return departAddresses;
171            }
172    
173            protected List<Address> getNewAddresses(View view) {
174                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
175                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
176    
177                    List<org.jgroups.Address> newJGroupsAddresses =
178                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
179    
180                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
181    
182                    if (newJGroupsAddresses.isEmpty()) {
183                            return Collections.emptyList();
184                    }
185    
186                    List<Address> newAddresses = new ArrayList<Address>(
187                            newJGroupsAddresses.size());
188    
189                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
190                            Address newAddress = new AddressImpl(newJGroupsAddress);
191    
192                            newAddresses.add(newAddress);
193                    }
194    
195                    return newAddresses;
196            }
197    
198            protected void handleResponse(
199                    Address address, ClusterRequest clusterRequest, Object returnValue,
200                    Exception exception) {
201    
202                    ClusterNodeResponse clusterNodeResponse =
203                            _clusterExecutorImpl.generateClusterNodeResponse(
204                                    clusterRequest, returnValue, exception);
205    
206                    Channel channel = _clusterExecutorImpl.getControlChannel();
207    
208                    try {
209                            channel.send(
210                                    (org.jgroups.Address)address.getRealAddress(),
211                                    clusterNodeResponse);
212                    }
213                    catch (Exception e) {
214                            _log.error(
215                                    "Unable to send response message " + clusterNodeResponse, e);
216                    }
217                    catch (Throwable t) {
218                            _log.error(t, t);
219                    }
220            }
221    
222            protected void processClusterRequest(
223                    ClusterRequest clusterRequest, Address sourceAddress) {
224    
225                    ClusterMessageType clusterMessageType =
226                            clusterRequest.getClusterMessageType();
227    
228                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
229                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
230    
231                            _clusterExecutorImpl.memberJoined(
232                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
233    
234                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
235                                    handleResponse(sourceAddress, clusterRequest, null, null);
236                            }
237    
238                            return;
239                    }
240    
241                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
242    
243                    Object returnValue = null;
244                    Exception exception = null;
245    
246                    if (methodHandler != null) {
247                            try {
248                                    ClusterInvokeThreadLocal.setEnabled(false);
249    
250                                    returnValue = methodHandler.invoke();
251                            }
252                            catch (Exception e) {
253                                    exception = e;
254    
255                                    _log.error("Unable to invoke method " + methodHandler, e);
256                            }
257                            finally {
258                                    ClusterInvokeThreadLocal.setEnabled(true);
259                            }
260                    }
261                    else {
262                            exception = new ClusterException(
263                                    "Payload is not of type " + MethodHandler.class.getName());
264                    }
265    
266                    if (!clusterRequest.isFireAndForget()) {
267                            handleResponse(
268                                    sourceAddress, clusterRequest, returnValue, exception);
269                    }
270            }
271    
272            protected void processClusterResponse(
273                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
274    
275                    ClusterMessageType clusterMessageType =
276                            clusterNodeResponse.getClusterMessageType();
277    
278                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
279                            _clusterExecutorImpl.memberJoined(
280                                    sourceAddress, clusterNodeResponse.getClusterNode());
281    
282                            return;
283                    }
284    
285                    String uuid = clusterNodeResponse.getUuid();
286    
287                    FutureClusterResponses futureClusterResponses =
288                            _clusterExecutorImpl.getExecutionResults(uuid);
289    
290                    if (futureClusterResponses == null) {
291                            if (_log.isInfoEnabled()) {
292                                    _log.info("Unable to find response container for " + uuid);
293                            }
294    
295                            return;
296                    }
297    
298                    if (futureClusterResponses.expectsReply(sourceAddress)) {
299                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
300                    }
301                    else {
302                            if (_log.isWarnEnabled()) {
303                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
304                            }
305                    }
306            }
307    
308            protected boolean processLocalMessage(Object message) {
309                    if (message instanceof ClusterRequest) {
310                            ClusterRequest clusterRequest = (ClusterRequest)message;
311    
312                            if (clusterRequest.isSkipLocal()) {
313                                    return true;
314                            }
315                    }
316    
317                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
318                            return true;
319                    }
320    
321                    return false;
322            }
323    
324            private static final Log _log = LogFactoryUtil.getLog(
325                    ClusterRequestReceiver.class);
326    
327            private final ClusterExecutorImpl _clusterExecutorImpl;
328            private final CountDownLatch _countDownLatch;
329            private volatile View _lastView;
330    
331    }