001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.util.MethodHandler;
031    import com.liferay.portal.model.Lock;
032    import com.liferay.portal.service.LockLocalServiceUtil;
033    
034    import java.util.HashSet;
035    import java.util.Set;
036    import java.util.concurrent.Future;
037    import java.util.concurrent.TimeUnit;
038    import java.util.concurrent.TimeoutException;
039    
040    /**
041     * @author Michael C. Han
042     */
043    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
044    
045            public void destroy() {
046                    if (!_enabled) {
047                            return;
048                    }
049    
050                    try {
051                            _clusterExecutor.removeClusterEventListener(_clusterEventListener);
052    
053                            LockLocalServiceUtil.unlock(
054                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
055                    }
056                    catch (SystemException se) {
057                            if (_log.isWarnEnabled()) {
058                                    _log.warn("Unable to destroy the cluster master executor", se);
059                            }
060                    }
061            }
062    
063            @Override
064            public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
065                    throws SystemException {
066    
067                    if (!_enabled) {
068                            if (_log.isWarnEnabled()) {
069                                    _log.warn(
070                                            "Executing on the local node because the cluster master " +
071                                                    "executor is disabled");
072                            }
073    
074                            try {
075                                    return new LocalFuture<T>((T)methodHandler.invoke(true));
076                            }
077                            catch (Exception e) {
078                                    throw new SystemException(e);
079                            }
080                    }
081    
082                    String masterAddressString = getMasterAddressString();
083    
084                    Address address = AddressSerializerUtil.deserialize(
085                            masterAddressString);
086    
087                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
088                            methodHandler, address);
089    
090                    try {
091                            FutureClusterResponses futureClusterResponses =
092                                    _clusterExecutor.execute(clusterRequest);
093    
094                            return new RemoteFuture<T>(address, futureClusterResponses);
095                    }
096                    catch (Exception e) {
097                            throw new SystemException(
098                                    "Unable to execute on master " + address.getDescription(), e);
099                    }
100            }
101    
102            @Override
103            public void initialize() {
104                    if (!_clusterExecutor.isEnabled()) {
105                            return;
106                    }
107    
108                    try {
109                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
110                                    _clusterExecutor.getLocalClusterNodeAddress());
111    
112                            _clusterEventListener =
113                                    new ClusterMasterTokenClusterEventListener();
114    
115                            _clusterExecutor.addClusterEventListener(_clusterEventListener);
116    
117                            String masterAddressString = getMasterAddressString();
118    
119                            _enabled = true;
120    
121                            notifyMasterTokenTransitionListeners(
122                                    _localClusterNodeAddress.equals(masterAddressString));
123                    }
124                    catch (Exception e) {
125                            throw new RuntimeException(
126                                    "Unable to initialize cluster master executor", e);
127                    }
128            }
129    
130            @Override
131            public boolean isMaster() {
132                    return _master;
133            }
134    
135            @Override
136            public void registerClusterMasterTokenTransitionListener(
137                    ClusterMasterTokenTransitionListener
138                            clusterMasterTokenTransitionListener) {
139    
140                    _clusterMasterTokenTransitionListeners.add(
141                            clusterMasterTokenTransitionListener);
142            }
143    
144            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
145                    _clusterExecutor = clusterExecutor;
146            }
147    
148            public void setClusterMasterTokenTransitionListeners(
149                    Set<ClusterMasterTokenTransitionListener>
150                            clusterMasterTokenTransitionListeners) {
151    
152                    _clusterMasterTokenTransitionListeners.addAll(
153                            clusterMasterTokenTransitionListeners);
154            }
155    
156            @Override
157            public void unregisterClusterMasterTokenTransitionListener(
158                    ClusterMasterTokenTransitionListener
159                            clusterMasterTokenTransitionListener) {
160    
161                    _clusterMasterTokenTransitionListeners.remove(
162                            clusterMasterTokenTransitionListener);
163            }
164    
165            protected String getMasterAddressString() {
166                    String owner = null;
167    
168                    while (true) {
169                            try {
170                                    Lock lock = null;
171    
172                                    if (owner == null) {
173                                            lock = LockLocalServiceUtil.lock(
174                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
175                                                    _localClusterNodeAddress);
176                                    }
177                                    else {
178                                            lock = LockLocalServiceUtil.lock(
179                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
180                                                    _localClusterNodeAddress);
181                                    }
182    
183                                    owner = lock.getOwner();
184    
185                                    Address address = AddressSerializerUtil.deserialize(owner);
186    
187                                    if (_clusterExecutor.isClusterNodeAlive(address)) {
188                                            break;
189                                    }
190                            }
191                            catch (Exception e) {
192                                    if (_log.isWarnEnabled()) {
193                                            _log.warn(
194                                                    "Unable to obtain the cluster master token. Trying " +
195                                                            "again.",
196                                                    e);
197                                    }
198                            }
199                    }
200    
201                    boolean master = _localClusterNodeAddress.equals(owner);
202    
203                    if (master == _master) {
204                            return owner;
205                    }
206    
207                    _master = master;
208    
209                    if (_enabled) {
210                            notifyMasterTokenTransitionListeners(master);
211                    }
212    
213                    return owner;
214            }
215    
216            protected void notifyMasterTokenTransitionListeners(
217                    boolean masterTokenAcquired) {
218    
219                    for (ClusterMasterTokenTransitionListener
220                                    clusterMasterTokenTransitionListener :
221                                            _clusterMasterTokenTransitionListeners) {
222    
223                            if (masterTokenAcquired) {
224                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
225                            }
226                            else {
227                                    clusterMasterTokenTransitionListener.masterTokenReleased();
228                            }
229                    }
230            }
231    
232            private static final String _LOCK_CLASS_NAME =
233                    ClusterMasterExecutorImpl.class.getName();
234    
235            private static Log _log = LogFactoryUtil.getLog(
236                    ClusterMasterExecutorImpl.class);
237    
238            private static volatile boolean _master;
239    
240            private ClusterEventListener _clusterEventListener;
241            private ClusterExecutor _clusterExecutor;
242            private Set<ClusterMasterTokenTransitionListener>
243                    _clusterMasterTokenTransitionListeners =
244                    new HashSet<ClusterMasterTokenTransitionListener>();
245            private volatile boolean _enabled;
246            private volatile String _localClusterNodeAddress;
247    
248            private class ClusterMasterTokenClusterEventListener
249                    implements ClusterEventListener {
250    
251                    @Override
252                    public void processClusterEvent(ClusterEvent clusterEvent) {
253                            try {
254                                    getMasterAddressString();
255                            }
256                            catch (Exception e) {
257                                    _log.error("Unable to update the cluster master lock", e);
258                            }
259                    }
260            }
261    
262            private class LocalFuture<T> implements Future<T> {
263    
264                    public LocalFuture(T result) {
265                            _result = result;
266                    }
267    
268                    @Override
269                    public boolean cancel(boolean mayInterruptIfRunning) {
270                            return false;
271                    }
272    
273                    @Override
274                    public boolean isCancelled() {
275                            return false;
276                    }
277    
278                    @Override
279                    public boolean isDone() {
280                            return true;
281                    }
282    
283                    @Override
284                    public T get() {
285                            return _result;
286                    }
287    
288                    @Override
289                    public T get(long timeout, TimeUnit unit) {
290                            return _result;
291                    }
292    
293                    private final T _result;
294    
295            }
296    
297            private class RemoteFuture<T> implements Future<T> {
298    
299                    public RemoteFuture(
300                            Address address, FutureClusterResponses futureClusterResponses) {
301    
302                            _address = address;
303                            _futureClusterResponses = futureClusterResponses;
304                    }
305    
306                    @Override
307                    public boolean cancel(boolean mayInterruptIfRunning) {
308                            return _futureClusterResponses.cancel(mayInterruptIfRunning);
309                    }
310    
311                    @Override
312                    public boolean isCancelled() {
313                            return _futureClusterResponses.isCancelled();
314                    }
315    
316                    @Override
317                    public boolean isDone() {
318                            return _futureClusterResponses.isDone();
319                    }
320    
321                    @Override
322                    public T get() throws InterruptedException {
323                            ClusterNodeResponses clusterNodeResponses =
324                                    _futureClusterResponses.get();
325    
326                            return (T)clusterNodeResponses.getClusterResponse(_address);
327                    }
328    
329                    @Override
330                    public T get(long timeout, TimeUnit unit)
331                            throws InterruptedException, TimeoutException {
332    
333                            ClusterNodeResponses clusterNodeResponses =
334                                    _futureClusterResponses.get(timeout, unit);
335    
336                            return (T)clusterNodeResponses.getClusterResponse(_address);
337                    }
338    
339                    private final Address _address;
340                    private final FutureClusterResponses _futureClusterResponses;
341    
342            }
343    
344    }