001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.util.MethodHandler;
031    import com.liferay.portal.kernel.util.Validator;
032    import com.liferay.portal.model.Lock;
033    import com.liferay.portal.service.LockLocalServiceUtil;
034    
035    import java.util.HashSet;
036    import java.util.Set;
037    import java.util.concurrent.Future;
038    import java.util.concurrent.TimeUnit;
039    import java.util.concurrent.TimeoutException;
040    
041    /**
042     * @author Michael C. Han
043     */
044    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
045    
046            public void destroy() {
047                    if (!_enabled) {
048                            return;
049                    }
050    
051                    try {
052                            _clusterExecutor.removeClusterEventListener(_clusterEventListener);
053    
054                            LockLocalServiceUtil.unlock(
055                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
056                    }
057                    catch (SystemException se) {
058                            if (_log.isWarnEnabled()) {
059                                    _log.warn("Unable to destroy the cluster master executor", se);
060                            }
061                    }
062            }
063    
064            @Override
065            public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
066                    throws SystemException {
067    
068                    if (!_enabled) {
069                            if (_log.isWarnEnabled()) {
070                                    _log.warn(
071                                            "Executing on the local node because the cluster master " +
072                                                    "executor is disabled");
073                            }
074    
075                            try {
076                                    return new LocalFuture<T>((T)methodHandler.invoke(true));
077                            }
078                            catch (Exception e) {
079                                    throw new SystemException(e);
080                            }
081                    }
082    
083                    String masterAddressString = getMasterAddressString();
084    
085                    Address address = AddressSerializerUtil.deserialize(
086                            masterAddressString);
087    
088                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
089                            methodHandler, address);
090    
091                    try {
092                            FutureClusterResponses futureClusterResponses =
093                                    _clusterExecutor.execute(clusterRequest);
094    
095                            return new RemoteFuture<T>(address, futureClusterResponses);
096                    }
097                    catch (Exception e) {
098                            throw new SystemException(
099                                    "Unable to execute on master " + address.getDescription(), e);
100                    }
101            }
102    
103            @Override
104            public void initialize() {
105                    if (!_clusterExecutor.isEnabled()) {
106                            return;
107                    }
108    
109                    try {
110                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
111                                    _clusterExecutor.getLocalClusterNodeAddress());
112    
113                            _clusterEventListener =
114                                    new ClusterMasterTokenClusterEventListener();
115    
116                            _clusterExecutor.addClusterEventListener(_clusterEventListener);
117    
118                            String masterAddressString = getMasterAddressString();
119    
120                            _enabled = true;
121    
122                            notifyMasterTokenTransitionListeners(
123                                    _localClusterNodeAddress.equals(masterAddressString));
124                    }
125                    catch (Exception e) {
126                            throw new RuntimeException(
127                                    "Unable to initialize cluster master executor", e);
128                    }
129            }
130    
131            @Override
132            public boolean isEnabled() {
133                    return _enabled;
134            }
135    
136            @Override
137            public boolean isMaster() {
138                    if (isEnabled()) {
139                            return _master;
140                    }
141    
142                    return true;
143            }
144    
145            @Override
146            public void registerClusterMasterTokenTransitionListener(
147                    ClusterMasterTokenTransitionListener
148                            clusterMasterTokenTransitionListener) {
149    
150                    _clusterMasterTokenTransitionListeners.add(
151                            clusterMasterTokenTransitionListener);
152            }
153    
154            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
155                    _clusterExecutor = clusterExecutor;
156            }
157    
158            public void setClusterMasterTokenTransitionListeners(
159                    Set<ClusterMasterTokenTransitionListener>
160                            clusterMasterTokenTransitionListeners) {
161    
162                    _clusterMasterTokenTransitionListeners.addAll(
163                            clusterMasterTokenTransitionListeners);
164            }
165    
166            @Override
167            public void unregisterClusterMasterTokenTransitionListener(
168                    ClusterMasterTokenTransitionListener
169                            clusterMasterTokenTransitionListener) {
170    
171                    _clusterMasterTokenTransitionListeners.remove(
172                            clusterMasterTokenTransitionListener);
173            }
174    
175            protected String getMasterAddressString() {
176                    String owner = null;
177    
178                    while (true) {
179                            try {
180                                    Lock lock = null;
181    
182                                    if (owner == null) {
183                                            lock = LockLocalServiceUtil.lock(
184                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
185                                                    _localClusterNodeAddress);
186                                    }
187                                    else {
188                                            lock = LockLocalServiceUtil.lock(
189                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
190                                                    _localClusterNodeAddress);
191                                    }
192    
193                                    owner = lock.getOwner();
194    
195                                    Address address = AddressSerializerUtil.deserialize(owner);
196    
197                                    if (_clusterExecutor.isClusterNodeAlive(address)) {
198                                            break;
199                                    }
200                            }
201                            catch (Exception e) {
202                                    if (_log.isWarnEnabled()) {
203                                            _log.warn(
204                                                    "Unable to acquire memory scheduler cluster lock", e);
205                                    }
206                            }
207    
208                            if (_log.isInfoEnabled()) {
209                                    if (Validator.isNotNull(owner)) {
210                                            _log.info("Lock currently held by " + owner);
211                                    }
212    
213                                    _log.info(
214                                            "Reattempting to acquire memory scheduler cluster lock");
215                            }
216                    }
217    
218                    boolean master = _localClusterNodeAddress.equals(owner);
219    
220                    if (master == _master) {
221                            return owner;
222                    }
223    
224                    _master = master;
225    
226                    if (_enabled) {
227                            notifyMasterTokenTransitionListeners(master);
228                    }
229    
230                    return owner;
231            }
232    
233            protected void notifyMasterTokenTransitionListeners(
234                    boolean masterTokenAcquired) {
235    
236                    for (ClusterMasterTokenTransitionListener
237                                    clusterMasterTokenTransitionListener :
238                                            _clusterMasterTokenTransitionListeners) {
239    
240                            if (masterTokenAcquired) {
241                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
242                            }
243                            else {
244                                    clusterMasterTokenTransitionListener.masterTokenReleased();
245                            }
246                    }
247            }
248    
249            private static final String _LOCK_CLASS_NAME =
250                    ClusterMasterExecutorImpl.class.getName();
251    
252            private static Log _log = LogFactoryUtil.getLog(
253                    ClusterMasterExecutorImpl.class);
254    
255            private static volatile boolean _master;
256    
257            private ClusterEventListener _clusterEventListener;
258            private ClusterExecutor _clusterExecutor;
259            private Set<ClusterMasterTokenTransitionListener>
260                    _clusterMasterTokenTransitionListeners =
261                    new HashSet<ClusterMasterTokenTransitionListener>();
262            private volatile boolean _enabled;
263            private volatile String _localClusterNodeAddress;
264    
265            private class ClusterMasterTokenClusterEventListener
266                    implements ClusterEventListener {
267    
268                    @Override
269                    public void processClusterEvent(ClusterEvent clusterEvent) {
270                            try {
271                                    getMasterAddressString();
272                            }
273                            catch (Exception e) {
274                                    _log.error("Unable to update the cluster master lock", e);
275                            }
276                    }
277            }
278    
279            private class LocalFuture<T> implements Future<T> {
280    
281                    public LocalFuture(T result) {
282                            _result = result;
283                    }
284    
285                    @Override
286                    public boolean cancel(boolean mayInterruptIfRunning) {
287                            return false;
288                    }
289    
290                    @Override
291                    public boolean isCancelled() {
292                            return false;
293                    }
294    
295                    @Override
296                    public boolean isDone() {
297                            return true;
298                    }
299    
300                    @Override
301                    public T get() {
302                            return _result;
303                    }
304    
305                    @Override
306                    public T get(long timeout, TimeUnit unit) {
307                            return _result;
308                    }
309    
310                    private final T _result;
311    
312            }
313    
314            private class RemoteFuture<T> implements Future<T> {
315    
316                    public RemoteFuture(
317                            Address address, FutureClusterResponses futureClusterResponses) {
318    
319                            _address = address;
320                            _futureClusterResponses = futureClusterResponses;
321                    }
322    
323                    @Override
324                    public boolean cancel(boolean mayInterruptIfRunning) {
325                            return _futureClusterResponses.cancel(mayInterruptIfRunning);
326                    }
327    
328                    @Override
329                    public boolean isCancelled() {
330                            return _futureClusterResponses.isCancelled();
331                    }
332    
333                    @Override
334                    public boolean isDone() {
335                            return _futureClusterResponses.isDone();
336                    }
337    
338                    @Override
339                    public T get() throws InterruptedException {
340                            ClusterNodeResponses clusterNodeResponses =
341                                    _futureClusterResponses.get();
342    
343                            return (T)clusterNodeResponses.getClusterResponse(_address);
344                    }
345    
346                    @Override
347                    public T get(long timeout, TimeUnit unit)
348                            throws InterruptedException, TimeoutException {
349    
350                            ClusterNodeResponses clusterNodeResponses =
351                                    _futureClusterResponses.get(timeout, unit);
352    
353                            return (T)clusterNodeResponses.getClusterResponse(_address);
354                    }
355    
356                    private final Address _address;
357                    private final FutureClusterResponses _futureClusterResponses;
358    
359            }
360    
361    }