001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventType;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.log.Log;
027 import com.liferay.portal.kernel.log.LogFactoryUtil;
028 import com.liferay.portal.kernel.util.MethodHandler;
029
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.List;
033
034 import org.jgroups.Message;
035 import org.jgroups.View;
036
037
041 public class ClusterRequestReceiver extends BaseReceiver {
042
043 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
044 _clusterExecutorImpl = clusterExecutorImpl;
045 }
046
047 @Override
048 protected void doReceive(Message message) {
049 Object obj = retrievePayload(message);
050
051 if (obj == null) {
052 return;
053 }
054
055 Address sourceAddress = new AddressImpl(message.getSrc());
056
057 if (sourceAddress.equals(
058 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
059
060 boolean isProcessed = processLocalMessage(obj);
061
062 if (isProcessed) {
063 return;
064 }
065 }
066
067 if (obj instanceof ClusterRequest) {
068 ClusterRequest clusterRequest = (ClusterRequest)obj;
069
070 processClusterRequest(clusterRequest, sourceAddress);
071 }
072 else if (obj instanceof ClusterNodeResponse) {
073 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
074
075 processClusterResponse(clusterNodeResponse, sourceAddress);
076 }
077 else if (_log.isWarnEnabled()) {
078 _log.warn(
079 "Unable to process message content of type " + obj.getClass());
080 }
081 }
082
083 @Override
084 protected void doViewAccepted(View oldView, View newView) {
085 List<Address> departAddresses = getDepartAddresses(oldView, newView);
086 List<Address> newAddresses = getNewAddresses(oldView, newView);
087
088 if (!newAddresses.isEmpty()) {
089 _clusterExecutorImpl.sendNotifyRequest();
090 }
091
092 if (!departAddresses.isEmpty()) {
093 _clusterExecutorImpl.memberRemoved(departAddresses);
094 }
095 }
096
097 @Override
098 protected void doCoordinatorUpdated(org.jgroups.Address coordinator) {
099 _clusterExecutorImpl.fireClusterEvent(
100 new ClusterEvent(ClusterEventType.COORDINATOR_UPDATE));
101 }
102
103 protected List<Address> getDepartAddresses(View oldView, View newView) {
104 List<org.jgroups.Address> currentJGroupsAddresses =
105 newView.getMembers();
106 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
107
108 List<org.jgroups.Address> departJGroupsAddresses =
109 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
110
111 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
112
113 if (departJGroupsAddresses.isEmpty()) {
114 return Collections.emptyList();
115 }
116
117 List<Address> departAddresses = new ArrayList<Address>(
118 departJGroupsAddresses.size());
119
120 for (org.jgroups.Address departJGroupsAddress :
121 departJGroupsAddresses) {
122
123 Address departAddress = new AddressImpl(departJGroupsAddress);
124
125 departAddresses.add(departAddress);
126 }
127
128 return departAddresses;
129 }
130
131 protected List<Address> getNewAddresses(View oldView, View newView) {
132 List<org.jgroups.Address> currentJGroupsAddresses =
133 newView.getMembers();
134 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
135
136 List<org.jgroups.Address> newJGroupsAddresses =
137 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
138
139 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
140
141 if (newJGroupsAddresses.isEmpty()) {
142 return Collections.emptyList();
143 }
144
145 List<Address> newAddresses = new ArrayList<Address>(
146 newJGroupsAddresses.size());
147
148 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
149 Address newAddress = new AddressImpl(newJGroupsAddress);
150
151 newAddresses.add(newAddress);
152 }
153
154 return newAddresses;
155 }
156
157 protected void handleResponse(
158 Address address, ClusterRequest clusterRequest, Object returnValue,
159 Exception exception) {
160
161 ClusterNodeResponse clusterNodeResponse =
162 _clusterExecutorImpl.generateClusterNodeResponse(
163 clusterRequest, returnValue, exception);
164
165 try {
166 _clusterExecutorImpl.sendJGroupsMessage(
167 _clusterExecutorImpl.getControlChannel(),
168 (org.jgroups.Address)address.getRealAddress(),
169 clusterNodeResponse);
170 }
171 catch (Exception e) {
172 _log.error(
173 "Unable to send response message " + clusterNodeResponse, e);
174 }
175 catch (Throwable t) {
176 _log.error(t, t);
177 }
178 }
179
180 protected void processClusterRequest(
181 ClusterRequest clusterRequest, Address sourceAddress) {
182
183 ClusterMessageType clusterMessageType =
184 clusterRequest.getClusterMessageType();
185
186 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
187 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
188
189 _clusterExecutorImpl.memberJoined(
190 sourceAddress, clusterRequest.getOriginatingClusterNode());
191
192 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
193 handleResponse(sourceAddress, clusterRequest, null, null);
194 }
195
196 return;
197 }
198
199 MethodHandler methodHandler = clusterRequest.getMethodHandler();
200
201 Object returnValue = null;
202 Exception exception = null;
203
204 if (methodHandler != null) {
205 try {
206 ClusterInvokeThreadLocal.setEnabled(false);
207
208 returnValue = methodHandler.invoke(true);
209 }
210 catch (Exception e) {
211 exception = e;
212
213 _log.error("Unable to invoke method " + methodHandler, e);
214 }
215 finally {
216 ClusterInvokeThreadLocal.setEnabled(true);
217 }
218 }
219 else {
220 exception = new ClusterException(
221 "Payload is not of type " + MethodHandler.class.getName());
222 }
223
224 if (!clusterRequest.isFireAndForget()) {
225 handleResponse(
226 sourceAddress, clusterRequest, returnValue, exception);
227 }
228 }
229
230 protected void processClusterResponse(
231 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
232
233 ClusterMessageType clusterMessageType =
234 clusterNodeResponse.getClusterMessageType();
235
236 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
237 _clusterExecutorImpl.memberJoined(
238 sourceAddress, clusterNodeResponse.getClusterNode());
239
240 return;
241 }
242
243 String uuid = clusterNodeResponse.getUuid();
244
245 FutureClusterResponses futureClusterResponses =
246 _clusterExecutorImpl.getExecutionResults(uuid);
247
248 if (futureClusterResponses == null) {
249 if (_log.isInfoEnabled()) {
250 _log.info("Unable to find response container for " + uuid);
251 }
252
253 return;
254 }
255
256 if (futureClusterResponses.expectsReply(sourceAddress)) {
257 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
258 }
259 else {
260 if (_log.isWarnEnabled()) {
261 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
262 }
263 }
264 }
265
266 protected boolean processLocalMessage(Object message) {
267 if (message instanceof ClusterRequest) {
268 ClusterRequest clusterRequest = (ClusterRequest)message;
269
270 if (clusterRequest.isSkipLocal()) {
271 return true;
272 }
273 }
274
275 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
276 return true;
277 }
278
279 return false;
280 }
281
282 private static Log _log = LogFactoryUtil.getLog(
283 ClusterRequestReceiver.class);
284
285 private ClusterExecutorImpl _clusterExecutorImpl;
286
287 }