001
014
015 package com.liferay.portal.cluster;
016
import com.liferay.portal.kernel.cluster.Address;
import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
import com.liferay.portal.kernel.cluster.ClusterEvent;
import com.liferay.portal.kernel.cluster.ClusterEventListener;
import com.liferay.portal.kernel.cluster.ClusterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
import com.liferay.portal.kernel.cluster.ClusterRequest;
import com.liferay.portal.kernel.cluster.FutureClusterResponses;
import com.liferay.portal.kernel.exception.SystemException;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.MethodHandler;
import com.liferay.portal.kernel.util.Validator;
import com.liferay.portal.model.Lock;
import com.liferay.portal.service.LockLocalServiceUtil;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
040
041
044 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
045
046 public void destroy() {
047 if (!_enabled) {
048 return;
049 }
050
051 try {
052 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
053
054 LockLocalServiceUtil.unlock(
055 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
056 }
057 catch (SystemException se) {
058 if (_log.isWarnEnabled()) {
059 _log.warn("Unable to destroy the cluster master executor", se);
060 }
061 }
062 }
063
064 @Override
065 public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
066 throws SystemException {
067
068 if (!_enabled) {
069 if (_log.isWarnEnabled()) {
070 _log.warn(
071 "Executing on the local node because the cluster master " +
072 "executor is disabled");
073 }
074
075 try {
076 return new LocalFuture<T>((T)methodHandler.invoke(true));
077 }
078 catch (Exception e) {
079 throw new SystemException(e);
080 }
081 }
082
083 String masterAddressString = getMasterAddressString();
084
085 Address address = AddressSerializerUtil.deserialize(
086 masterAddressString);
087
088 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
089 methodHandler, address);
090
091 try {
092 FutureClusterResponses futureClusterResponses =
093 _clusterExecutor.execute(clusterRequest);
094
095 return new RemoteFuture<T>(address, futureClusterResponses);
096 }
097 catch (Exception e) {
098 throw new SystemException(
099 "Unable to execute on master " + address.getDescription(), e);
100 }
101 }
102
103 @Override
104 public void initialize() {
105 if (!_clusterExecutor.isEnabled()) {
106 return;
107 }
108
109 try {
110 _localClusterNodeAddress = AddressSerializerUtil.serialize(
111 _clusterExecutor.getLocalClusterNodeAddress());
112
113 _clusterEventListener =
114 new ClusterMasterTokenClusterEventListener();
115
116 _clusterExecutor.addClusterEventListener(_clusterEventListener);
117
118 String masterAddressString = getMasterAddressString();
119
120 _enabled = true;
121
122 notifyMasterTokenTransitionListeners(
123 _localClusterNodeAddress.equals(masterAddressString));
124 }
125 catch (Exception e) {
126 throw new RuntimeException(
127 "Unable to initialize cluster master executor", e);
128 }
129 }
130
131 @Override
132 public boolean isEnabled() {
133 return _enabled;
134 }
135
136 @Override
137 public boolean isMaster() {
138 return _master;
139 }
140
141 @Override
142 public void registerClusterMasterTokenTransitionListener(
143 ClusterMasterTokenTransitionListener
144 clusterMasterTokenTransitionListener) {
145
146 _clusterMasterTokenTransitionListeners.add(
147 clusterMasterTokenTransitionListener);
148 }
149
150 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
151 _clusterExecutor = clusterExecutor;
152 }
153
154 public void setClusterMasterTokenTransitionListeners(
155 Set<ClusterMasterTokenTransitionListener>
156 clusterMasterTokenTransitionListeners) {
157
158 _clusterMasterTokenTransitionListeners.addAll(
159 clusterMasterTokenTransitionListeners);
160 }
161
162 @Override
163 public void unregisterClusterMasterTokenTransitionListener(
164 ClusterMasterTokenTransitionListener
165 clusterMasterTokenTransitionListener) {
166
167 _clusterMasterTokenTransitionListeners.remove(
168 clusterMasterTokenTransitionListener);
169 }
170
171 protected String getMasterAddressString() {
172 String owner = null;
173
174 while (true) {
175 try {
176 Lock lock = null;
177
178 if (owner == null) {
179 lock = LockLocalServiceUtil.lock(
180 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
181 _localClusterNodeAddress);
182 }
183 else {
184 lock = LockLocalServiceUtil.lock(
185 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
186 _localClusterNodeAddress);
187 }
188
189 owner = lock.getOwner();
190
191 Address address = AddressSerializerUtil.deserialize(owner);
192
193 if (_clusterExecutor.isClusterNodeAlive(address)) {
194 break;
195 }
196 }
197 catch (Exception e) {
198 if (_log.isWarnEnabled()) {
199 _log.warn(
200 "Unable to acquire memory scheduler cluster lock", e);
201 }
202 }
203
204 if (_log.isInfoEnabled()) {
205 if (Validator.isNotNull(owner)) {
206 _log.info("Lock currently held by " + owner);
207 }
208
209 _log.info(
210 "Reattempting to acquire memory scheduler cluster lock");
211 }
212 }
213
214 boolean master = _localClusterNodeAddress.equals(owner);
215
216 if (master == _master) {
217 return owner;
218 }
219
220 _master = master;
221
222 if (_enabled) {
223 notifyMasterTokenTransitionListeners(master);
224 }
225
226 return owner;
227 }
228
229 protected void notifyMasterTokenTransitionListeners(
230 boolean masterTokenAcquired) {
231
232 for (ClusterMasterTokenTransitionListener
233 clusterMasterTokenTransitionListener :
234 _clusterMasterTokenTransitionListeners) {
235
236 if (masterTokenAcquired) {
237 clusterMasterTokenTransitionListener.masterTokenAcquired();
238 }
239 else {
240 clusterMasterTokenTransitionListener.masterTokenReleased();
241 }
242 }
243 }
244
245 private static final String _LOCK_CLASS_NAME =
246 ClusterMasterExecutorImpl.class.getName();
247
248 private static Log _log = LogFactoryUtil.getLog(
249 ClusterMasterExecutorImpl.class);
250
251 private static volatile boolean _master;
252
253 private ClusterEventListener _clusterEventListener;
254 private ClusterExecutor _clusterExecutor;
255 private Set<ClusterMasterTokenTransitionListener>
256 _clusterMasterTokenTransitionListeners =
257 new HashSet<ClusterMasterTokenTransitionListener>();
258 private volatile boolean _enabled;
259 private volatile String _localClusterNodeAddress;
260
261 private class ClusterMasterTokenClusterEventListener
262 implements ClusterEventListener {
263
264 @Override
265 public void processClusterEvent(ClusterEvent clusterEvent) {
266 try {
267 getMasterAddressString();
268 }
269 catch (Exception e) {
270 _log.error("Unable to update the cluster master lock", e);
271 }
272 }
273 }
274
275 private class LocalFuture<T> implements Future<T> {
276
277 public LocalFuture(T result) {
278 _result = result;
279 }
280
281 @Override
282 public boolean cancel(boolean mayInterruptIfRunning) {
283 return false;
284 }
285
286 @Override
287 public boolean isCancelled() {
288 return false;
289 }
290
291 @Override
292 public boolean isDone() {
293 return true;
294 }
295
296 @Override
297 public T get() {
298 return _result;
299 }
300
301 @Override
302 public T get(long timeout, TimeUnit unit) {
303 return _result;
304 }
305
306 private final T _result;
307
308 }
309
310 private class RemoteFuture<T> implements Future<T> {
311
312 public RemoteFuture(
313 Address address, FutureClusterResponses futureClusterResponses) {
314
315 _address = address;
316 _futureClusterResponses = futureClusterResponses;
317 }
318
319 @Override
320 public boolean cancel(boolean mayInterruptIfRunning) {
321 return _futureClusterResponses.cancel(mayInterruptIfRunning);
322 }
323
324 @Override
325 public boolean isCancelled() {
326 return _futureClusterResponses.isCancelled();
327 }
328
329 @Override
330 public boolean isDone() {
331 return _futureClusterResponses.isDone();
332 }
333
334 @Override
335 public T get() throws InterruptedException {
336 ClusterNodeResponses clusterNodeResponses =
337 _futureClusterResponses.get();
338
339 return (T)clusterNodeResponses.getClusterResponse(_address);
340 }
341
342 @Override
343 public T get(long timeout, TimeUnit unit)
344 throws InterruptedException, TimeoutException {
345
346 ClusterNodeResponses clusterNodeResponses =
347 _futureClusterResponses.get(timeout, unit);
348
349 return (T)clusterNodeResponses.getClusterResponse(_address);
350 }
351
352 private final Address _address;
353 private final FutureClusterResponses _futureClusterResponses;
354
355 }
356
357 }