001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
import com.liferay.portal.kernel.cluster.Address;
import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
import com.liferay.portal.kernel.cluster.ClusterEvent;
import com.liferay.portal.kernel.cluster.ClusterEventListener;
import com.liferay.portal.kernel.cluster.ClusterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
import com.liferay.portal.kernel.cluster.ClusterRequest;
import com.liferay.portal.kernel.cluster.FutureClusterResponses;
import com.liferay.portal.kernel.exception.SystemException;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.MethodHandler;
import com.liferay.portal.kernel.util.Validator;
import com.liferay.portal.model.Lock;
import com.liferay.portal.service.LockLocalServiceUtil;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
040    
041    /**
042     * @author Michael C. Han
043     */
044    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
045    
046            public void destroy() {
047                    if (!_enabled) {
048                            return;
049                    }
050    
051                    try {
052                            _clusterExecutor.removeClusterEventListener(_clusterEventListener);
053    
054                            LockLocalServiceUtil.unlock(
055                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
056                    }
057                    catch (SystemException se) {
058                            if (_log.isWarnEnabled()) {
059                                    _log.warn("Unable to destroy the cluster master executor", se);
060                            }
061                    }
062            }
063    
064            @Override
065            public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
066                    throws SystemException {
067    
068                    if (!_enabled) {
069                            if (_log.isWarnEnabled()) {
070                                    _log.warn(
071                                            "Executing on the local node because the cluster master " +
072                                                    "executor is disabled");
073                            }
074    
075                            try {
076                                    return new LocalFuture<T>((T)methodHandler.invoke(true));
077                            }
078                            catch (Exception e) {
079                                    throw new SystemException(e);
080                            }
081                    }
082    
083                    String masterAddressString = getMasterAddressString();
084    
085                    Address address = AddressSerializerUtil.deserialize(
086                            masterAddressString);
087    
088                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
089                            methodHandler, address);
090    
091                    try {
092                            FutureClusterResponses futureClusterResponses =
093                                    _clusterExecutor.execute(clusterRequest);
094    
095                            return new RemoteFuture<T>(address, futureClusterResponses);
096                    }
097                    catch (Exception e) {
098                            throw new SystemException(
099                                    "Unable to execute on master " + address.getDescription(), e);
100                    }
101            }
102    
103            @Override
104            public void initialize() {
105                    if (!_clusterExecutor.isEnabled()) {
106                            return;
107                    }
108    
109                    try {
110                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
111                                    _clusterExecutor.getLocalClusterNodeAddress());
112    
113                            _clusterEventListener =
114                                    new ClusterMasterTokenClusterEventListener();
115    
116                            _clusterExecutor.addClusterEventListener(_clusterEventListener);
117    
118                            String masterAddressString = getMasterAddressString();
119    
120                            _enabled = true;
121    
122                            notifyMasterTokenTransitionListeners(
123                                    _localClusterNodeAddress.equals(masterAddressString));
124                    }
125                    catch (Exception e) {
126                            throw new RuntimeException(
127                                    "Unable to initialize cluster master executor", e);
128                    }
129            }
130    
131            @Override
132            public boolean isEnabled() {
133                    return _enabled;
134            }
135    
136            @Override
137            public boolean isMaster() {
138                    return _master;
139            }
140    
141            @Override
142            public void registerClusterMasterTokenTransitionListener(
143                    ClusterMasterTokenTransitionListener
144                            clusterMasterTokenTransitionListener) {
145    
146                    _clusterMasterTokenTransitionListeners.add(
147                            clusterMasterTokenTransitionListener);
148            }
149    
150            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
151                    _clusterExecutor = clusterExecutor;
152            }
153    
154            public void setClusterMasterTokenTransitionListeners(
155                    Set<ClusterMasterTokenTransitionListener>
156                            clusterMasterTokenTransitionListeners) {
157    
158                    _clusterMasterTokenTransitionListeners.addAll(
159                            clusterMasterTokenTransitionListeners);
160            }
161    
162            @Override
163            public void unregisterClusterMasterTokenTransitionListener(
164                    ClusterMasterTokenTransitionListener
165                            clusterMasterTokenTransitionListener) {
166    
167                    _clusterMasterTokenTransitionListeners.remove(
168                            clusterMasterTokenTransitionListener);
169            }
170    
171            protected String getMasterAddressString() {
172                    String owner = null;
173    
174                    while (true) {
175                            try {
176                                    Lock lock = null;
177    
178                                    if (owner == null) {
179                                            lock = LockLocalServiceUtil.lock(
180                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
181                                                    _localClusterNodeAddress);
182                                    }
183                                    else {
184                                            lock = LockLocalServiceUtil.lock(
185                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
186                                                    _localClusterNodeAddress);
187                                    }
188    
189                                    owner = lock.getOwner();
190    
191                                    Address address = AddressSerializerUtil.deserialize(owner);
192    
193                                    if (_clusterExecutor.isClusterNodeAlive(address)) {
194                                            break;
195                                    }
196                            }
197                            catch (Exception e) {
198                                    if (_log.isWarnEnabled()) {
199                                            _log.warn(
200                                                    "Unable to acquire memory scheduler cluster lock", e);
201                                    }
202                            }
203    
204                            if (_log.isInfoEnabled()) {
205                                    if (Validator.isNotNull(owner)) {
206                                            _log.info("Lock currently held by " + owner);
207                                    }
208    
209                                    _log.info(
210                                            "Reattempting to acquire memory scheduler cluster lock");
211                            }
212                    }
213    
214                    boolean master = _localClusterNodeAddress.equals(owner);
215    
216                    if (master == _master) {
217                            return owner;
218                    }
219    
220                    _master = master;
221    
222                    if (_enabled) {
223                            notifyMasterTokenTransitionListeners(master);
224                    }
225    
226                    return owner;
227            }
228    
229            protected void notifyMasterTokenTransitionListeners(
230                    boolean masterTokenAcquired) {
231    
232                    for (ClusterMasterTokenTransitionListener
233                                    clusterMasterTokenTransitionListener :
234                                            _clusterMasterTokenTransitionListeners) {
235    
236                            if (masterTokenAcquired) {
237                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
238                            }
239                            else {
240                                    clusterMasterTokenTransitionListener.masterTokenReleased();
241                            }
242                    }
243            }
244    
245            private static final String _LOCK_CLASS_NAME =
246                    ClusterMasterExecutorImpl.class.getName();
247    
248            private static Log _log = LogFactoryUtil.getLog(
249                    ClusterMasterExecutorImpl.class);
250    
251            private static volatile boolean _master;
252    
253            private ClusterEventListener _clusterEventListener;
254            private ClusterExecutor _clusterExecutor;
255            private Set<ClusterMasterTokenTransitionListener>
256                    _clusterMasterTokenTransitionListeners =
257                    new HashSet<ClusterMasterTokenTransitionListener>();
258            private volatile boolean _enabled;
259            private volatile String _localClusterNodeAddress;
260    
261            private class ClusterMasterTokenClusterEventListener
262                    implements ClusterEventListener {
263    
264                    @Override
265                    public void processClusterEvent(ClusterEvent clusterEvent) {
266                            try {
267                                    getMasterAddressString();
268                            }
269                            catch (Exception e) {
270                                    _log.error("Unable to update the cluster master lock", e);
271                            }
272                    }
273            }
274    
275            private class LocalFuture<T> implements Future<T> {
276    
277                    public LocalFuture(T result) {
278                            _result = result;
279                    }
280    
281                    @Override
282                    public boolean cancel(boolean mayInterruptIfRunning) {
283                            return false;
284                    }
285    
286                    @Override
287                    public boolean isCancelled() {
288                            return false;
289                    }
290    
291                    @Override
292                    public boolean isDone() {
293                            return true;
294                    }
295    
296                    @Override
297                    public T get() {
298                            return _result;
299                    }
300    
301                    @Override
302                    public T get(long timeout, TimeUnit unit) {
303                            return _result;
304                    }
305    
306                    private final T _result;
307    
308            }
309    
310            private class RemoteFuture<T> implements Future<T> {
311    
312                    public RemoteFuture(
313                            Address address, FutureClusterResponses futureClusterResponses) {
314    
315                            _address = address;
316                            _futureClusterResponses = futureClusterResponses;
317                    }
318    
319                    @Override
320                    public boolean cancel(boolean mayInterruptIfRunning) {
321                            return _futureClusterResponses.cancel(mayInterruptIfRunning);
322                    }
323    
324                    @Override
325                    public boolean isCancelled() {
326                            return _futureClusterResponses.isCancelled();
327                    }
328    
329                    @Override
330                    public boolean isDone() {
331                            return _futureClusterResponses.isDone();
332                    }
333    
334                    @Override
335                    public T get() throws InterruptedException {
336                            ClusterNodeResponses clusterNodeResponses =
337                                    _futureClusterResponses.get();
338    
339                            return (T)clusterNodeResponses.getClusterResponse(_address);
340                    }
341    
342                    @Override
343                    public T get(long timeout, TimeUnit unit)
344                            throws InterruptedException, TimeoutException {
345    
346                            ClusterNodeResponses clusterNodeResponses =
347                                    _futureClusterResponses.get(timeout, unit);
348    
349                            return (T)clusterNodeResponses.getClusterResponse(_address);
350                    }
351    
352                    private final Address _address;
353                    private final FutureClusterResponses _futureClusterResponses;
354    
355            }
356    
357    }