1
14
15 package com.liferay.portal.cluster;
16
17 import com.liferay.portal.kernel.cluster.Address;
18 import com.liferay.portal.kernel.cluster.ClusterException;
19 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
20 import com.liferay.portal.kernel.cluster.ClusterRequest;
21 import com.liferay.portal.kernel.cluster.ClusterResponse;
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.MethodInvoker;
25 import com.liferay.portal.kernel.util.MethodWrapper;
26
27 import java.io.Serializable;
28
29 import java.util.Map;
30 import java.util.concurrent.Future;
31
32 import org.jgroups.ChannelException;
33 import org.jgroups.JChannel;
34 import org.jgroups.Message;
35 import org.jgroups.ReceiverAdapter;
36 import org.jgroups.View;
37
38
45 public class ClusterInvokeReceiver extends ReceiverAdapter {
46
47 public ClusterInvokeReceiver(
48 Map<String, Map<Address, Future<?>>> multicastResultMap,
49 Map<String, Future<?>> unicastResultMap) {
50
51 _multicastResultMap = multicastResultMap;
52 _unicastResultMap = unicastResultMap;
53 }
54
55 public void receive(Message message) {
56 org.jgroups.Address sourceAddress = message.getSrc();
57 org.jgroups.Address localAddress = _channel.getLocalAddress();
58
59 Object obj = message.getObject();
60
61 if (obj == null) {
62 if (_log.isWarnEnabled()) {
63 _log.warn("Message content is null");
64 }
65
66 return;
67 }
68
69 if (localAddress.equals(sourceAddress) &&
70 ClusterExecutorUtil.isShortcutLocalMethod()) {
71
72 return;
73 }
74
75 if (obj instanceof ClusterRequest) {
76 ClusterRequest clusterRequest = (ClusterRequest)obj;
77
78 ClusterResponse clusterResponse = new ClusterResponseImpl();
79
80 clusterResponse.setMulticast(clusterRequest.isMulticast());
81 clusterResponse.setUuid(clusterRequest.getUuid());
82
83 Object payload = clusterRequest.getPayload();
84
85 if (payload instanceof MethodWrapper) {
86 MethodWrapper methodWrapper = (MethodWrapper)payload;
87
88 try {
89 Object returnValue = MethodInvoker.invoke(methodWrapper);
90
91 if (returnValue instanceof Serializable) {
92 clusterResponse.setResult(returnValue);
93 }
94 else if (returnValue != null) {
95 clusterResponse.setException(
96 new ClusterException(
97 "Return value is not serializable"));
98 }
99 }
100 catch (Exception e) {
101 clusterResponse.setException(e);
102 }
103 }
104 else {
105 clusterResponse.setException(
106 new ClusterException(
107 "Payload is not of type " +
108 MethodWrapper.class.getName()));
109 }
110
111 try {
112 _channel.send(sourceAddress, localAddress, clusterResponse);
113 }
114 catch (ChannelException ce) {
115 _log.error(
116 "Unable to send response message " + clusterResponse, ce);
117 }
118 }
119 else if (obj instanceof ClusterResponse) {
120 ClusterResponse clusterResponse = (ClusterResponse)obj;
121
122 String uuid = clusterResponse.getUuid();
123
124 if (clusterResponse.isMulticast() &&
125 _multicastResultMap.containsKey(uuid)) {
126
127 Map<Address, Future<?>> results = _multicastResultMap.get(uuid);
128
129 Address address = new AddressImpl(sourceAddress);
130
131 if (results.containsKey(address)) {
132 FutureResult<Object> futureResult =
133 (FutureResult<Object>)results.get(address);
134
135 if (clusterResponse.hasException()) {
136 futureResult.setException(
137 clusterResponse.getException());
138 }
139 else {
140 futureResult.setResult(clusterResponse.getResult());
141 }
142 }
143 else {
144 _log.error("New node coming from " + sourceAddress);
145 }
146 }
147 else if (_unicastResultMap.containsKey(uuid)) {
148 FutureResult<Object> futureResult =
149 (FutureResult<Object>)_unicastResultMap.get(uuid);
150
151 if (clusterResponse.hasException()) {
152 futureResult.setException(clusterResponse.getException());
153 }
154 else {
155 futureResult.setResult(clusterResponse.getResult());
156 }
157 }
158 else {
159 _log.error("Unknow UUID " + uuid + " from " + sourceAddress);
160 }
161 }
162 else {
163 if (_log.isWarnEnabled()) {
164 _log.warn(
165 "Unable to process message content of type " +
166 obj.getClass().getName());
167 }
168
169 return;
170 }
171 }
172
173 public void setChannel(JChannel channel) {
174 _channel = channel;
175 }
176
177 public void viewAccepted(View view) {
178 if (_log.isInfoEnabled()) {
179 _log.info("Accepted view " + view);
180 }
181 }
182
183 private static Log _log = LogFactoryUtil.getLog(
184 ClusterInvokeReceiver.class);
185
186 private JChannel _channel;
187 private Map<String, Map<Address, Future<?>>> _multicastResultMap;
188 private Map<String, Future<?>> _unicastResultMap;
189
190 }