001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterMessageType;
022 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
023 import com.liferay.portal.kernel.cluster.ClusterRequest;
024 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
025 import com.liferay.portal.kernel.log.Log;
026 import com.liferay.portal.kernel.log.LogFactoryUtil;
027 import com.liferay.portal.kernel.util.MethodHandler;
028 import com.liferay.portal.kernel.util.Validator;
029 import com.liferay.portal.security.pacl.PACLClassLoaderUtil;
030
031 import java.util.ArrayList;
032 import java.util.Collections;
033 import java.util.List;
034 import java.util.concurrent.CountDownLatch;
035
036 import org.jgroups.Channel;
037 import org.jgroups.Message;
038 import org.jgroups.View;
039
040
044 public class ClusterRequestReceiver extends BaseReceiver {
045
046 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
047 _countDownLatch = new CountDownLatch(1);
048 _clusterExecutorImpl = clusterExecutorImpl;
049 }
050
051 public void openLatch() {
052 _countDownLatch.countDown();
053 }
054
055 @Override
056 public void receive(Message message) {
057 try {
058 _countDownLatch.await();
059 }
060 catch (InterruptedException ie) {
061 _log.error(
062 "Latch opened prematurely by interruption. Dependence may " +
063 "not be ready.");
064 }
065
066 Object obj = message.getObject();
067
068 if (obj == null) {
069 if (_log.isWarnEnabled()) {
070 _log.warn("Message content is null");
071 }
072
073 return;
074 }
075
076 Address sourceAddress = new AddressImpl(message.getSrc());
077
078 if (sourceAddress.equals(
079 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
080
081 boolean isProcessed = processLocalMessage(obj);
082
083 if (isProcessed) {
084 return;
085 }
086 }
087
088 if (obj instanceof ClusterRequest) {
089 ClusterRequest clusterRequest = (ClusterRequest)obj;
090
091 processClusterRequest(clusterRequest, sourceAddress);
092 }
093 else if (obj instanceof ClusterNodeResponse) {
094 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
095
096 processClusterResponse(clusterNodeResponse, sourceAddress);
097 }
098 else if (_log.isWarnEnabled()) {
099 _log.warn(
100 "Unable to process message content of type " + obj.getClass());
101 }
102 }
103
104 @Override
105 public void viewAccepted(View view) {
106 super.viewAccepted(view);
107
108 if (_lastView == null) {
109 _lastView = view;
110
111 return;
112 }
113
114 List<Address> departAddresses = getDepartAddresses(view);
115 List<Address> newAddresses = getNewAddresses(view);
116
117 _lastView = view;
118
119 try {
120 _countDownLatch.await();
121 }
122 catch (InterruptedException ie) {
123 _log.error(
124 "Latch opened prematurely by interruption. Dependence may " +
125 "not be ready.");
126 }
127
128 if (!newAddresses.isEmpty()) {
129 _clusterExecutorImpl.sendNotifyRequest();
130 }
131
132 if (!departAddresses.isEmpty()) {
133 _clusterExecutorImpl.memberRemoved(departAddresses);
134 }
135 }
136
137 protected List<Address> getDepartAddresses(View view) {
138 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
139 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
140
141 List<org.jgroups.Address> departJGroupsAddresses =
142 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
143
144 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
145
146 if (departJGroupsAddresses.isEmpty()) {
147 return Collections.emptyList();
148 }
149
150 List<Address> departAddresses = new ArrayList<Address>(
151 departJGroupsAddresses.size());
152
153 for (org.jgroups.Address departJGroupsAddress :
154 departJGroupsAddresses) {
155
156 Address departAddress = new AddressImpl(departJGroupsAddress);
157
158 departAddresses.add(departAddress);
159 }
160
161 return departAddresses;
162 }
163
164 protected List<Address> getNewAddresses(View view) {
165 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
166 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
167
168 List<org.jgroups.Address> newJGroupsAddresses =
169 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
170
171 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
172
173 if (newJGroupsAddresses.isEmpty()) {
174 return Collections.emptyList();
175 }
176
177 List<Address> newAddresses = new ArrayList<Address>(
178 newJGroupsAddresses.size());
179
180 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
181 Address newAddress = new AddressImpl(newJGroupsAddress);
182
183 newAddresses.add(newAddress);
184 }
185
186 return newAddresses;
187 }
188
189 protected void handleResponse(
190 Address address, ClusterRequest clusterRequest, Object returnValue,
191 Exception exception) {
192
193 ClusterNodeResponse clusterNodeResponse =
194 _clusterExecutorImpl.generateClusterNodeResponse(
195 clusterRequest, returnValue, exception);
196
197 Channel channel = _clusterExecutorImpl.getControlChannel();
198
199 try {
200 channel.send(
201 (org.jgroups.Address)address.getRealAddress(),
202 clusterNodeResponse);
203 }
204 catch (Exception e) {
205 _log.error(
206 "Unable to send response message " + clusterNodeResponse, e);
207 }
208 catch (Throwable t) {
209 _log.error(t, t);
210 }
211 }
212
213 protected Object invoke(
214 String servletContextName, String beanIdentifier,
215 MethodHandler methodHandler)
216 throws Exception {
217
218 if (servletContextName == null) {
219 if (Validator.isNull(beanIdentifier)) {
220 return methodHandler.invoke(true);
221 }
222 else {
223 Object bean = PortalBeanLocatorUtil.locate(beanIdentifier);
224
225 return methodHandler.invoke(bean);
226 }
227 }
228
229 ClassLoader contextClassLoader =
230 PACLClassLoaderUtil.getContextClassLoader();
231
232 try {
233 ClassLoader classLoader =
234 (ClassLoader)PortletBeanLocatorUtil.locate(
235 servletContextName, "portletClassLoader");
236
237 PACLClassLoaderUtil.setContextClassLoader(classLoader);
238
239 if (Validator.isNull(beanIdentifier)) {
240 return methodHandler.invoke(true);
241 }
242 else {
243 Object bean = PortletBeanLocatorUtil.locate(
244 servletContextName, beanIdentifier);
245
246 return methodHandler.invoke(bean);
247 }
248 }
249 finally {
250 PACLClassLoaderUtil.setContextClassLoader(contextClassLoader);
251 }
252 }
253
254 protected void processClusterRequest(
255 ClusterRequest clusterRequest, Address sourceAddress) {
256
257 ClusterMessageType clusterMessageType =
258 clusterRequest.getClusterMessageType();
259
260 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
261 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
262
263 _clusterExecutorImpl.memberJoined(
264 sourceAddress, clusterRequest.getOriginatingClusterNode());
265
266 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
267 handleResponse(sourceAddress, clusterRequest, null, null);
268 }
269
270 return;
271 }
272
273 MethodHandler methodHandler = clusterRequest.getMethodHandler();
274
275 Object returnValue = null;
276 Exception exception = null;
277
278 if (methodHandler != null) {
279 try {
280 ClusterInvokeThreadLocal.setEnabled(false);
281
282 returnValue = invoke(
283 clusterRequest.getServletContextName(),
284 clusterRequest.getBeanIdentifier(), methodHandler);
285 }
286 catch (Exception e) {
287 exception = e;
288
289 _log.error("Unable to invoke method " + methodHandler, e);
290 }
291 finally {
292 ClusterInvokeThreadLocal.setEnabled(true);
293 }
294 }
295 else {
296 exception = new ClusterException(
297 "Payload is not of type " + MethodHandler.class.getName());
298 }
299
300 if (!clusterRequest.isFireAndForget()) {
301 handleResponse(
302 sourceAddress, clusterRequest, returnValue, exception);
303 }
304 }
305
306 protected void processClusterResponse(
307 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
308
309 ClusterMessageType clusterMessageType =
310 clusterNodeResponse.getClusterMessageType();
311
312 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
313 _clusterExecutorImpl.memberJoined(
314 sourceAddress, clusterNodeResponse.getClusterNode());
315
316 return;
317 }
318
319 String uuid = clusterNodeResponse.getUuid();
320
321 FutureClusterResponses futureClusterResponses =
322 _clusterExecutorImpl.getExecutionResults(uuid);
323
324 if (futureClusterResponses == null) {
325 if (_log.isInfoEnabled()) {
326 _log.info("Unable to find response container for " + uuid);
327 }
328
329 return;
330 }
331
332 if (futureClusterResponses.expectsReply(sourceAddress)) {
333 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
334 }
335 else {
336 if (_log.isWarnEnabled()) {
337 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
338 }
339 }
340 }
341
342 protected boolean processLocalMessage(Object message) {
343 if (message instanceof ClusterRequest) {
344 ClusterRequest clusterRequest = (ClusterRequest)message;
345
346 if (clusterRequest.isSkipLocal()) {
347 return true;
348 }
349 }
350
351 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
352 return true;
353 }
354
355 return false;
356 }
357
358 private static Log _log = LogFactoryUtil.getLog(
359 ClusterRequestReceiver.class);
360
361 private ClusterExecutorImpl _clusterExecutorImpl;
362 private CountDownLatch _countDownLatch;
363 private volatile View _lastView;
364
365 }