001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.concurrent.ConcurrentReferenceValueHashMap;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.memory.FinalizeManager;
034 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
035 import com.liferay.portal.kernel.util.CharPool;
036 import com.liferay.portal.kernel.util.GetterUtil;
037 import com.liferay.portal.kernel.util.Http;
038 import com.liferay.portal.kernel.util.MethodHandler;
039 import com.liferay.portal.kernel.util.PropsKeys;
040 import com.liferay.portal.kernel.util.StringUtil;
041 import com.liferay.portal.kernel.util.Validator;
042 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043 import com.liferay.portal.util.PortalInetSocketAddressEventListener;
044 import com.liferay.portal.util.PortalUtil;
045 import com.liferay.portal.util.PropsUtil;
046 import com.liferay.portal.util.PropsValues;
047
048 import java.io.Serializable;
049
050 import java.net.InetAddress;
051 import java.net.InetSocketAddress;
052
053 import java.util.ArrayList;
054 import java.util.Collection;
055 import java.util.Collections;
056 import java.util.List;
057 import java.util.Map;
058 import java.util.Properties;
059 import java.util.concurrent.BlockingQueue;
060 import java.util.concurrent.ConcurrentHashMap;
061 import java.util.concurrent.CopyOnWriteArrayList;
062 import java.util.concurrent.ExecutorService;
063
064 import org.jgroups.JChannel;
065
066
070 @DoPrivileged
071 public class ClusterExecutorImpl
072 extends ClusterBase
073 implements ClusterExecutor, PortalInetSocketAddressEventListener {
074
075 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
076 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
077
078 @Override
079 public void addClusterEventListener(
080 ClusterEventListener clusterEventListener) {
081
082 if (!isEnabled()) {
083 return;
084 }
085
086 _clusterEventListeners.addIfAbsent(clusterEventListener);
087 }
088
089 @Override
090 public void destroy() {
091 if (!isEnabled()) {
092 return;
093 }
094
095 PortalExecutorManagerUtil.shutdown(
096 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
097
098 _controlJChannel.setReceiver(null);
099
100 _controlJChannel.close();
101
102 _clusterEventListeners.clear();
103 _clusterNodeAddresses.clear();
104 _futureClusterResponses.clear();
105 _liveInstances.clear();
106 _localAddress = null;
107 _localClusterNode = null;
108 }
109
110 @Override
111 public FutureClusterResponses execute(ClusterRequest clusterRequest) {
112 if (!isEnabled()) {
113 return null;
114 }
115
116 List<Address> addresses = prepareAddresses(clusterRequest);
117
118 FutureClusterResponses futureClusterResponses =
119 new FutureClusterResponses(addresses);
120
121 if (!clusterRequest.isFireAndForget()) {
122 String uuid = clusterRequest.getUuid();
123
124 _futureClusterResponses.put(uuid, futureClusterResponses);
125 }
126
127 if (_shortcutLocalMethod &&
128 addresses.remove(getLocalClusterNodeAddress())) {
129
130 runLocalMethod(clusterRequest, futureClusterResponses);
131 }
132
133 if (clusterRequest.isMulticast()) {
134 try {
135 _controlJChannel.send(null, clusterRequest);
136 }
137 catch (Exception e) {
138 throw new SystemException(
139 "Unable to send multicast request", e);
140 }
141 }
142 else {
143 for (Address address : addresses) {
144 org.jgroups.Address jGroupsAddress =
145 (org.jgroups.Address)address.getRealAddress();
146
147 try {
148 _controlJChannel.send(jGroupsAddress, clusterRequest);
149 }
150 catch (Exception e) {
151 throw new SystemException(
152 "Unable to send unicast request", e);
153 }
154 }
155 }
156
157 return futureClusterResponses;
158 }
159
160 @Override
161 public FutureClusterResponses execute(
162 ClusterRequest clusterRequest,
163 ClusterResponseCallback clusterResponseCallback) {
164
165 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
166
167 ClusterResponseCallbackJob clusterResponseCallbackJob =
168 new ClusterResponseCallbackJob(
169 clusterResponseCallback, futureClusterResponses);
170
171 _executorService.execute(clusterResponseCallbackJob);
172
173 return futureClusterResponses;
174 }
175
176 @Override
177 public List<ClusterEventListener> getClusterEventListeners() {
178 if (!isEnabled()) {
179 return Collections.emptyList();
180 }
181
182 return Collections.unmodifiableList(_clusterEventListeners);
183 }
184
185 @Override
186 public List<Address> getClusterNodeAddresses() {
187 if (!isEnabled()) {
188 return Collections.emptyList();
189 }
190
191 return getAddresses(_controlJChannel);
192 }
193
194 @Override
195 public List<ClusterNode> getClusterNodes() {
196 if (!isEnabled()) {
197 return Collections.emptyList();
198 }
199
200 return new ArrayList<ClusterNode>(_liveInstances.values());
201 }
202
203 @Override
204 public ClusterNode getLocalClusterNode() {
205 if (!isEnabled()) {
206 return null;
207 }
208
209 return _localClusterNode;
210 }
211
212 @Override
213 public Address getLocalClusterNodeAddress() {
214 if (!isEnabled()) {
215 return null;
216 }
217
218 return _localAddress;
219 }
220
221 @Override
222 public void initialize() {
223 if (!isEnabled()) {
224 return;
225 }
226
227 _secure = StringUtil.equalsIgnoreCase(
228 Http.HTTPS, PropsValues.WEB_SERVER_PROTOCOL);
229
230 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
231 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
232
233 PortalUtil.addPortalInetSocketAddressEventListener(this);
234
235 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
236 addClusterEventListener(new DebuggingClusterEventListenerImpl());
237 }
238
239 if (PropsValues.LIVE_USERS_ENABLED) {
240 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
241 }
242
243 try {
244 initControlChannel();
245
246 _localAddress = new AddressImpl(_controlJChannel.getAddress());
247
248 initLocalClusterNode();
249
250 memberJoined(_localAddress, _localClusterNode);
251
252 sendNotifyRequest();
253
254 BaseReceiver baseReceiver =
255 (BaseReceiver)_controlJChannel.getReceiver();
256
257 baseReceiver.openLatch();
258 }
259 catch (Exception e) {
260 if (_log.isErrorEnabled()) {
261 _log.error("Unable to initialize", e);
262 }
263
264 throw new IllegalStateException(e);
265 }
266 }
267
268 @Override
269 public boolean isClusterNodeAlive(Address address) {
270 if (!isEnabled()) {
271 return false;
272 }
273
274 List<Address> addresses = getAddresses(_controlJChannel);
275
276 return addresses.contains(address);
277 }
278
279 @Override
280 public boolean isClusterNodeAlive(String clusterNodeId) {
281 if (!isEnabled()) {
282 return false;
283 }
284
285 return _clusterNodeAddresses.containsKey(clusterNodeId);
286 }
287
288 @Override
289 public void portalLocalInetSockAddressConfigured(
290 InetSocketAddress inetSocketAddress) {
291
292 if (!isEnabled() ||
293 (_localClusterNode.getPortalInetSocketAddress() != null)) {
294
295 return;
296 }
297
298 try {
299 _localClusterNode.setPortalInetSocketAddress(inetSocketAddress);
300
301 memberJoined(_localAddress, _localClusterNode);
302
303 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
304 ClusterMessageType.UPDATE, _localClusterNode);
305
306 _controlJChannel.send(null, clusterRequest);
307 }
308 catch (Exception e) {
309 _log.error("Unable to determine configure node port", e);
310 }
311 }
312
313 @Override
314 public void portalServerInetSocketAddressConfigured(
315 InetSocketAddress inetSocketAddress) {
316 }
317
318 @Override
319 public void removeClusterEventListener(
320 ClusterEventListener clusterEventListener) {
321
322 if (!isEnabled()) {
323 return;
324 }
325
326 _clusterEventListeners.remove(clusterEventListener);
327 }
328
329 public void setClusterEventListeners(
330 List<ClusterEventListener> clusterEventListeners) {
331
332 if (!isEnabled()) {
333 return;
334 }
335
336 _clusterEventListeners.addAllAbsent(clusterEventListeners);
337 }
338
339 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
340 if (!isEnabled()) {
341 return;
342 }
343
344 _shortcutLocalMethod = shortcutLocalMethod;
345 }
346
347 protected void fireClusterEvent(ClusterEvent clusterEvent) {
348 for (ClusterEventListener listener : _clusterEventListeners) {
349 listener.processClusterEvent(clusterEvent);
350 }
351 }
352
353 protected ClusterNodeResponse generateClusterNodeResponse(
354 ClusterRequest clusterRequest, Object returnValue,
355 Exception exception) {
356
357 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
358
359 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
360 clusterNodeResponse.setClusterNode(getLocalClusterNode());
361 clusterNodeResponse.setClusterMessageType(
362 clusterRequest.getClusterMessageType());
363 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
364 clusterNodeResponse.setUuid(clusterRequest.getUuid());
365
366 if (exception != null) {
367 clusterNodeResponse.setException(exception);
368 }
369 else {
370 if (returnValue instanceof Serializable) {
371 clusterNodeResponse.setResult(returnValue);
372 }
373 else if (returnValue != null) {
374 clusterNodeResponse.setException(
375 new ClusterException("Return value is not serializable"));
376 }
377 }
378
379 return clusterNodeResponse;
380 }
381
382 protected InetSocketAddress getConfiguredPortalInetSockAddress(
383 boolean secure) {
384
385 InetSocketAddress inetSocketAddress = null;
386
387 String portalInetSocketAddressValue = null;
388
389 if (secure) {
390 portalInetSocketAddressValue =
391 PropsValues.PORTAL_INSTANCE_HTTPS_INET_SOCKET_ADDRESS;
392 }
393 else {
394 portalInetSocketAddressValue =
395 PropsValues.PORTAL_INSTANCE_HTTP_INET_SOCKET_ADDRESS;
396 }
397
398 if (Validator.isNotNull(portalInetSocketAddressValue)) {
399 String[] parts = StringUtil.split(
400 portalInetSocketAddressValue, CharPool.COLON);
401
402 if (parts.length == 2) {
403 try {
404 inetSocketAddress = new InetSocketAddress(
405 InetAddress.getByName(parts[0]),
406 GetterUtil.getIntegerStrict(parts[1]));
407 }
408 catch (Exception e) {
409 _log.error(
410 "Unable to parse portal InetSocketAddress from " +
411 portalInetSocketAddressValue,
412 e);
413 }
414 }
415 }
416
417 return inetSocketAddress;
418 }
419
420 protected JChannel getControlChannel() {
421 return _controlJChannel;
422 }
423
424 protected FutureClusterResponses getExecutionResults(String uuid) {
425 return _futureClusterResponses.get(uuid);
426 }
427
428 protected void initControlChannel() throws Exception {
429 Properties controlProperties = PropsUtil.getProperties(
430 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
431
432 String controlProperty = controlProperties.getProperty(
433 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
434
435 ClusterRequestReceiver clusterRequestReceiver =
436 new ClusterRequestReceiver(this);
437
438 _controlJChannel = createJChannel(
439 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
440 }
441
442 protected void initLocalClusterNode() {
443 InetAddress inetAddress = getBindInetAddress(_controlJChannel);
444
445 ClusterNode clusterNode = new ClusterNode(
446 PortalUUIDUtil.generate(), inetAddress);
447
448 InetSocketAddress inetSocketAddress =
449 getConfiguredPortalInetSockAddress(_secure);
450
451 if (inetSocketAddress != null) {
452 clusterNode.setPortalInetSocketAddress(inetSocketAddress);
453 }
454
455 _localClusterNode = clusterNode;
456 }
457
458 protected boolean isShortcutLocalMethod() {
459 return _shortcutLocalMethod;
460 }
461
462 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
463 _liveInstances.put(joinAddress, clusterNode);
464
465 Address previousAddress = _clusterNodeAddresses.put(
466 clusterNode.getClusterNodeId(), joinAddress);
467
468 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
469 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
470
471
472
473 fireClusterEvent(clusterEvent);
474 }
475 }
476
477 protected void memberRemoved(List<Address> departAddresses) {
478 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
479
480 for (Address departAddress : departAddresses) {
481 ClusterNode departClusterNode = _liveInstances.remove(
482 departAddress);
483
484 if (departClusterNode == null) {
485 continue;
486 }
487
488 departClusterNodes.add(departClusterNode);
489
490 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
491 }
492
493 if (departClusterNodes.isEmpty()) {
494 return;
495 }
496
497 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
498
499 fireClusterEvent(clusterEvent);
500 }
501
502 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
503 boolean isMulticast = clusterRequest.isMulticast();
504
505 List<Address> addresses = null;
506
507 if (isMulticast) {
508 addresses = getAddresses(_controlJChannel);
509 }
510 else {
511 addresses = new ArrayList<Address>();
512
513 Collection<Address> clusterNodeAddresses =
514 clusterRequest.getTargetClusterNodeAddresses();
515
516 if (clusterNodeAddresses != null) {
517 addresses.addAll(clusterNodeAddresses);
518 }
519
520 Collection<String> clusterNodeIds =
521 clusterRequest.getTargetClusterNodeIds();
522
523 if (clusterNodeIds != null) {
524 for (String clusterNodeId : clusterNodeIds) {
525 Address address = _clusterNodeAddresses.get(clusterNodeId);
526
527 addresses.add(address);
528 }
529 }
530 }
531
532 if (clusterRequest.isSkipLocal()) {
533 addresses.remove(getLocalClusterNodeAddress());
534 }
535
536 return addresses;
537 }
538
539 protected void runLocalMethod(
540 ClusterRequest clusterRequest,
541 FutureClusterResponses futureClusterResponses) {
542
543 MethodHandler methodHandler = clusterRequest.getMethodHandler();
544
545 Object returnValue = null;
546 Exception exception = null;
547
548 if (methodHandler == null) {
549 exception = new ClusterException(
550 "Payload is not of type " + MethodHandler.class.getName());
551 }
552 else {
553 try {
554 returnValue = methodHandler.invoke();
555 }
556 catch (Exception e) {
557 exception = e;
558 }
559 }
560
561 if (!clusterRequest.isFireAndForget()) {
562 ClusterNodeResponse clusterNodeResponse =
563 generateClusterNodeResponse(
564 clusterRequest, returnValue, exception);
565
566 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
567 }
568 }
569
570 protected void sendNotifyRequest() {
571 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
572 ClusterMessageType.NOTIFY, _localClusterNode);
573
574 try {
575 _controlJChannel.send(null, clusterRequest);
576 }
577 catch (Exception e) {
578 _log.error("Unable to send notify message", e);
579 }
580 }
581
582 private static final String _DEFAULT_CLUSTER_NAME =
583 "LIFERAY-CONTROL-CHANNEL";
584
585 private static final Log _log = LogFactoryUtil.getLog(
586 ClusterExecutorImpl.class);
587
588 private final CopyOnWriteArrayList<ClusterEventListener>
589 _clusterEventListeners =
590 new CopyOnWriteArrayList<ClusterEventListener>();
591 private final Map<String, Address> _clusterNodeAddresses =
592 new ConcurrentHashMap<String, Address>();
593 private JChannel _controlJChannel;
594 private ExecutorService _executorService;
595 private Map<String, FutureClusterResponses> _futureClusterResponses =
596 new ConcurrentReferenceValueHashMap<String, FutureClusterResponses>(
597 FinalizeManager.WEAK_REFERENCE_FACTORY);
598 private final Map<Address, ClusterNode> _liveInstances =
599 new ConcurrentHashMap<Address, ClusterNode>();
600 private Address _localAddress;
601 private ClusterNode _localClusterNode;
602 private boolean _secure;
603 private boolean _shortcutLocalMethod;
604
605 private class ClusterResponseCallbackJob implements Runnable {
606
607 public ClusterResponseCallbackJob(
608 ClusterResponseCallback clusterResponseCallback,
609 FutureClusterResponses futureClusterResponses) {
610
611 _clusterResponseCallback = clusterResponseCallback;
612 _futureClusterResponses = futureClusterResponses;
613 }
614
615 @Override
616 public void run() {
617 BlockingQueue<ClusterNodeResponse> blockingQueue =
618 _futureClusterResponses.getPartialResults();
619
620 _clusterResponseCallback.callback(blockingQueue);
621 }
622
623 private final ClusterResponseCallback _clusterResponseCallback;
624 private final FutureClusterResponses _futureClusterResponses;
625
626 }
627
628 }