001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.exception.SystemException;
029 import com.liferay.portal.kernel.log.Log;
030 import com.liferay.portal.kernel.log.LogFactoryUtil;
031 import com.liferay.portal.kernel.util.MethodHandler;
032
033 import java.util.HashSet;
034 import java.util.Set;
035 import java.util.concurrent.ExecutionException;
036 import java.util.concurrent.Future;
037 import java.util.concurrent.TimeUnit;
038 import java.util.concurrent.TimeoutException;
039
040
043 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
044
045 public void destroy() {
046 if (!_enabled) {
047 return;
048 }
049
050 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
051 }
052
053 @Override
054 public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
055 throws SystemException {
056
057 if (!_enabled) {
058 if (_log.isWarnEnabled()) {
059 _log.warn(
060 "Executing on the local node because the cluster master " +
061 "executor is disabled");
062 }
063
064 try {
065 return new LocalFuture<T>((T)methodHandler.invoke(true));
066 }
067 catch (Exception e) {
068 throw new SystemException(e);
069 }
070 }
071
072 Address address = getMasterAddress();
073
074 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
075 methodHandler, address);
076
077 try {
078 FutureClusterResponses futureClusterResponses =
079 _clusterExecutor.execute(clusterRequest);
080
081 return new RemoteFuture<T>(address, futureClusterResponses);
082 }
083 catch (Exception e) {
084 throw new SystemException(
085 "Unable to execute on master " + address.getDescription(), e);
086 }
087 }
088
089 @Override
090 public void initialize() {
091 if (!_clusterExecutor.isEnabled()) {
092 return;
093 }
094
095 try {
096 _localClusterNodeAddress =
097 _clusterExecutor.getLocalClusterNodeAddress();
098
099 _clusterEventListener =
100 new ClusterMasterTokenClusterEventListener();
101
102 _clusterExecutor.addClusterEventListener(_clusterEventListener);
103
104 Address masterAddress = getMasterAddress();
105
106 _enabled = true;
107
108 notifyMasterTokenTransitionListeners(
109 _localClusterNodeAddress.equals(masterAddress));
110 }
111 catch (Exception e) {
112 throw new RuntimeException(
113 "Unable to initialize cluster master executor", e);
114 }
115 }
116
117 @Override
118 public boolean isEnabled() {
119 return _enabled;
120 }
121
122 @Override
123 public boolean isMaster() {
124 if (isEnabled()) {
125 return _master;
126 }
127
128 return true;
129 }
130
131 @Override
132 public void registerClusterMasterTokenTransitionListener(
133 ClusterMasterTokenTransitionListener
134 clusterMasterTokenTransitionListener) {
135
136 _clusterMasterTokenTransitionListeners.add(
137 clusterMasterTokenTransitionListener);
138 }
139
140 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
141 _clusterExecutor = clusterExecutor;
142 }
143
144 public void setClusterMasterTokenTransitionListeners(
145 Set<ClusterMasterTokenTransitionListener>
146 clusterMasterTokenTransitionListeners) {
147
148 _clusterMasterTokenTransitionListeners.addAll(
149 clusterMasterTokenTransitionListeners);
150 }
151
152 @Override
153 public void unregisterClusterMasterTokenTransitionListener(
154 ClusterMasterTokenTransitionListener
155 clusterMasterTokenTransitionListener) {
156
157 _clusterMasterTokenTransitionListeners.remove(
158 clusterMasterTokenTransitionListener);
159 }
160
161 protected Address getMasterAddress() {
162 Address coordinator = _clusterExecutor.getCoordinatorAddress();
163
164 boolean master = _localClusterNodeAddress.equals(coordinator);
165
166 if (master != _master) {
167 _master = master;
168
169 if (_enabled) {
170 notifyMasterTokenTransitionListeners(master);
171 }
172 }
173
174 return coordinator;
175 }
176
177 protected void notifyMasterTokenTransitionListeners(
178 boolean masterTokenAcquired) {
179
180 for (ClusterMasterTokenTransitionListener
181 clusterMasterTokenTransitionListener :
182 _clusterMasterTokenTransitionListeners) {
183
184 if (masterTokenAcquired) {
185 clusterMasterTokenTransitionListener.masterTokenAcquired();
186 }
187 else {
188 clusterMasterTokenTransitionListener.masterTokenReleased();
189 }
190 }
191 }
192
193 private static Log _log = LogFactoryUtil.getLog(
194 ClusterMasterExecutorImpl.class);
195
196 private static volatile boolean _master;
197
198 private ClusterEventListener _clusterEventListener;
199 private ClusterExecutor _clusterExecutor;
200 private Set<ClusterMasterTokenTransitionListener>
201 _clusterMasterTokenTransitionListeners =
202 new HashSet<ClusterMasterTokenTransitionListener>();
203 private volatile boolean _enabled;
204 private volatile Address _localClusterNodeAddress;
205
206 private class ClusterMasterTokenClusterEventListener
207 implements ClusterEventListener {
208
209 @Override
210 public void processClusterEvent(ClusterEvent clusterEvent) {
211 ClusterEventType clusterEventType =
212 clusterEvent.getClusterEventType();
213
214 if (clusterEventType == ClusterEventType.COORDINATOR_UPDATE) {
215 getMasterAddress();
216 }
217 }
218 }
219
220 private class LocalFuture<T> implements Future<T> {
221
222 public LocalFuture(T result) {
223 _result = result;
224 }
225
226 @Override
227 public boolean cancel(boolean mayInterruptIfRunning) {
228 return false;
229 }
230
231 @Override
232 public boolean isCancelled() {
233 return false;
234 }
235
236 @Override
237 public boolean isDone() {
238 return true;
239 }
240
241 @Override
242 public T get() {
243 return _result;
244 }
245
246 @Override
247 public T get(long timeout, TimeUnit unit) {
248 return _result;
249 }
250
251 private final T _result;
252
253 }
254
255 private class RemoteFuture<T> implements Future<T> {
256
257 public RemoteFuture(
258 Address address, FutureClusterResponses futureClusterResponses) {
259
260 _address = address;
261 _futureClusterResponses = futureClusterResponses;
262 }
263
264 @Override
265 public boolean cancel(boolean mayInterruptIfRunning) {
266 return _futureClusterResponses.cancel(mayInterruptIfRunning);
267 }
268
269 @Override
270 public boolean isCancelled() {
271 return _futureClusterResponses.isCancelled();
272 }
273
274 @Override
275 public boolean isDone() {
276 return _futureClusterResponses.isDone();
277 }
278
279 @Override
280 public T get() throws ExecutionException, InterruptedException {
281 ClusterNodeResponses clusterNodeResponses =
282 _futureClusterResponses.get();
283
284 ClusterNodeResponse clusterNodeResponse =
285 clusterNodeResponses.getClusterResponse(_address);
286
287 try {
288 return (T)clusterNodeResponse.getResult();
289 }
290 catch (Exception e) {
291 throw new ExecutionException(e);
292 }
293 }
294
295 @Override
296 public T get(long timeout, TimeUnit unit)
297 throws ExecutionException, InterruptedException, TimeoutException {
298
299 ClusterNodeResponses clusterNodeResponses =
300 _futureClusterResponses.get(timeout, unit);
301
302 ClusterNodeResponse clusterNodeResponse =
303 clusterNodeResponses.getClusterResponse(_address);
304
305 try {
306 return (T)clusterNodeResponse.getResult();
307 }
308 catch (Exception e) {
309 throw new ExecutionException(e);
310 }
311 }
312
313 private final Address _address;
314 private final FutureClusterResponses _futureClusterResponses;
315
316 }
317
318 }