001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030 import com.liferay.portal.kernel.memory.FinalizeAction;
031 import com.liferay.portal.kernel.memory.FinalizeManager;
032 import com.liferay.portal.kernel.util.InetAddressUtil;
033 import com.liferay.portal.kernel.util.MethodInvoker;
034 import com.liferay.portal.kernel.util.MethodWrapper;
035 import com.liferay.portal.kernel.util.PropsKeys;
036 import com.liferay.portal.kernel.util.PropsUtil;
037 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038 import com.liferay.portal.util.PortalPortEventListener;
039 import com.liferay.portal.util.PortalUtil;
040 import com.liferay.portal.util.PropsValues;
041
042 import java.io.Serializable;
043
044 import java.lang.ref.Reference;
045
046 import java.net.InetAddress;
047
048 import java.util.ArrayList;
049 import java.util.Collection;
050 import java.util.Collections;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.Properties;
054 import java.util.concurrent.ConcurrentHashMap;
055 import java.util.concurrent.CopyOnWriteArrayList;
056
057 import org.jgroups.ChannelException;
058 import org.jgroups.JChannel;
059
060
064 public class ClusterExecutorImpl
065 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
066
067 public void addClusterEventListener(
068 ClusterEventListener clusterEventListener) {
069 if (!isEnabled()) {
070 return;
071 }
072
073 _clusterEventListeners.addIfAbsent(clusterEventListener);
074 }
075
076 public void afterPropertiesSet() {
077 super.afterPropertiesSet();
078
079 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
080 addClusterEventListener(new DebuggingClusterEventListenerImpl());
081 }
082 }
083
084 public void destroy() {
085 if (!isEnabled()) {
086 return;
087 }
088
089 _controlChannel.close();
090 }
091
092 public FutureClusterResponses execute(ClusterRequest clusterRequest)
093 throws SystemException {
094
095 if (!isEnabled()) {
096 return null;
097 }
098
099 List<Address> addresses = prepareAddresses(clusterRequest);
100
101 FutureClusterResponses futureClusterResponses =
102 new FutureClusterResponses(addresses);
103
104 if (!clusterRequest.isFireAndForget()) {
105 String uuid = clusterRequest.getUuid();
106
107 Reference<FutureClusterResponses> reference =
108 FinalizeManager.register(
109 futureClusterResponses,
110 new RemoveResultKeyFinalizeAction(uuid));
111
112 _executionResultMap.put(uuid, reference);
113 }
114
115 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
116 addresses.remove(getLocalControlAddress())) {
117
118 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
119 clusterRequest.getMethodWrapper());
120
121 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
122 clusterNodeResponse.setUuid(clusterRequest.getUuid());
123
124 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
125 }
126
127 if (clusterRequest.isMulticast()) {
128 sendMulticastRequest(clusterRequest);
129 }
130 else {
131 sendUnicastRequest(clusterRequest, addresses);
132 }
133
134 return futureClusterResponses;
135 }
136
137 public List<ClusterEventListener> getClusterEventListeners() {
138 if (!isEnabled()) {
139 return Collections.EMPTY_LIST;
140 }
141
142 return Collections.unmodifiableList(_clusterEventListeners);
143 }
144
145 public List<ClusterNode> getClusterNodes() {
146 if (!isEnabled()) {
147 return Collections.EMPTY_LIST;
148 }
149
150 return new ArrayList<ClusterNode>(_addressMap.values());
151 }
152
153 public ClusterNode getLocalClusterNode() throws SystemException {
154 if (!isEnabled()) {
155 return null;
156 }
157
158 ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
159
160 if (clusterNode == null) {
161 _localClusterNodeId = PortalUUIDUtil.generate();
162
163 clusterNode = new ClusterNode(_localClusterNodeId);
164
165 clusterNode.setPort(PortalUtil.getPortalPort());
166
167 try {
168 InetAddress inetAddress = bindInetAddress;
169
170 if (inetAddress == null) {
171 inetAddress = InetAddressUtil.getLocalInetAddress();
172 }
173
174 clusterNode.setInetAddress(inetAddress);
175
176 clusterNode.setHostName(inetAddress.getHostName());
177 }
178 catch (Exception e) {
179 throw new SystemException(
180 "Unable to determine local network address", e);
181 }
182 }
183
184 return clusterNode;
185 }
186
187 public void initialize() {
188 if (!isEnabled()) {
189 return;
190 }
191
192 try {
193 PortalUtil.addPortalPortEventListener(this);
194
195 ClusterNode clusterNode = getLocalClusterNode();
196
197 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
198 ClusterMessageType.NOTIFY, clusterNode);
199
200 _controlChannel.send(null, null, clusterRequest);
201 }
202 catch (ChannelException ce) {
203 _log.error("Unable to send multicast message ", ce);
204 }
205 catch (SystemException se) {
206 _log.error("Unable to determine local network address", se);
207 }
208 }
209
210 public boolean isClusterNodeAlive(String clusterNodeId) {
211 if (!isEnabled()) {
212 return false;
213 }
214
215 return _clusterNodeIdMap.containsKey(clusterNodeId);
216 }
217
218 public boolean isEnabled() {
219 return PropsValues.CLUSTER_LINK_ENABLED;
220 }
221
222 public void portalPortConfigured(int port) {
223 if (!isEnabled()) {
224 return;
225 }
226
227 try {
228 ClusterNode clusterNode = getLocalClusterNode();
229
230 clusterNode.setPort(port);
231
232 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
233 ClusterMessageType.UPDATE, clusterNode);
234
235 _controlChannel.send(null, null, clusterRequest);
236 }
237 catch (Exception e) {
238 if (_log.isErrorEnabled()) {
239 _log.error("Unable to determine configure node port", e);
240 }
241 }
242 }
243
244 public void removeClusterEventListener(
245 ClusterEventListener clusterEventListener) {
246
247 if (!isEnabled()) {
248 return;
249 }
250
251 _clusterEventListeners.remove(clusterEventListener);
252 }
253
254 public void setClusterEventListeners(
255 List<ClusterEventListener> clusterEventListeners) {
256
257 if (!isEnabled()) {
258 return;
259 }
260
261 _clusterEventListeners.addAllAbsent(clusterEventListeners);
262 }
263
264 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
265 if (!isEnabled()) {
266 return;
267 }
268
269 _shortcutLocalMethod = shortcutLocalMethod;
270 }
271
272 protected void fireClusterEvent(ClusterEvent clusterEvent) {
273 for (ClusterEventListener listener : _clusterEventListeners) {
274 listener.processClusterEvent(clusterEvent);
275 }
276 }
277
278 protected JChannel getControlChannel() {
279 return _controlChannel;
280 }
281
282 protected FutureClusterResponses getExecutionResults(String uuid) {
283 Reference<FutureClusterResponses> reference = _executionResultMap.get(
284 uuid);
285
286 if (reference != null) {
287 return reference.get();
288 }
289 else {
290 return null;
291 }
292 }
293
294 protected Address getLocalControlAddress() {
295 return new AddressImpl(_controlChannel.getLocalAddress());
296 }
297
298 protected void initChannels() {
299 Properties controlProperties = PropsUtil.getProperties(
300 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
301
302 String controlProperty = controlProperties.getProperty(
303 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
304
305 ClusterRequestReceiver clusterInvokeReceiver =
306 new ClusterRequestReceiver(this);
307
308 try {
309 _controlChannel = createChannel(
310 controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
311 }
312 catch (ChannelException ce) {
313 _log.error(ce, ce);
314 }
315 catch (Exception e) {
316 _log.error(e, e);
317 }
318 }
319
320 protected boolean isShortcutLocalMethod() {
321 return _shortcutLocalMethod;
322 }
323
324 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
325 _addressMap.put(joinAddress, clusterNode);
326
327 Address previousAddress = _clusterNodeIdMap.put(
328 clusterNode.getClusterNodeId(), joinAddress);
329
330 if ((previousAddress == null) &&
331 !getLocalControlAddress().equals(joinAddress)) {
332
333 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
334
335 fireClusterEvent(clusterEvent);
336 }
337 }
338
339 protected void memberRemoved(List<Address> departAddresses) {
340 List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
341
342 for (Address departAddress : departAddresses) {
343 ClusterNode departingClusterNode = _addressMap.remove(
344 departAddress);
345 if (departingClusterNode != null) {
346 departingClusterNodes.add(departingClusterNode);
347
348 _clusterNodeIdMap.remove(
349 departingClusterNode.getClusterNodeId());
350 }
351 }
352
353 if (departingClusterNodes.isEmpty()) {
354 return;
355 }
356
357 ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
358
359 fireClusterEvent(clusterEvent);
360 }
361
362 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
363 boolean isMulticast = clusterRequest.isMulticast();
364
365 List<Address> addresses = null;
366
367 if (isMulticast) {
368 addresses = getAddresses(_controlChannel);
369 }
370 else {
371 Collection<String> clusterNodeIds =
372 clusterRequest.getTargetClusterNodeIds();
373
374 addresses = new ArrayList<Address>(clusterNodeIds.size());
375
376 for (String clusterNodeId : clusterNodeIds) {
377 Address address = _clusterNodeIdMap.get(clusterNodeId);
378
379 addresses.add(address);
380 }
381 }
382
383 return addresses;
384 }
385
386 protected ClusterNodeResponse runLocalMethod(MethodWrapper methodWrapper)
387 throws SystemException {
388
389 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
390
391 ClusterNode localClusterNode = getLocalClusterNode();
392
393 clusterNodeResponse.setClusterNode(localClusterNode);
394 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
395
396 if (methodWrapper == null) {
397 clusterNodeResponse.setException(
398 new ClusterException(
399 "Payload is not of type " + MethodWrapper.class.getName()));
400
401 return clusterNodeResponse;
402 }
403
404 try {
405 Object returnValue = MethodInvoker.invoke(methodWrapper);
406
407 if (returnValue instanceof Serializable) {
408 clusterNodeResponse.setResult(returnValue);
409 }
410 else if (returnValue != null) {
411 clusterNodeResponse.setException(
412 new ClusterException("Return value is not serializable"));
413 }
414 }
415 catch (Exception e) {
416 clusterNodeResponse.setException(e);
417 }
418
419 return clusterNodeResponse;
420 }
421
422 protected void sendMulticastRequest(ClusterRequest clusterRequest)
423 throws SystemException {
424
425 try {
426 _controlChannel.send(null, null, clusterRequest);
427 }
428 catch (ChannelException ce) {
429 _log.error(
430 "Unable to send multicast message " + clusterRequest, ce);
431
432 throw new SystemException(
433 "Unable to send multicast request", ce);
434 }
435 }
436
437 protected void sendUnicastRequest(
438 ClusterRequest clusterRequest, List<Address> addresses)
439 throws SystemException {
440
441 for (Address address : addresses) {
442 org.jgroups.Address jGroupsAddress =
443 (org.jgroups.Address)address.getRealAddress();
444
445 try {
446 _controlChannel.send(jGroupsAddress, null, clusterRequest);
447 }
448 catch (ChannelException ce) {
449 _log.error(
450 "Unable to send unicast message " + clusterRequest, ce);
451
452 throw new SystemException(
453 "Unable to send unicast request", ce);
454 }
455 }
456 }
457
458 private static final String _DEFAULT_CLUSTER_NAME =
459 "LIFERAY-CONTROL-CHANNEL";
460
461 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
462
463 private Map<Address, ClusterNode> _addressMap =
464 new ConcurrentHashMap<Address, ClusterNode>();
465 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
466 new CopyOnWriteArrayList<ClusterEventListener>();
467 private Map<String, Address> _clusterNodeIdMap =
468 new ConcurrentHashMap<String, Address>();
469 private JChannel _controlChannel;
470 private Map<String, Reference<FutureClusterResponses>> _executionResultMap =
471 new ConcurrentHashMap<String, Reference<FutureClusterResponses>>();
472 private String _localClusterNodeId;
473 private boolean _shortcutLocalMethod;
474
475 private class RemoveResultKeyFinalizeAction implements FinalizeAction {
476
477 private String _uuid;
478
479 public RemoveResultKeyFinalizeAction(String uuid) {
480 _uuid = uuid;
481 }
482
483 public void finalize() {
484 _executionResultMap.remove(_uuid);
485 }
486
487 }
488
489 }