001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
029 import com.liferay.portal.kernel.util.MethodHandler;
030
031 import java.util.ArrayList;
032 import java.util.Collections;
033 import java.util.List;
034 import java.util.concurrent.CountDownLatch;
035
036 import org.jgroups.Channel;
037 import org.jgroups.Message;
038 import org.jgroups.View;
039
040
044 public class ClusterRequestReceiver extends BaseReceiver {
045
046 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
047 _countDownLatch = new CountDownLatch(1);
048 _clusterExecutorImpl = clusterExecutorImpl;
049 }
050
051 public void openLatch() {
052 _countDownLatch.countDown();
053 }
054
055 @Override
056 public void receive(Message message) {
057 try {
058 _countDownLatch.await();
059 }
060 catch (InterruptedException ie) {
061 _log.error(
062 "Latch opened prematurely by interruption. Dependence may " +
063 "not be ready.");
064 }
065
066 Object obj = message.getObject();
067
068 if (obj == null) {
069 if (_log.isWarnEnabled()) {
070 _log.warn("Message content is null");
071 }
072
073 return;
074 }
075
076 Address sourceAddress = new AddressImpl(message.getSrc());
077
078 if (sourceAddress.equals(
079 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
080
081 boolean isProcessed = processLocalMessage(obj);
082
083 if (isProcessed) {
084 return;
085 }
086 }
087
088 try {
089 if (obj instanceof ClusterRequest) {
090 ClusterRequest clusterRequest = (ClusterRequest)obj;
091
092 processClusterRequest(clusterRequest, sourceAddress);
093 }
094 else if (obj instanceof ClusterNodeResponse) {
095 ClusterNodeResponse clusterNodeResponse =
096 (ClusterNodeResponse)obj;
097
098 processClusterResponse(clusterNodeResponse, sourceAddress);
099 }
100 else if (_log.isWarnEnabled()) {
101 _log.warn(
102 "Unable to process message content of type " +
103 obj.getClass());
104 }
105 }
106 finally {
107 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
108
109 CentralizedThreadLocal.clearShortLivedThreadLocals();
110 }
111 }
112
113 @Override
114 public void viewAccepted(View view) {
115 super.viewAccepted(view);
116
117 if (_lastView == null) {
118 _lastView = view;
119
120 return;
121 }
122
123 List<Address> departAddresses = getDepartAddresses(view);
124 List<Address> newAddresses = getNewAddresses(view);
125
126 _lastView = view;
127
128 try {
129 _countDownLatch.await();
130 }
131 catch (InterruptedException ie) {
132 _log.error(
133 "Latch opened prematurely by interruption. Dependence may " +
134 "not be ready.");
135 }
136
137 if (!newAddresses.isEmpty()) {
138 _clusterExecutorImpl.sendNotifyRequest();
139 }
140
141 if (!departAddresses.isEmpty()) {
142 _clusterExecutorImpl.memberRemoved(departAddresses);
143 }
144 }
145
146 protected List<Address> getDepartAddresses(View view) {
147 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
148 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
149
150 List<org.jgroups.Address> departJGroupsAddresses =
151 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
152
153 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
154
155 if (departJGroupsAddresses.isEmpty()) {
156 return Collections.emptyList();
157 }
158
159 List<Address> departAddresses = new ArrayList<Address>(
160 departJGroupsAddresses.size());
161
162 for (org.jgroups.Address departJGroupsAddress :
163 departJGroupsAddresses) {
164
165 Address departAddress = new AddressImpl(departJGroupsAddress);
166
167 departAddresses.add(departAddress);
168 }
169
170 return departAddresses;
171 }
172
173 protected List<Address> getNewAddresses(View view) {
174 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
175 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
176
177 List<org.jgroups.Address> newJGroupsAddresses =
178 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
179
180 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
181
182 if (newJGroupsAddresses.isEmpty()) {
183 return Collections.emptyList();
184 }
185
186 List<Address> newAddresses = new ArrayList<Address>(
187 newJGroupsAddresses.size());
188
189 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
190 Address newAddress = new AddressImpl(newJGroupsAddress);
191
192 newAddresses.add(newAddress);
193 }
194
195 return newAddresses;
196 }
197
198 protected void handleResponse(
199 Address address, ClusterRequest clusterRequest, Object returnValue,
200 Exception exception) {
201
202 ClusterNodeResponse clusterNodeResponse =
203 _clusterExecutorImpl.generateClusterNodeResponse(
204 clusterRequest, returnValue, exception);
205
206 Channel channel = _clusterExecutorImpl.getControlChannel();
207
208 try {
209 channel.send(
210 (org.jgroups.Address)address.getRealAddress(),
211 clusterNodeResponse);
212 }
213 catch (Exception e) {
214 _log.error(
215 "Unable to send response message " + clusterNodeResponse, e);
216 }
217 catch (Throwable t) {
218 _log.error(t, t);
219 }
220 }
221
222 protected void processClusterRequest(
223 ClusterRequest clusterRequest, Address sourceAddress) {
224
225 ClusterMessageType clusterMessageType =
226 clusterRequest.getClusterMessageType();
227
228 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
229 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
230
231 _clusterExecutorImpl.memberJoined(
232 sourceAddress, clusterRequest.getOriginatingClusterNode());
233
234 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
235 handleResponse(sourceAddress, clusterRequest, null, null);
236 }
237
238 return;
239 }
240
241 MethodHandler methodHandler = clusterRequest.getMethodHandler();
242
243 Object returnValue = null;
244 Exception exception = null;
245
246 if (methodHandler != null) {
247 try {
248 ClusterInvokeThreadLocal.setEnabled(false);
249
250 returnValue = methodHandler.invoke();
251 }
252 catch (Exception e) {
253 exception = e;
254
255 _log.error("Unable to invoke method " + methodHandler, e);
256 }
257 finally {
258 ClusterInvokeThreadLocal.setEnabled(true);
259 }
260 }
261 else {
262 exception = new ClusterException(
263 "Payload is not of type " + MethodHandler.class.getName());
264 }
265
266 if (!clusterRequest.isFireAndForget()) {
267 handleResponse(
268 sourceAddress, clusterRequest, returnValue, exception);
269 }
270 }
271
272 protected void processClusterResponse(
273 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
274
275 ClusterMessageType clusterMessageType =
276 clusterNodeResponse.getClusterMessageType();
277
278 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
279 _clusterExecutorImpl.memberJoined(
280 sourceAddress, clusterNodeResponse.getClusterNode());
281
282 return;
283 }
284
285 String uuid = clusterNodeResponse.getUuid();
286
287 FutureClusterResponses futureClusterResponses =
288 _clusterExecutorImpl.getExecutionResults(uuid);
289
290 if (futureClusterResponses == null) {
291 if (_log.isInfoEnabled()) {
292 _log.info("Unable to find response container for " + uuid);
293 }
294
295 return;
296 }
297
298 if (futureClusterResponses.expectsReply(sourceAddress)) {
299 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
300 }
301 else {
302 if (_log.isWarnEnabled()) {
303 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
304 }
305 }
306 }
307
308 protected boolean processLocalMessage(Object message) {
309 if (message instanceof ClusterRequest) {
310 ClusterRequest clusterRequest = (ClusterRequest)message;
311
312 if (clusterRequest.isSkipLocal()) {
313 return true;
314 }
315 }
316
317 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
318 return true;
319 }
320
321 return false;
322 }
323
324 private static final Log _log = LogFactoryUtil.getLog(
325 ClusterRequestReceiver.class);
326
327 private final ClusterExecutorImpl _clusterExecutorImpl;
328 private final CountDownLatch _countDownLatch;
329 private volatile View _lastView;
330
331 }