1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.kernel.cluster.Address;
18  import com.liferay.portal.kernel.cluster.ClusterException;
19  import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
20  import com.liferay.portal.kernel.cluster.ClusterRequest;
21  import com.liferay.portal.kernel.cluster.ClusterResponse;
22  import com.liferay.portal.kernel.log.Log;
23  import com.liferay.portal.kernel.log.LogFactoryUtil;
24  import com.liferay.portal.kernel.util.MethodInvoker;
25  import com.liferay.portal.kernel.util.MethodWrapper;
26  
27  import java.io.Serializable;
28  
29  import java.util.Map;
30  import java.util.concurrent.Future;
31  
32  import org.jgroups.ChannelException;
33  import org.jgroups.JChannel;
34  import org.jgroups.Message;
35  import org.jgroups.ReceiverAdapter;
36  import org.jgroups.View;
37  
38  /**
39   * <a href="ClusterInvokeReceiver.java.html"><b><i>View Source</i>
40   * </b></a>
41   *
42   * @author Shuyang Zhou
43   * @author Tina Tian
44   */
45  public class ClusterInvokeReceiver extends ReceiverAdapter {
46  
47      public ClusterInvokeReceiver(
48          Map<String, Map<Address, Future<?>>> multicastResultMap,
49          Map<String, Future<?>> unicastResultMap) {
50  
51          _multicastResultMap = multicastResultMap;
52          _unicastResultMap = unicastResultMap;
53      }
54  
55      public void receive(Message message) {
56          org.jgroups.Address sourceAddress = message.getSrc();
57          org.jgroups.Address localAddress = _channel.getLocalAddress();
58  
59          Object obj = message.getObject();
60  
61          if (obj == null) {
62              if (_log.isWarnEnabled()) {
63                  _log.warn("Message content is null");
64              }
65  
66              return;
67          }
68  
69          if (localAddress.equals(sourceAddress) &&
70              ClusterExecutorUtil.isShortcutLocalMethod()) {
71  
72              return;
73          }
74  
75          if (obj instanceof ClusterRequest) {
76              ClusterRequest clusterRequest = (ClusterRequest)obj;
77  
78              ClusterResponse clusterResponse = new ClusterResponseImpl();
79  
80              clusterResponse.setMulticast(clusterRequest.isMulticast());
81              clusterResponse.setUuid(clusterRequest.getUuid());
82  
83              Object payload = clusterRequest.getPayload();
84  
85              if (payload instanceof MethodWrapper) {
86                  MethodWrapper methodWrapper = (MethodWrapper)payload;
87  
88                  try {
89                      Object returnValue = MethodInvoker.invoke(methodWrapper);
90  
91                      if (returnValue instanceof Serializable) {
92                          clusterResponse.setResult(returnValue);
93                      }
94                      else if (returnValue != null) {
95                          clusterResponse.setException(
96                              new ClusterException(
97                                  "Return value is not serializable"));
98                      }
99                  }
100                 catch (Exception e) {
101                     clusterResponse.setException(e);
102                 }
103             }
104             else {
105                 clusterResponse.setException(
106                     new ClusterException(
107                         "Payload is not of type " +
108                             MethodWrapper.class.getName()));
109             }
110 
111             try {
112                 _channel.send(sourceAddress, localAddress, clusterResponse);
113             }
114             catch (ChannelException ce) {
115                 _log.error(
116                     "Unable to send response message " + clusterResponse, ce);
117             }
118         }
119         else if (obj instanceof ClusterResponse) {
120             ClusterResponse clusterResponse = (ClusterResponse)obj;
121 
122             String uuid = clusterResponse.getUuid();
123 
124             if (clusterResponse.isMulticast() &&
125                 _multicastResultMap.containsKey(uuid)) {
126 
127                 Map<Address, Future<?>> results = _multicastResultMap.get(uuid);
128 
129                 Address address = new AddressImpl(sourceAddress);
130 
131                 if (results.containsKey(address)) {
132                     FutureResult<Object> futureResult =
133                         (FutureResult<Object>)results.get(address);
134 
135                     if (clusterResponse.hasException()) {
136                         futureResult.setException(
137                             clusterResponse.getException());
138                     }
139                     else {
140                         futureResult.setResult(clusterResponse.getResult());
141                     }
142                 }
143                 else {
144                     _log.error("New node coming from " + sourceAddress);
145                 }
146             }
147             else if (_unicastResultMap.containsKey(uuid)) {
148                 FutureResult<Object> futureResult =
149                     (FutureResult<Object>)_unicastResultMap.get(uuid);
150 
151                 if (clusterResponse.hasException()) {
152                     futureResult.setException(clusterResponse.getException());
153                 }
154                 else {
155                     futureResult.setResult(clusterResponse.getResult());
156                 }
157             }
158             else {
159                 _log.error("Unknow UUID " + uuid + " from " + sourceAddress);
160             }
161         }
162         else {
163             if (_log.isWarnEnabled()) {
164                 _log.warn(
165                     "Unable to process message content of type " +
166                         obj.getClass().getName());
167             }
168 
169             return;
170         }
171     }
172 
173     public void setChannel(JChannel channel) {
174         _channel = channel;
175     }
176 
177     public void viewAccepted(View view) {
178         if (_log.isInfoEnabled()) {
179             _log.info("Accepted view " + view);
180         }
181     }
182 
183     private static Log _log = LogFactoryUtil.getLog(
184         ClusterInvokeReceiver.class);
185 
186     private JChannel _channel;
187     private Map<String, Map<Address, Future<?>>> _multicastResultMap;
188     private Map<String, Future<?>> _unicastResultMap;
189 
190 }