001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030 import com.liferay.portal.kernel.util.MethodHandler;
031 import com.liferay.portal.model.Lock;
032 import com.liferay.portal.service.LockLocalServiceUtil;
033
034 import java.util.HashSet;
035 import java.util.Set;
036 import java.util.concurrent.Future;
037 import java.util.concurrent.TimeUnit;
038 import java.util.concurrent.TimeoutException;
039
040
043 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
044
045 public void destroy() {
046 if (!_enabled) {
047 return;
048 }
049
050 try {
051 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
052
053 LockLocalServiceUtil.unlock(
054 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
055 }
056 catch (SystemException se) {
057 if (_log.isWarnEnabled()) {
058 _log.warn("Unable to destroy the cluster master executor", se);
059 }
060 }
061 }
062
063 @Override
064 public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
065 throws SystemException {
066
067 if (!_enabled) {
068 if (_log.isWarnEnabled()) {
069 _log.warn(
070 "Executing on the local node because the cluster master " +
071 "executor is disabled");
072 }
073
074 try {
075 return new LocalFuture<T>((T)methodHandler.invoke(true));
076 }
077 catch (Exception e) {
078 throw new SystemException(e);
079 }
080 }
081
082 String masterAddressString = getMasterAddressString();
083
084 Address address = AddressSerializerUtil.deserialize(
085 masterAddressString);
086
087 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
088 methodHandler, address);
089
090 try {
091 FutureClusterResponses futureClusterResponses =
092 _clusterExecutor.execute(clusterRequest);
093
094 return new RemoteFuture<T>(address, futureClusterResponses);
095 }
096 catch (Exception e) {
097 throw new SystemException(
098 "Unable to execute on master " + address.getDescription(), e);
099 }
100 }
101
102 @Override
103 public void initialize() {
104 if (!_clusterExecutor.isEnabled()) {
105 return;
106 }
107
108 try {
109 _localClusterNodeAddress = AddressSerializerUtil.serialize(
110 _clusterExecutor.getLocalClusterNodeAddress());
111
112 _clusterEventListener =
113 new ClusterMasterTokenClusterEventListener();
114
115 _clusterExecutor.addClusterEventListener(_clusterEventListener);
116
117 String masterAddressString = getMasterAddressString();
118
119 _enabled = true;
120
121 notifyMasterTokenTransitionListeners(
122 _localClusterNodeAddress.equals(masterAddressString));
123 }
124 catch (Exception e) {
125 throw new RuntimeException(
126 "Unable to initialize cluster master executor", e);
127 }
128 }
129
130 @Override
131 public boolean isMaster() {
132 return _master;
133 }
134
135 @Override
136 public void registerClusterMasterTokenTransitionListener(
137 ClusterMasterTokenTransitionListener
138 clusterMasterTokenTransitionListener) {
139
140 _clusterMasterTokenTransitionListeners.add(
141 clusterMasterTokenTransitionListener);
142 }
143
144 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
145 _clusterExecutor = clusterExecutor;
146 }
147
148 public void setClusterMasterTokenTransitionListeners(
149 Set<ClusterMasterTokenTransitionListener>
150 clusterMasterTokenTransitionListeners) {
151
152 _clusterMasterTokenTransitionListeners.addAll(
153 clusterMasterTokenTransitionListeners);
154 }
155
156 @Override
157 public void unregisterClusterMasterTokenTransitionListener(
158 ClusterMasterTokenTransitionListener
159 clusterMasterTokenTransitionListener) {
160
161 _clusterMasterTokenTransitionListeners.remove(
162 clusterMasterTokenTransitionListener);
163 }
164
165 protected String getMasterAddressString() {
166 String owner = null;
167
168 while (true) {
169 try {
170 Lock lock = null;
171
172 if (owner == null) {
173 lock = LockLocalServiceUtil.lock(
174 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
175 _localClusterNodeAddress);
176 }
177 else {
178 lock = LockLocalServiceUtil.lock(
179 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
180 _localClusterNodeAddress);
181 }
182
183 owner = lock.getOwner();
184
185 Address address = AddressSerializerUtil.deserialize(owner);
186
187 if (_clusterExecutor.isClusterNodeAlive(address)) {
188 break;
189 }
190 }
191 catch (Exception e) {
192 if (_log.isWarnEnabled()) {
193 _log.warn(
194 "Unable to obtain the cluster master token. Trying " +
195 "again.",
196 e);
197 }
198 }
199 }
200
201 boolean master = _localClusterNodeAddress.equals(owner);
202
203 if (master == _master) {
204 return owner;
205 }
206
207 _master = master;
208
209 if (_enabled) {
210 notifyMasterTokenTransitionListeners(master);
211 }
212
213 return owner;
214 }
215
216 protected void notifyMasterTokenTransitionListeners(
217 boolean masterTokenAcquired) {
218
219 for (ClusterMasterTokenTransitionListener
220 clusterMasterTokenTransitionListener :
221 _clusterMasterTokenTransitionListeners) {
222
223 if (masterTokenAcquired) {
224 clusterMasterTokenTransitionListener.masterTokenAcquired();
225 }
226 else {
227 clusterMasterTokenTransitionListener.masterTokenReleased();
228 }
229 }
230 }
231
232 private static final String _LOCK_CLASS_NAME =
233 ClusterMasterExecutorImpl.class.getName();
234
235 private static Log _log = LogFactoryUtil.getLog(
236 ClusterMasterExecutorImpl.class);
237
238 private static volatile boolean _master;
239
240 private ClusterEventListener _clusterEventListener;
241 private ClusterExecutor _clusterExecutor;
242 private Set<ClusterMasterTokenTransitionListener>
243 _clusterMasterTokenTransitionListeners =
244 new HashSet<ClusterMasterTokenTransitionListener>();
245 private volatile boolean _enabled;
246 private volatile String _localClusterNodeAddress;
247
248 private class ClusterMasterTokenClusterEventListener
249 implements ClusterEventListener {
250
251 @Override
252 public void processClusterEvent(ClusterEvent clusterEvent) {
253 try {
254 getMasterAddressString();
255 }
256 catch (Exception e) {
257 _log.error("Unable to update the cluster master lock", e);
258 }
259 }
260 }
261
262 private class LocalFuture<T> implements Future<T> {
263
264 public LocalFuture(T result) {
265 _result = result;
266 }
267
268 @Override
269 public boolean cancel(boolean mayInterruptIfRunning) {
270 return false;
271 }
272
273 @Override
274 public boolean isCancelled() {
275 return false;
276 }
277
278 @Override
279 public boolean isDone() {
280 return true;
281 }
282
283 @Override
284 public T get() {
285 return _result;
286 }
287
288 @Override
289 public T get(long timeout, TimeUnit unit) {
290 return _result;
291 }
292
293 private final T _result;
294
295 }
296
297 private class RemoteFuture<T> implements Future<T> {
298
299 public RemoteFuture(
300 Address address, FutureClusterResponses futureClusterResponses) {
301
302 _address = address;
303 _futureClusterResponses = futureClusterResponses;
304 }
305
306 @Override
307 public boolean cancel(boolean mayInterruptIfRunning) {
308 return _futureClusterResponses.cancel(mayInterruptIfRunning);
309 }
310
311 @Override
312 public boolean isCancelled() {
313 return _futureClusterResponses.isCancelled();
314 }
315
316 @Override
317 public boolean isDone() {
318 return _futureClusterResponses.isDone();
319 }
320
321 @Override
322 public T get() throws InterruptedException {
323 ClusterNodeResponses clusterNodeResponses =
324 _futureClusterResponses.get();
325
326 return (T)clusterNodeResponses.getClusterResponse(_address);
327 }
328
329 @Override
330 public T get(long timeout, TimeUnit unit)
331 throws InterruptedException, TimeoutException {
332
333 ClusterNodeResponses clusterNodeResponses =
334 _futureClusterResponses.get(timeout, unit);
335
336 return (T)clusterNodeResponses.getClusterResponse(_address);
337 }
338
339 private final Address _address;
340 private final FutureClusterResponses _futureClusterResponses;
341
342 }
343
344 }