001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterException;
019 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
020 import com.liferay.portal.kernel.cluster.ClusterMessageType;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.log.Log;
025 import com.liferay.portal.kernel.log.LogFactoryUtil;
026 import com.liferay.portal.kernel.util.MethodHandler;
027
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.List;
031 import java.util.concurrent.CountDownLatch;
032
033 import org.jgroups.Channel;
034 import org.jgroups.Message;
035 import org.jgroups.View;
036
037
041 public class ClusterRequestReceiver extends BaseReceiver {
042
043 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
044 _countDownLatch = new CountDownLatch(1);
045 _clusterExecutorImpl = clusterExecutorImpl;
046 }
047
048 public void openLatch() {
049 _countDownLatch.countDown();
050 }
051
052 @Override
053 public void receive(Message message) {
054 try {
055 _countDownLatch.await();
056 }
057 catch (InterruptedException ie) {
058 _log.error(
059 "Latch opened prematurely by interruption. Dependence may " +
060 "not be ready.");
061 }
062
063 Object obj = message.getObject();
064
065 if (obj == null) {
066 if (_log.isWarnEnabled()) {
067 _log.warn("Message content is null");
068 }
069
070 return;
071 }
072
073 Address sourceAddress = new AddressImpl(message.getSrc());
074
075 if (sourceAddress.equals(
076 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
077
078 boolean isProcessed = processLocalMessage(obj);
079
080 if (isProcessed) {
081 return;
082 }
083 }
084
085 if (obj instanceof ClusterRequest) {
086 ClusterRequest clusterRequest = (ClusterRequest)obj;
087
088 processClusterRequest(clusterRequest, sourceAddress);
089 }
090 else if (obj instanceof ClusterNodeResponse) {
091 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
092
093 processClusterResponse(clusterNodeResponse, sourceAddress);
094 }
095 else if (_log.isWarnEnabled()) {
096 _log.warn(
097 "Unable to process message content of type " + obj.getClass());
098 }
099 }
100
101 @Override
102 public void viewAccepted(View view) {
103 super.viewAccepted(view);
104
105 if (_lastView == null) {
106 _lastView = view;
107
108 return;
109 }
110
111 List<Address> departAddresses = getDepartAddresses(view);
112 List<Address> newAddresses = getNewAddresses(view);
113
114 _lastView = view;
115
116 try {
117 _countDownLatch.await();
118 }
119 catch (InterruptedException ie) {
120 _log.error(
121 "Latch opened prematurely by interruption. Dependence may " +
122 "not be ready.");
123 }
124
125 if (!newAddresses.isEmpty()) {
126 _clusterExecutorImpl.sendNotifyRequest();
127 }
128
129 if (!departAddresses.isEmpty()) {
130 _clusterExecutorImpl.memberRemoved(departAddresses);
131 }
132 }
133
134 protected List<Address> getDepartAddresses(View view) {
135 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
136 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
137
138 List<org.jgroups.Address> departJGroupsAddresses =
139 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
140
141 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
142
143 if (departJGroupsAddresses.isEmpty()) {
144 return Collections.emptyList();
145 }
146
147 List<Address> departAddresses = new ArrayList<Address>(
148 departJGroupsAddresses.size());
149
150 for (org.jgroups.Address departJGroupsAddress :
151 departJGroupsAddresses) {
152
153 Address departAddress = new AddressImpl(departJGroupsAddress);
154
155 departAddresses.add(departAddress);
156 }
157
158 return departAddresses;
159 }
160
161 protected List<Address> getNewAddresses(View view) {
162 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
163 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
164
165 List<org.jgroups.Address> newJGroupsAddresses =
166 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
167
168 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
169
170 if (newJGroupsAddresses.isEmpty()) {
171 return Collections.emptyList();
172 }
173
174 List<Address> newAddresses = new ArrayList<Address>(
175 newJGroupsAddresses.size());
176
177 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
178 Address newAddress = new AddressImpl(newJGroupsAddress);
179
180 newAddresses.add(newAddress);
181 }
182
183 return newAddresses;
184 }
185
186 protected void handleResponse(
187 Address address, ClusterRequest clusterRequest, Object returnValue,
188 Exception exception) {
189
190 ClusterNodeResponse clusterNodeResponse =
191 _clusterExecutorImpl.generateClusterNodeResponse(
192 clusterRequest, returnValue, exception);
193
194 Channel channel = _clusterExecutorImpl.getControlChannel();
195
196 try {
197 channel.send(
198 (org.jgroups.Address)address.getRealAddress(),
199 clusterNodeResponse);
200 }
201 catch (Exception e) {
202 _log.error(
203 "Unable to send response message " + clusterNodeResponse, e);
204 }
205 catch (Throwable t) {
206 _log.error(t, t);
207 }
208 }
209
210 protected void processClusterRequest(
211 ClusterRequest clusterRequest, Address sourceAddress) {
212
213 ClusterMessageType clusterMessageType =
214 clusterRequest.getClusterMessageType();
215
216 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
217 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
218
219 _clusterExecutorImpl.memberJoined(
220 sourceAddress, clusterRequest.getOriginatingClusterNode());
221
222 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
223 handleResponse(sourceAddress, clusterRequest, null, null);
224 }
225
226 return;
227 }
228
229 MethodHandler methodHandler = clusterRequest.getMethodHandler();
230
231 Object returnValue = null;
232 Exception exception = null;
233
234 if (methodHandler != null) {
235 try {
236 ClusterInvokeThreadLocal.setEnabled(false);
237
238 returnValue = methodHandler.invoke(true);
239 }
240 catch (Exception e) {
241 exception = e;
242
243 _log.error("Unable to invoke method " + methodHandler, e);
244 }
245 finally {
246 ClusterInvokeThreadLocal.setEnabled(true);
247 }
248 }
249 else {
250 exception = new ClusterException(
251 "Payload is not of type " + MethodHandler.class.getName());
252 }
253
254 if (!clusterRequest.isFireAndForget()) {
255 handleResponse(
256 sourceAddress, clusterRequest, returnValue, exception);
257 }
258 }
259
260 protected void processClusterResponse(
261 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
262
263 ClusterMessageType clusterMessageType =
264 clusterNodeResponse.getClusterMessageType();
265
266 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
267 _clusterExecutorImpl.memberJoined(
268 sourceAddress, clusterNodeResponse.getClusterNode());
269
270 return;
271 }
272
273 String uuid = clusterNodeResponse.getUuid();
274
275 FutureClusterResponses futureClusterResponses =
276 _clusterExecutorImpl.getExecutionResults(uuid);
277
278 if (futureClusterResponses == null) {
279 if (_log.isInfoEnabled()) {
280 _log.info("Unable to find response container for " + uuid);
281 }
282
283 return;
284 }
285
286 if (futureClusterResponses.expectsReply(sourceAddress)) {
287 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
288 }
289 else {
290 if (_log.isWarnEnabled()) {
291 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
292 }
293 }
294 }
295
296 protected boolean processLocalMessage(Object message) {
297 if (message instanceof ClusterRequest) {
298 ClusterRequest clusterRequest = (ClusterRequest)message;
299
300 if (clusterRequest.isSkipLocal()) {
301 return true;
302 }
303 }
304
305 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
306 return true;
307 }
308
309 return false;
310 }
311
312 private static Log _log = LogFactoryUtil.getLog(
313 ClusterRequestReceiver.class);
314
315 private ClusterExecutorImpl _clusterExecutorImpl;
316 private CountDownLatch _countDownLatch;
317 private volatile View _lastView;
318
319 }