001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
018    import com.liferay.portal.kernel.cluster.Address;
019    import com.liferay.portal.kernel.cluster.ClusterException;
020    import com.liferay.portal.kernel.cluster.ClusterMessageType;
021    import com.liferay.portal.kernel.cluster.ClusterNode;
022    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
023    import com.liferay.portal.kernel.cluster.ClusterRequest;
024    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
025    import com.liferay.portal.kernel.log.Log;
026    import com.liferay.portal.kernel.log.LogFactoryUtil;
027    import com.liferay.portal.kernel.util.MethodInvoker;
028    import com.liferay.portal.kernel.util.MethodWrapper;
029    
030    import java.io.Serializable;
031    
032    import java.util.ArrayList;
033    import java.util.Iterator;
034    import java.util.List;
035    import java.util.Vector;
036    
037    import org.jgroups.Channel;
038    import org.jgroups.ChannelException;
039    import org.jgroups.Message;
040    import org.jgroups.View;
041    
042    /**
043     * @author Michael C. Han
044     * @author Tina Tian
045     */
046    public class ClusterRequestReceiver extends BaseReceiver {
047    
048            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
049                    _clusterExecutorImpl = clusterExecutorImpl;
050            }
051    
052            public void receive(Message message) {
053                    org.jgroups.Address sourceAddress = message.getSrc();
054    
055                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
056    
057                    org.jgroups.Address localAddress = controlChannel.getAddress();
058    
059                    Object obj = message.getObject();
060    
061                    if (obj == null) {
062                            if (_log.isWarnEnabled()) {
063                                    _log.warn("Message content is null");
064                            }
065    
066                            return;
067                    }
068    
069                    if (localAddress.equals(sourceAddress)) {
070                            boolean isProcessed = processLocalMessage(obj, sourceAddress);
071    
072                            if (isProcessed) {
073                                    return;
074                            }
075                    }
076    
077                    if (obj instanceof ClusterRequest) {
078                            ClusterRequest clusterRequest = (ClusterRequest)obj;
079    
080                            processClusterRequest(clusterRequest, sourceAddress, localAddress);
081                    }
082                    else if (obj instanceof ClusterNodeResponse) {
083                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
084    
085                            processClusterResponse(
086                                    clusterNodeResponse, sourceAddress, localAddress);
087                    }
088                    else {
089                            if (_log.isWarnEnabled()) {
090                                    _log.warn(
091                                            "Unable to process message content of type " +
092                                                    obj.getClass().getName());
093                            }
094                    }
095            }
096    
097            public void viewAccepted(View view) {
098                    if (_log.isDebugEnabled()) {
099                            _log.debug("Accepted view " + view);
100                    }
101    
102                    if (_lastView == null) {
103                            _lastView = view;
104    
105                            return;
106                    }
107    
108                    List<Address> departAddresses = getDepartAddresses(view);
109    
110                    _lastView = view;
111    
112                    if (departAddresses.isEmpty()) {
113                            return;
114                    }
115    
116                    _clusterExecutorImpl.memberRemoved(departAddresses);
117            }
118    
119            protected Object invoke(
120                            String servletContextName, MethodWrapper methodWrapper)
121                    throws Exception {
122    
123                    if (servletContextName == null) {
124                            return MethodInvoker.invoke(methodWrapper);
125                    }
126    
127                    Thread currentThread = Thread.currentThread();
128    
129                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
130    
131                    try {
132                            ClassLoader classLoader =
133                                    (ClassLoader)PortletBeanLocatorUtil.locate(
134                                            servletContextName, "portletClassLoader");
135    
136                            currentThread.setContextClassLoader(classLoader);
137    
138                            return MethodInvoker.invoke(methodWrapper);
139                    }
140                    catch(Exception e) {
141                            throw e;
142                    }
143                    finally {
144                            currentThread.setContextClassLoader(contextClassLoader);
145                    }
146            }
147    
148            protected List<Address> getDepartAddresses(View view) {
149                    List<Address> departAddresses = new ArrayList<Address>();
150    
151                    Vector<org.jgroups.Address> jGroupsAddresses = view.getMembers();
152                    Vector<org.jgroups.Address> lastJGroupsAddresses =
153                            _lastView.getMembers();
154    
155                    List<org.jgroups.Address> tempAddresses =
156                            new ArrayList<org.jgroups.Address>(jGroupsAddresses.size());
157    
158                    tempAddresses.addAll(jGroupsAddresses);
159    
160                    List<org.jgroups.Address> lastAddresses =
161                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses.size());
162    
163                    lastAddresses.addAll(lastJGroupsAddresses);
164    
165                    tempAddresses.retainAll(lastJGroupsAddresses);
166                    lastAddresses.removeAll(tempAddresses);
167    
168                    if (!lastAddresses.isEmpty()) {
169                            Iterator<org.jgroups.Address> itr = lastAddresses.iterator();
170    
171                            while (itr.hasNext()) {
172                                    departAddresses.add(new AddressImpl(itr.next()));
173                            }
174                    }
175    
176                    return departAddresses;
177            }
178    
179            protected void processClusterRequest(
180                    ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
181                    org.jgroups.Address localAddress) {
182    
183                    ClusterMessageType clusterMessageType =
184                            clusterRequest.getClusterMessageType();
185    
186                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
187    
188                    try {
189                            ClusterNode localClusterNode =
190                                    _clusterExecutorImpl.getLocalClusterNode();
191    
192                            clusterNodeResponse.setClusterNode(localClusterNode);
193                    }
194                    catch (Exception e) {
195                            clusterNodeResponse.setException(e);
196                    }
197    
198                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
199                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
200    
201                            ClusterNode originatingClusterNode =
202                                    clusterRequest.getOriginatingClusterNode();
203    
204                            if (originatingClusterNode != null) {
205                                    _clusterExecutorImpl.memberJoined(
206                                            new AddressImpl(sourceAddress), originatingClusterNode);
207    
208                                    clusterNodeResponse.setClusterMessageType(clusterMessageType);
209                            }
210                            else {
211                                    if (_log.isWarnEnabled()) {
212                                            _log.warn(
213                                                    "Content of notify message does not contain cluster " +
214                                                            "node information");
215                                    }
216    
217                                    return;
218                            }
219                    }
220                    else {
221                            clusterNodeResponse.setClusterMessageType(
222                                    ClusterMessageType.EXECUTE);
223                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
224                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
225    
226                            MethodWrapper methodWrapper = clusterRequest.getMethodWrapper();
227    
228                            if (methodWrapper != null) {
229                                    try {
230                                            ClusterInvokeThreadLocal.setEnabled(false);
231    
232                                            Object returnValue = invoke(
233                                                    clusterRequest.getServletContextName(), methodWrapper);
234    
235                                            if (returnValue instanceof Serializable) {
236                                                    clusterNodeResponse.setResult(returnValue);
237                                            }
238                                            else if (returnValue != null) {
239                                                    clusterNodeResponse.setException(
240                                                            new ClusterException(
241                                                                    "Return value is not serializable"));
242                                            }
243                                    }
244                                    catch (Exception e) {
245                                            clusterNodeResponse.setException(e);
246    
247                                            _log.error("Failed to invoke method " + methodWrapper, e);
248                                    }
249                                    finally {
250                                            ClusterInvokeThreadLocal.setEnabled(true);
251                                    }
252                            }
253                            else {
254                                    clusterNodeResponse.setException(
255                                            new ClusterException(
256                                                    "Payload is not of type " +
257                                                            MethodWrapper.class.getName()));
258                            }
259                    }
260    
261                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
262    
263                    try {
264                            controlChannel.send(
265                                    sourceAddress, localAddress, clusterNodeResponse);
266                    }
267                    catch (ChannelException ce) {
268                            _log.error(
269                                    "Unable to send response message " + clusterNodeResponse, ce);
270                    }
271                    catch (Throwable t) {
272                            _log.error(t, t);
273                    }
274            }
275    
276            protected void processClusterResponse(
277                    ClusterNodeResponse clusterNodeResponse,
278                    org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
279    
280                    ClusterMessageType clusterMessageType =
281                            clusterNodeResponse.getClusterMessageType();
282    
283                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
284                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
285    
286                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
287    
288                            if (clusterNode != null) {
289                                    Address joinAddress = new AddressImpl(sourceAddress);
290    
291                                    _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
292                            }
293                            else {
294                                    if (_log.isWarnEnabled()) {
295                                            _log.warn(
296                                                    "Response of notify message does not contain cluster " +
297                                                            "node information");
298                                    }
299                            }
300    
301                            return;
302                    }
303    
304                    String uuid = clusterNodeResponse.getUuid();
305    
306                    FutureClusterResponses futureClusterResponses =
307                            _clusterExecutorImpl.getExecutionResults(uuid);
308    
309                    if (futureClusterResponses == null) {
310                            if (_log.isInfoEnabled()) {
311                                    _log.info("Unable to find response container for " + uuid);
312                            }
313    
314                            return;
315                    }
316    
317                    Address address = new AddressImpl(sourceAddress);
318    
319                    if (futureClusterResponses.expectsReply(address)) {
320                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
321                    }
322                    else {
323                            if (_log.isWarnEnabled()) {
324                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
325                            }
326                    }
327            }
328    
329            protected boolean processLocalMessage(
330                    Object message, org.jgroups.Address sourceAddress) {
331    
332                    if (message instanceof ClusterRequest) {
333                            ClusterRequest clusterRequest = (ClusterRequest)message;
334    
335                            if (clusterRequest.isSkipLocal()) {
336                                    return true;
337                            }
338    
339                            ClusterMessageType clusterMessageType =
340                                    clusterRequest.getClusterMessageType();
341    
342                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
343                                    clusterMessageType.equals(ClusterMessageType.UPDATE)) {
344    
345                                    ClusterNode originatingClusterNode =
346                                            clusterRequest.getOriginatingClusterNode();
347    
348                                    if (originatingClusterNode != null) {
349                                            Address joinAddress = new AddressImpl(sourceAddress);
350    
351                                            _clusterExecutorImpl.memberJoined(
352                                                    joinAddress, originatingClusterNode);
353                                    }
354                                    else {
355                                            if (_log.isWarnEnabled()) {
356                                                    _log.warn(
357                                                            "Content of notify message does not contain " +
358                                                                    "cluster node information");
359                                            }
360                                    }
361    
362                                    return true;
363                            }
364                    }
365    
366                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
367                            return true;
368                    }
369    
370                    return false;
371            }
372    
373            private static Log _log = LogFactoryUtil.getLog(
374                    ClusterRequestReceiver.class);
375    
376            private ClusterExecutorImpl _clusterExecutorImpl;
377            private View _lastView;
378    
379    }