001
014
015 package com.liferay.portal.cluster;
016
import com.liferay.portal.kernel.cluster.Address;
import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
import com.liferay.portal.kernel.cluster.ClusterEvent;
import com.liferay.portal.kernel.cluster.ClusterEventListener;
import com.liferay.portal.kernel.cluster.ClusterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
import com.liferay.portal.kernel.cluster.ClusterRequest;
import com.liferay.portal.kernel.cluster.FutureClusterResponses;
import com.liferay.portal.kernel.exception.SystemException;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.MethodHandler;
import com.liferay.portal.kernel.util.Validator;
import com.liferay.portal.model.Lock;
import com.liferay.portal.service.LockLocalServiceUtil;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
040
041
044 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
045
046 public void destroy() {
047 if (!_enabled) {
048 return;
049 }
050
051 try {
052 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
053
054 LockLocalServiceUtil.unlock(
055 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
056 }
057 catch (SystemException se) {
058 if (_log.isWarnEnabled()) {
059 _log.warn("Unable to destroy the cluster master executor", se);
060 }
061 }
062 }
063
064 @Override
065 public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
066 throws SystemException {
067
068 if (!_enabled) {
069 if (_log.isWarnEnabled()) {
070 _log.warn(
071 "Executing on the local node because the cluster master " +
072 "executor is disabled");
073 }
074
075 try {
076 return new LocalFuture<T>((T)methodHandler.invoke(true));
077 }
078 catch (Exception e) {
079 throw new SystemException(e);
080 }
081 }
082
083 String masterAddressString = getMasterAddressString();
084
085 Address address = AddressSerializerUtil.deserialize(
086 masterAddressString);
087
088 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
089 methodHandler, address);
090
091 try {
092 FutureClusterResponses futureClusterResponses =
093 _clusterExecutor.execute(clusterRequest);
094
095 return new RemoteFuture<T>(address, futureClusterResponses);
096 }
097 catch (Exception e) {
098 throw new SystemException(
099 "Unable to execute on master " + address.getDescription(), e);
100 }
101 }
102
103 @Override
104 public void initialize() {
105 if (!_clusterExecutor.isEnabled()) {
106 return;
107 }
108
109 try {
110 _localClusterNodeAddress = AddressSerializerUtil.serialize(
111 _clusterExecutor.getLocalClusterNodeAddress());
112
113 _clusterEventListener =
114 new ClusterMasterTokenClusterEventListener();
115
116 _clusterExecutor.addClusterEventListener(_clusterEventListener);
117
118 String masterAddressString = getMasterAddressString();
119
120 _enabled = true;
121
122 notifyMasterTokenTransitionListeners(
123 _localClusterNodeAddress.equals(masterAddressString));
124 }
125 catch (Exception e) {
126 throw new RuntimeException(
127 "Unable to initialize cluster master executor", e);
128 }
129 }
130
131 @Override
132 public boolean isEnabled() {
133 return _enabled;
134 }
135
136 @Override
137 public boolean isMaster() {
138 if (isEnabled()) {
139 return _master;
140 }
141
142 return true;
143 }
144
145 @Override
146 public void registerClusterMasterTokenTransitionListener(
147 ClusterMasterTokenTransitionListener
148 clusterMasterTokenTransitionListener) {
149
150 _clusterMasterTokenTransitionListeners.add(
151 clusterMasterTokenTransitionListener);
152 }
153
154 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
155 _clusterExecutor = clusterExecutor;
156 }
157
158 public void setClusterMasterTokenTransitionListeners(
159 Set<ClusterMasterTokenTransitionListener>
160 clusterMasterTokenTransitionListeners) {
161
162 _clusterMasterTokenTransitionListeners.addAll(
163 clusterMasterTokenTransitionListeners);
164 }
165
166 @Override
167 public void unregisterClusterMasterTokenTransitionListener(
168 ClusterMasterTokenTransitionListener
169 clusterMasterTokenTransitionListener) {
170
171 _clusterMasterTokenTransitionListeners.remove(
172 clusterMasterTokenTransitionListener);
173 }
174
175 protected String getMasterAddressString() {
176 String owner = null;
177
178 while (true) {
179 try {
180 Lock lock = null;
181
182 if (owner == null) {
183 lock = LockLocalServiceUtil.lock(
184 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
185 _localClusterNodeAddress);
186 }
187 else {
188 lock = LockLocalServiceUtil.lock(
189 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
190 _localClusterNodeAddress);
191 }
192
193 owner = lock.getOwner();
194
195 Address address = AddressSerializerUtil.deserialize(owner);
196
197 if (_clusterExecutor.isClusterNodeAlive(address)) {
198 break;
199 }
200 }
201 catch (Exception e) {
202 if (_log.isWarnEnabled()) {
203 _log.warn(
204 "Unable to acquire memory scheduler cluster lock", e);
205 }
206 }
207
208 if (_log.isInfoEnabled()) {
209 if (Validator.isNotNull(owner)) {
210 _log.info("Lock currently held by " + owner);
211 }
212
213 _log.info(
214 "Reattempting to acquire memory scheduler cluster lock");
215 }
216 }
217
218 boolean master = _localClusterNodeAddress.equals(owner);
219
220 if (master == _master) {
221 return owner;
222 }
223
224 _master = master;
225
226 if (_enabled) {
227 notifyMasterTokenTransitionListeners(master);
228 }
229
230 return owner;
231 }
232
233 protected void notifyMasterTokenTransitionListeners(
234 boolean masterTokenAcquired) {
235
236 for (ClusterMasterTokenTransitionListener
237 clusterMasterTokenTransitionListener :
238 _clusterMasterTokenTransitionListeners) {
239
240 if (masterTokenAcquired) {
241 clusterMasterTokenTransitionListener.masterTokenAcquired();
242 }
243 else {
244 clusterMasterTokenTransitionListener.masterTokenReleased();
245 }
246 }
247 }
248
249 private static final String _LOCK_CLASS_NAME =
250 ClusterMasterExecutorImpl.class.getName();
251
252 private static Log _log = LogFactoryUtil.getLog(
253 ClusterMasterExecutorImpl.class);
254
255 private static volatile boolean _master;
256
257 private ClusterEventListener _clusterEventListener;
258 private ClusterExecutor _clusterExecutor;
259 private Set<ClusterMasterTokenTransitionListener>
260 _clusterMasterTokenTransitionListeners =
261 new HashSet<ClusterMasterTokenTransitionListener>();
262 private volatile boolean _enabled;
263 private volatile String _localClusterNodeAddress;
264
265 private class ClusterMasterTokenClusterEventListener
266 implements ClusterEventListener {
267
268 @Override
269 public void processClusterEvent(ClusterEvent clusterEvent) {
270 try {
271 getMasterAddressString();
272 }
273 catch (Exception e) {
274 _log.error("Unable to update the cluster master lock", e);
275 }
276 }
277 }
278
279 private class LocalFuture<T> implements Future<T> {
280
281 public LocalFuture(T result) {
282 _result = result;
283 }
284
285 @Override
286 public boolean cancel(boolean mayInterruptIfRunning) {
287 return false;
288 }
289
290 @Override
291 public boolean isCancelled() {
292 return false;
293 }
294
295 @Override
296 public boolean isDone() {
297 return true;
298 }
299
300 @Override
301 public T get() {
302 return _result;
303 }
304
305 @Override
306 public T get(long timeout, TimeUnit unit) {
307 return _result;
308 }
309
310 private final T _result;
311
312 }
313
314 private class RemoteFuture<T> implements Future<T> {
315
316 public RemoteFuture(
317 Address address, FutureClusterResponses futureClusterResponses) {
318
319 _address = address;
320 _futureClusterResponses = futureClusterResponses;
321 }
322
323 @Override
324 public boolean cancel(boolean mayInterruptIfRunning) {
325 return _futureClusterResponses.cancel(mayInterruptIfRunning);
326 }
327
328 @Override
329 public boolean isCancelled() {
330 return _futureClusterResponses.isCancelled();
331 }
332
333 @Override
334 public boolean isDone() {
335 return _futureClusterResponses.isDone();
336 }
337
338 @Override
339 public T get() throws InterruptedException {
340 ClusterNodeResponses clusterNodeResponses =
341 _futureClusterResponses.get();
342
343 return (T)clusterNodeResponses.getClusterResponse(_address);
344 }
345
346 @Override
347 public T get(long timeout, TimeUnit unit)
348 throws InterruptedException, TimeoutException {
349
350 ClusterNodeResponses clusterNodeResponses =
351 _futureClusterResponses.get(timeout, unit);
352
353 return (T)clusterNodeResponses.getClusterResponse(_address);
354 }
355
356 private final Address _address;
357 private final FutureClusterResponses _futureClusterResponses;
358
359 }
360
361 }