001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018    import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterMessageType;
022    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
023    import com.liferay.portal.kernel.cluster.ClusterRequest;
024    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
025    import com.liferay.portal.kernel.log.Log;
026    import com.liferay.portal.kernel.log.LogFactoryUtil;
027    import com.liferay.portal.kernel.util.MethodHandler;
028    import com.liferay.portal.kernel.util.Validator;
029    import com.liferay.portal.security.pacl.PACLClassLoaderUtil;
030    
031    import java.util.ArrayList;
032    import java.util.Collections;
033    import java.util.List;
034    import java.util.concurrent.CountDownLatch;
035    
036    import org.jgroups.Channel;
037    import org.jgroups.Message;
038    import org.jgroups.View;
039    
040    /**
041     * @author Michael C. Han
042     * @author Tina Tian
043     */
044    public class ClusterRequestReceiver extends BaseReceiver {
045    
046            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
047                    _countDownLatch = new CountDownLatch(1);
048                    _clusterExecutorImpl = clusterExecutorImpl;
049            }
050    
051            public void openLatch() {
052                    _countDownLatch.countDown();
053            }
054    
055            @Override
056            public void receive(Message message) {
057                    try {
058                            _countDownLatch.await();
059                    }
060                    catch (InterruptedException ie) {
061                            _log.error(
062                                    "Latch opened prematurely by interruption. Dependence may " +
063                                            "not be ready.");
064                    }
065    
066                    Object obj = message.getObject();
067    
068                    if (obj == null) {
069                            if (_log.isWarnEnabled()) {
070                                    _log.warn("Message content is null");
071                            }
072    
073                            return;
074                    }
075    
076                    Address sourceAddress = new AddressImpl(message.getSrc());
077    
078                    if (sourceAddress.equals(
079                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
080    
081                            boolean isProcessed = processLocalMessage(obj);
082    
083                            if (isProcessed) {
084                                    return;
085                            }
086                    }
087    
088                    if (obj instanceof ClusterRequest) {
089                            ClusterRequest clusterRequest = (ClusterRequest)obj;
090    
091                            processClusterRequest(clusterRequest, sourceAddress);
092                    }
093                    else if (obj instanceof ClusterNodeResponse) {
094                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
095    
096                            processClusterResponse(clusterNodeResponse, sourceAddress);
097                    }
098                    else if (_log.isWarnEnabled()) {
099                            _log.warn(
100                                    "Unable to process message content of type " + obj.getClass());
101                    }
102            }
103    
104            @Override
105            public void viewAccepted(View view) {
106                    super.viewAccepted(view);
107    
108                    if (_lastView == null) {
109                            _lastView = view;
110    
111                            return;
112                    }
113    
114                    List<Address> departAddresses = getDepartAddresses(view);
115                    List<Address> newAddresses = getNewAddresses(view);
116    
117                    _lastView = view;
118    
119                    try {
120                            _countDownLatch.await();
121                    }
122                    catch (InterruptedException ie) {
123                            _log.error(
124                                    "Latch opened prematurely by interruption. Dependence may " +
125                                            "not be ready.");
126                    }
127    
128                    if (!newAddresses.isEmpty()) {
129                            _clusterExecutorImpl.sendNotifyRequest();
130                    }
131    
132                    if (!departAddresses.isEmpty()) {
133                            _clusterExecutorImpl.memberRemoved(departAddresses);
134                    }
135            }
136    
137            protected List<Address> getDepartAddresses(View view) {
138                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
139                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
140    
141                    List<org.jgroups.Address> departJGroupsAddresses =
142                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
143    
144                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
145    
146                    if (departJGroupsAddresses.isEmpty()) {
147                            return Collections.emptyList();
148                    }
149    
150                    List<Address> departAddresses = new ArrayList<Address>(
151                            departJGroupsAddresses.size());
152    
153                    for (org.jgroups.Address departJGroupsAddress :
154                                    departJGroupsAddresses) {
155    
156                            Address departAddress = new AddressImpl(departJGroupsAddress);
157    
158                            departAddresses.add(departAddress);
159                    }
160    
161                    return departAddresses;
162            }
163    
164            protected List<Address> getNewAddresses(View view) {
165                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
166                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
167    
168                    List<org.jgroups.Address> newJGroupsAddresses =
169                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
170    
171                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
172    
173                    if (newJGroupsAddresses.isEmpty()) {
174                            return Collections.emptyList();
175                    }
176    
177                    List<Address> newAddresses = new ArrayList<Address>(
178                            newJGroupsAddresses.size());
179    
180                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
181                            Address newAddress = new AddressImpl(newJGroupsAddress);
182    
183                            newAddresses.add(newAddress);
184                    }
185    
186                    return newAddresses;
187            }
188    
189            protected void handleResponse(
190                    Address address, ClusterRequest clusterRequest, Object returnValue,
191                    Exception exception) {
192    
193                    ClusterNodeResponse clusterNodeResponse =
194                            _clusterExecutorImpl.generateClusterNodeResponse(
195                                    clusterRequest, returnValue, exception);
196    
197                    Channel channel = _clusterExecutorImpl.getControlChannel();
198    
199                    try {
200                            channel.send(
201                                    (org.jgroups.Address)address.getRealAddress(),
202                                    clusterNodeResponse);
203                    }
204                    catch (Exception e) {
205                            _log.error(
206                                    "Unable to send response message " + clusterNodeResponse, e);
207                    }
208                    catch (Throwable t) {
209                            _log.error(t, t);
210                    }
211            }
212    
213            protected Object invoke(
214                            String servletContextName, String beanIdentifier,
215                            MethodHandler methodHandler)
216                    throws Exception {
217    
218                    if (servletContextName == null) {
219                            if (Validator.isNull(beanIdentifier)) {
220                                    return methodHandler.invoke(true);
221                            }
222                            else {
223                                    Object bean = PortalBeanLocatorUtil.locate(beanIdentifier);
224    
225                                    return methodHandler.invoke(bean);
226                            }
227                    }
228    
229                    ClassLoader contextClassLoader =
230                            PACLClassLoaderUtil.getContextClassLoader();
231    
232                    try {
233                            ClassLoader classLoader =
234                                    (ClassLoader)PortletBeanLocatorUtil.locate(
235                                            servletContextName, "portletClassLoader");
236    
237                            PACLClassLoaderUtil.setContextClassLoader(classLoader);
238    
239                            if (Validator.isNull(beanIdentifier)) {
240                                    return methodHandler.invoke(true);
241                            }
242                            else {
243                                    Object bean = PortletBeanLocatorUtil.locate(
244                                            servletContextName, beanIdentifier);
245    
246                                    return methodHandler.invoke(bean);
247                            }
248                    }
249                    finally {
250                            PACLClassLoaderUtil.setContextClassLoader(contextClassLoader);
251                    }
252            }
253    
254            protected void processClusterRequest(
255                    ClusterRequest clusterRequest, Address sourceAddress) {
256    
257                    ClusterMessageType clusterMessageType =
258                            clusterRequest.getClusterMessageType();
259    
260                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
261                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
262    
263                            _clusterExecutorImpl.memberJoined(
264                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
265    
266                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
267                                    handleResponse(sourceAddress, clusterRequest, null, null);
268                            }
269    
270                            return;
271                    }
272    
273                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
274    
275                    Object returnValue = null;
276                    Exception exception = null;
277    
278                    if (methodHandler != null) {
279                            try {
280                                    ClusterInvokeThreadLocal.setEnabled(false);
281    
282                                    returnValue = invoke(
283                                            clusterRequest.getServletContextName(),
284                                            clusterRequest.getBeanIdentifier(), methodHandler);
285                            }
286                            catch (Exception e) {
287                                    exception = e;
288    
289                                    _log.error("Unable to invoke method " + methodHandler, e);
290                            }
291                            finally {
292                                    ClusterInvokeThreadLocal.setEnabled(true);
293                            }
294                    }
295                    else {
296                            exception = new ClusterException(
297                                    "Payload is not of type " + MethodHandler.class.getName());
298                    }
299    
300                    if (!clusterRequest.isFireAndForget()) {
301                            handleResponse(
302                                    sourceAddress, clusterRequest, returnValue, exception);
303                    }
304            }
305    
306            protected void processClusterResponse(
307                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
308    
309                    ClusterMessageType clusterMessageType =
310                            clusterNodeResponse.getClusterMessageType();
311    
312                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
313                            _clusterExecutorImpl.memberJoined(
314                                    sourceAddress, clusterNodeResponse.getClusterNode());
315    
316                            return;
317                    }
318    
319                    String uuid = clusterNodeResponse.getUuid();
320    
321                    FutureClusterResponses futureClusterResponses =
322                            _clusterExecutorImpl.getExecutionResults(uuid);
323    
324                    if (futureClusterResponses == null) {
325                            if (_log.isInfoEnabled()) {
326                                    _log.info("Unable to find response container for " + uuid);
327                            }
328    
329                            return;
330                    }
331    
332                    if (futureClusterResponses.expectsReply(sourceAddress)) {
333                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
334                    }
335                    else {
336                            if (_log.isWarnEnabled()) {
337                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
338                            }
339                    }
340            }
341    
342            protected boolean processLocalMessage(Object message) {
343                    if (message instanceof ClusterRequest) {
344                            ClusterRequest clusterRequest = (ClusterRequest)message;
345    
346                            if (clusterRequest.isSkipLocal()) {
347                                    return true;
348                            }
349                    }
350    
351                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
352                            return true;
353                    }
354    
355                    return false;
356            }
357    
358            private static Log _log = LogFactoryUtil.getLog(
359                    ClusterRequestReceiver.class);
360    
361            private ClusterExecutorImpl _clusterExecutorImpl;
362            private CountDownLatch _countDownLatch;
363            private volatile View _lastView;
364    
365    }