001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
029 import com.liferay.portal.kernel.util.MethodHandler;
030
031 import java.util.ArrayList;
032 import java.util.Collections;
033 import java.util.List;
034
035 import org.jgroups.Channel;
036 import org.jgroups.Message;
037 import org.jgroups.View;
038
039
043 public class ClusterRequestReceiver extends BaseReceiver {
044
045 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
046 _clusterExecutorImpl = clusterExecutorImpl;
047 }
048
049 @Override
050 protected void doReceive(Message message) {
051 Object obj = message.getObject();
052
053 if (obj == null) {
054 if (_log.isWarnEnabled()) {
055 _log.warn("Message content is null");
056 }
057
058 return;
059 }
060
061 Address sourceAddress = new AddressImpl(message.getSrc());
062
063 if (sourceAddress.equals(
064 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
065
066 boolean isProcessed = processLocalMessage(obj);
067
068 if (isProcessed) {
069 return;
070 }
071 }
072
073 try {
074 if (obj instanceof ClusterRequest) {
075 ClusterRequest clusterRequest = (ClusterRequest)obj;
076
077 processClusterRequest(clusterRequest, sourceAddress);
078 }
079 else if (obj instanceof ClusterNodeResponse) {
080 ClusterNodeResponse clusterNodeResponse =
081 (ClusterNodeResponse)obj;
082
083 processClusterResponse(clusterNodeResponse, sourceAddress);
084 }
085 else if (_log.isWarnEnabled()) {
086 _log.warn(
087 "Unable to process message content of type " +
088 obj.getClass());
089 }
090 }
091 finally {
092 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
093
094 CentralizedThreadLocal.clearShortLivedThreadLocals();
095 }
096 }
097
098 @Override
099 protected void doViewAccepted(View oldView, View newView) {
100 List<Address> departAddresses = getDepartAddresses(oldView, newView);
101 List<Address> newAddresses = getNewAddresses(oldView, newView);
102
103 if (!newAddresses.isEmpty()) {
104 _clusterExecutorImpl.sendNotifyRequest();
105 }
106
107 if (!departAddresses.isEmpty()) {
108 _clusterExecutorImpl.memberRemoved(departAddresses);
109 }
110 }
111
112 protected List<Address> getDepartAddresses(View oldView, View newView) {
113 List<org.jgroups.Address> currentJGroupsAddresses =
114 newView.getMembers();
115 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
116
117 List<org.jgroups.Address> departJGroupsAddresses =
118 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
119
120 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
121
122 if (departJGroupsAddresses.isEmpty()) {
123 return Collections.emptyList();
124 }
125
126 List<Address> departAddresses = new ArrayList<Address>(
127 departJGroupsAddresses.size());
128
129 for (org.jgroups.Address departJGroupsAddress :
130 departJGroupsAddresses) {
131
132 Address departAddress = new AddressImpl(departJGroupsAddress);
133
134 departAddresses.add(departAddress);
135 }
136
137 return departAddresses;
138 }
139
140 protected List<Address> getNewAddresses(View oldView, View newView) {
141 List<org.jgroups.Address> currentJGroupsAddresses =
142 newView.getMembers();
143 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
144
145 List<org.jgroups.Address> newJGroupsAddresses =
146 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
147
148 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
149
150 if (newJGroupsAddresses.isEmpty()) {
151 return Collections.emptyList();
152 }
153
154 List<Address> newAddresses = new ArrayList<Address>(
155 newJGroupsAddresses.size());
156
157 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
158 Address newAddress = new AddressImpl(newJGroupsAddress);
159
160 newAddresses.add(newAddress);
161 }
162
163 return newAddresses;
164 }
165
166 protected void handleResponse(
167 Address address, ClusterRequest clusterRequest, Object returnValue,
168 Exception exception) {
169
170 ClusterNodeResponse clusterNodeResponse =
171 _clusterExecutorImpl.generateClusterNodeResponse(
172 clusterRequest, returnValue, exception);
173
174 Channel channel = _clusterExecutorImpl.getControlChannel();
175
176 try {
177 channel.send(
178 (org.jgroups.Address)address.getRealAddress(),
179 clusterNodeResponse);
180 }
181 catch (Exception e) {
182 _log.error(
183 "Unable to send response message " + clusterNodeResponse, e);
184 }
185 catch (Throwable t) {
186 _log.error(t, t);
187 }
188 }
189
190 protected void processClusterRequest(
191 ClusterRequest clusterRequest, Address sourceAddress) {
192
193 ClusterMessageType clusterMessageType =
194 clusterRequest.getClusterMessageType();
195
196 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
197 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
198
199 _clusterExecutorImpl.memberJoined(
200 sourceAddress, clusterRequest.getOriginatingClusterNode());
201
202 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
203 handleResponse(sourceAddress, clusterRequest, null, null);
204 }
205
206 return;
207 }
208
209 MethodHandler methodHandler = clusterRequest.getMethodHandler();
210
211 Object returnValue = null;
212 Exception exception = null;
213
214 if (methodHandler != null) {
215 try {
216 ClusterInvokeThreadLocal.setEnabled(false);
217
218 returnValue = methodHandler.invoke();
219 }
220 catch (Exception e) {
221 exception = e;
222
223 _log.error("Unable to invoke method " + methodHandler, e);
224 }
225 finally {
226 ClusterInvokeThreadLocal.setEnabled(true);
227 }
228 }
229 else {
230 exception = new ClusterException(
231 "Payload is not of type " + MethodHandler.class.getName());
232 }
233
234 if (!clusterRequest.isFireAndForget()) {
235 handleResponse(
236 sourceAddress, clusterRequest, returnValue, exception);
237 }
238 }
239
240 protected void processClusterResponse(
241 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
242
243 ClusterMessageType clusterMessageType =
244 clusterNodeResponse.getClusterMessageType();
245
246 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
247 _clusterExecutorImpl.memberJoined(
248 sourceAddress, clusterNodeResponse.getClusterNode());
249
250 return;
251 }
252
253 String uuid = clusterNodeResponse.getUuid();
254
255 FutureClusterResponses futureClusterResponses =
256 _clusterExecutorImpl.getExecutionResults(uuid);
257
258 if (futureClusterResponses == null) {
259 if (_log.isInfoEnabled()) {
260 _log.info("Unable to find response container for " + uuid);
261 }
262
263 return;
264 }
265
266 if (futureClusterResponses.expectsReply(sourceAddress)) {
267 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
268 }
269 else {
270 if (_log.isWarnEnabled()) {
271 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
272 }
273 }
274 }
275
276 protected boolean processLocalMessage(Object message) {
277 if (message instanceof ClusterRequest) {
278 ClusterRequest clusterRequest = (ClusterRequest)message;
279
280 if (clusterRequest.isSkipLocal()) {
281 return true;
282 }
283 }
284
285 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
286 return true;
287 }
288
289 return false;
290 }
291
292 private static final Log _log = LogFactoryUtil.getLog(
293 ClusterRequestReceiver.class);
294
295 private final ClusterExecutorImpl _clusterExecutorImpl;
296
297 }