001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030 import com.liferay.portal.kernel.util.InetAddressUtil;
031 import com.liferay.portal.kernel.util.MethodInvoker;
032 import com.liferay.portal.kernel.util.MethodWrapper;
033 import com.liferay.portal.kernel.util.PropsKeys;
034 import com.liferay.portal.kernel.util.PropsUtil;
035 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
036 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
037 import com.liferay.portal.util.PortalPortEventListener;
038 import com.liferay.portal.util.PortalUtil;
039 import com.liferay.portal.util.PropsValues;
040
041 import java.io.Serializable;
042
043 import java.net.InetAddress;
044
045 import java.util.ArrayList;
046 import java.util.Collection;
047 import java.util.Collections;
048 import java.util.List;
049 import java.util.Map;
050 import java.util.Properties;
051 import java.util.concurrent.ConcurrentHashMap;
052 import java.util.concurrent.CopyOnWriteArrayList;
053
054 import org.jgroups.ChannelException;
055 import org.jgroups.JChannel;
056
057
061 public class ClusterExecutorImpl
062 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
063
064 public void addClusterEventListener(
065 ClusterEventListener clusterEventListener) {
066 if (!isEnabled()) {
067 return;
068 }
069
070 _clusterEventListeners.addIfAbsent(clusterEventListener);
071 }
072
073 public void afterPropertiesSet() {
074 super.afterPropertiesSet();
075
076 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
077 addClusterEventListener(new DebuggingClusterEventListenerImpl());
078 }
079 }
080
081 public void destroy() {
082 if (!isEnabled()) {
083 return;
084 }
085
086 _controlChannel.close();
087 }
088
089 public FutureClusterResponses execute(ClusterRequest clusterRequest)
090 throws SystemException {
091
092 if (!isEnabled()) {
093 return null;
094 }
095
096 List<Address> addresses = prepareAddresses(clusterRequest);
097
098 FutureClusterResponses futureClusterResponses =
099 new FutureClusterResponses(addresses);
100
101 if (!clusterRequest.isFireAndForget()) {
102 String uuid = clusterRequest.getUuid();
103
104 _executionResultMap.put(uuid, futureClusterResponses);
105 }
106
107 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
108 addresses.remove(getLocalControlAddress())) {
109
110 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
111 clusterRequest.getMethodWrapper());
112
113 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
114 clusterNodeResponse.setUuid(clusterRequest.getUuid());
115
116 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
117 }
118
119 if (clusterRequest.isMulticast()) {
120 sendMulticastRequest(clusterRequest);
121 }
122 else {
123 sendUnicastRequest(clusterRequest, addresses);
124 }
125
126 return futureClusterResponses;
127 }
128
129 public List<ClusterEventListener> getClusterEventListeners() {
130 if (!isEnabled()) {
131 return Collections.EMPTY_LIST;
132 }
133
134 return Collections.unmodifiableList(_clusterEventListeners);
135 }
136
137 public List<ClusterNode> getClusterNodes() {
138 if (!isEnabled()) {
139 return Collections.EMPTY_LIST;
140 }
141
142 return new ArrayList<ClusterNode>(_addressMap.values());
143 }
144
145 public ClusterNode getLocalClusterNode() throws SystemException {
146 if (!isEnabled()) {
147 return null;
148 }
149
150 ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
151
152 if (clusterNode == null) {
153 _localClusterNodeId = PortalUUIDUtil.generate();
154
155 clusterNode = new ClusterNode(_localClusterNodeId);
156
157 clusterNode.setPort(PortalUtil.getPortalPort());
158
159 try {
160 InetAddress inetAddress = bindInetAddress;
161
162 if (inetAddress == null) {
163 inetAddress = InetAddressUtil.getLocalInetAddress();
164 }
165
166 clusterNode.setInetAddress(inetAddress);
167
168 clusterNode.setHostName(inetAddress.getHostName());
169 }
170 catch (Exception e) {
171 throw new SystemException(
172 "Unable to determine local network address", e);
173 }
174 }
175
176 return clusterNode;
177 }
178
179 public void initialize() {
180 if (!isEnabled()) {
181 return;
182 }
183
184 try {
185 PortalUtil.addPortalPortEventListener(this);
186
187 ClusterNode clusterNode = getLocalClusterNode();
188
189 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
190 ClusterMessageType.NOTIFY, clusterNode);
191
192 _controlChannel.send(null, null, clusterRequest);
193 }
194 catch (ChannelException ce) {
195 _log.error("Unable to send multicast message ", ce);
196 }
197 catch (SystemException se) {
198 _log.error("Unable to determine local network address", se);
199 }
200 }
201
202 public boolean isClusterNodeAlive(String clusterNodeId) {
203 if (!isEnabled()) {
204 return false;
205 }
206
207 return _clusterNodeIdMap.containsKey(clusterNodeId);
208 }
209
210 public boolean isEnabled() {
211 return PropsValues.CLUSTER_LINK_ENABLED;
212 }
213
214 public void portalPortConfigured(int port) {
215 if (!isEnabled()) {
216 return;
217 }
218
219 try {
220 ClusterNode clusterNode = getLocalClusterNode();
221
222 clusterNode.setPort(port);
223
224 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
225 ClusterMessageType.UPDATE, clusterNode);
226
227 _controlChannel.send(null, null, clusterRequest);
228 }
229 catch (Exception e) {
230 if (_log.isErrorEnabled()) {
231 _log.error("Unable to determine configure node port", e);
232 }
233 }
234 }
235
236 public void removeClusterEventListener(
237 ClusterEventListener clusterEventListener) {
238
239 if (!isEnabled()) {
240 return;
241 }
242
243 _clusterEventListeners.remove(clusterEventListener);
244 }
245
246 public void setClusterEventListeners(
247 List<ClusterEventListener> clusterEventListeners) {
248
249 if (!isEnabled()) {
250 return;
251 }
252
253 _clusterEventListeners.addAllAbsent(clusterEventListeners);
254 }
255
256 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
257 if (!isEnabled()) {
258 return;
259 }
260
261 _shortcutLocalMethod = shortcutLocalMethod;
262 }
263
264 protected void fireClusterEvent(ClusterEvent clusterEvent) {
265 for (ClusterEventListener listener : _clusterEventListeners) {
266 listener.processClusterEvent(clusterEvent);
267 }
268 }
269
270 protected JChannel getControlChannel() {
271 return _controlChannel;
272 }
273
274 protected FutureClusterResponses getExecutionResults(String uuid) {
275 return _executionResultMap.get(uuid);
276 }
277
278 protected Address getLocalControlAddress() {
279 return new AddressImpl(_controlChannel.getLocalAddress());
280 }
281
282 protected void initChannels() {
283 Properties controlProperties = PropsUtil.getProperties(
284 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
285
286 String controlProperty = controlProperties.getProperty(
287 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
288
289 ClusterRequestReceiver clusterInvokeReceiver =
290 new ClusterRequestReceiver(this);
291
292 try {
293 _controlChannel = createJChannel(
294 controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
295 }
296 catch (ChannelException ce) {
297 _log.error(ce, ce);
298 }
299 catch (Exception e) {
300 _log.error(e, e);
301 }
302 }
303
304 protected boolean isShortcutLocalMethod() {
305 return _shortcutLocalMethod;
306 }
307
308 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
309 _addressMap.put(joinAddress, clusterNode);
310
311 Address previousAddress = _clusterNodeIdMap.put(
312 clusterNode.getClusterNodeId(), joinAddress);
313
314 if ((previousAddress == null) &&
315 !getLocalControlAddress().equals(joinAddress)) {
316
317 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
318
319 fireClusterEvent(clusterEvent);
320 }
321 }
322
323 protected void memberRemoved(List<Address> departAddresses) {
324 List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
325
326 for (Address departAddress : departAddresses) {
327 ClusterNode departingClusterNode = _addressMap.remove(
328 departAddress);
329 if (departingClusterNode != null) {
330 departingClusterNodes.add(departingClusterNode);
331
332 _clusterNodeIdMap.remove(
333 departingClusterNode.getClusterNodeId());
334 }
335 }
336
337 if (departingClusterNodes.isEmpty()) {
338 return;
339 }
340
341 ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
342
343 fireClusterEvent(clusterEvent);
344 }
345
346 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
347 boolean isMulticast = clusterRequest.isMulticast();
348
349 List<Address> addresses = null;
350
351 if (isMulticast) {
352 addresses = getAddresses(_controlChannel);
353 }
354 else {
355 Collection<String> clusterNodeIds =
356 clusterRequest.getTargetClusterNodeIds();
357
358 addresses = new ArrayList<Address>(clusterNodeIds.size());
359
360 for (String clusterNodeId : clusterNodeIds) {
361 Address address = _clusterNodeIdMap.get(clusterNodeId);
362
363 addresses.add(address);
364 }
365 }
366
367 return addresses;
368 }
369
370 protected ClusterNodeResponse runLocalMethod(MethodWrapper methodWrapper)
371 throws SystemException {
372
373 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
374
375 ClusterNode localClusterNode = getLocalClusterNode();
376
377 clusterNodeResponse.setClusterNode(localClusterNode);
378 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
379
380 if (methodWrapper == null) {
381 clusterNodeResponse.setException(
382 new ClusterException(
383 "Payload is not of type " + MethodWrapper.class.getName()));
384
385 return clusterNodeResponse;
386 }
387
388 try {
389 Object returnValue = MethodInvoker.invoke(methodWrapper);
390
391 if (returnValue instanceof Serializable) {
392 clusterNodeResponse.setResult(returnValue);
393 }
394 else if (returnValue != null) {
395 clusterNodeResponse.setException(
396 new ClusterException("Return value is not serializable"));
397 }
398 }
399 catch (Exception e) {
400 clusterNodeResponse.setException(e);
401 }
402
403 return clusterNodeResponse;
404 }
405
406 protected void sendMulticastRequest(ClusterRequest clusterRequest)
407 throws SystemException {
408
409 try {
410 _controlChannel.send(null, null, clusterRequest);
411 }
412 catch (ChannelException ce) {
413 _log.error(
414 "Unable to send multicast message " + clusterRequest, ce);
415
416 throw new SystemException(
417 "Unable to send multicast request", ce);
418 }
419 }
420
421 protected void sendUnicastRequest(
422 ClusterRequest clusterRequest, List<Address> addresses)
423 throws SystemException {
424
425 for (Address address : addresses) {
426 org.jgroups.Address jGroupsAddress =
427 (org.jgroups.Address)address.getRealAddress();
428
429 try {
430 _controlChannel.send(jGroupsAddress, null, clusterRequest);
431 }
432 catch (ChannelException ce) {
433 _log.error(
434 "Unable to send unicast message " + clusterRequest, ce);
435
436 throw new SystemException(
437 "Unable to send unicast request", ce);
438 }
439 }
440 }
441
442 private static final String _DEFAULT_CLUSTER_NAME =
443 "LIFERAY-CONTROL-CHANNEL";
444
445 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
446
447 private Map<Address, ClusterNode> _addressMap =
448 new ConcurrentHashMap<Address, ClusterNode>();
449 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
450 new CopyOnWriteArrayList<ClusterEventListener>();
451 private Map<String, Address> _clusterNodeIdMap =
452 new ConcurrentHashMap<String, Address>();
453 private JChannel _controlChannel;
454 private Map<String, FutureClusterResponses> _executionResultMap =
455 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
456 private String _localClusterNodeId;
457 private boolean _shortcutLocalMethod;
458
459 }