001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
import com.liferay.portal.kernel.cluster.Address;
import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
import com.liferay.portal.kernel.cluster.ClusterEvent;
import com.liferay.portal.kernel.cluster.ClusterEventListener;
import com.liferay.portal.kernel.cluster.ClusterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
import com.liferay.portal.kernel.cluster.ClusterRequest;
import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
import com.liferay.portal.kernel.concurrent.NoticeableFuture;
import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
import com.liferay.portal.kernel.exception.SystemException;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.MethodHandler;
import com.liferay.portal.kernel.util.Validator;
import com.liferay.portal.model.Lock;
import com.liferay.portal.service.LockLocalServiceUtil;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
040    
041    /**
042     * @author Michael C. Han
043     */
044    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
045    
046            public void destroy() {
047                    if (!_enabled) {
048                            return;
049                    }
050    
051                    try {
052                            _clusterExecutor.removeClusterEventListener(_clusterEventListener);
053    
054                            LockLocalServiceUtil.unlock(
055                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
056                    }
057                    catch (SystemException se) {
058                            if (_log.isWarnEnabled()) {
059                                    _log.warn("Unable to destroy the cluster master executor", se);
060                            }
061                    }
062            }
063    
064            @Override
065            public <T> NoticeableFuture<T> executeOnMaster(
066                    MethodHandler methodHandler) {
067    
068                    if (!_enabled) {
069                            if (_log.isWarnEnabled()) {
070                                    _log.warn(
071                                            "Executing on the local node because the cluster master " +
072                                                    "executor is disabled");
073                            }
074    
075                            DefaultNoticeableFuture<T> defaultNoticeableFuture =
076                                    new DefaultNoticeableFuture<T>();
077    
078                            try {
079                                    defaultNoticeableFuture.set((T)methodHandler.invoke());
080    
081                                    return defaultNoticeableFuture;
082                            }
083                            catch (Exception e) {
084                                    throw new SystemException(e);
085                            }
086                    }
087    
088                    String masterAddressString = getMasterAddressString();
089    
090                    final Address address = AddressSerializerUtil.deserialize(
091                            masterAddressString);
092    
093                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
094                            methodHandler, address);
095    
096                    try {
097                            return new NoticeableFutureConverter<T, ClusterNodeResponses>(
098                                    _clusterExecutor.execute(clusterRequest)) {
099    
100                                            @Override
101                                            protected T convert(
102                                                            ClusterNodeResponses clusterNodeResponses)
103                                                    throws Exception {
104    
105                                                    ClusterNodeResponse clusterNodeResponse =
106                                                            clusterNodeResponses.getClusterResponse(address);
107    
108                                                    return (T)clusterNodeResponse.getResult();
109                                            }
110    
111                                    };
112                    }
113                    catch (Exception e) {
114                            throw new SystemException(
115                                    "Unable to execute on master " + address.getDescription(), e);
116                    }
117            }
118    
119            @Override
120            public void initialize() {
121                    if (!_clusterExecutor.isEnabled()) {
122                            return;
123                    }
124    
125                    _localClusterNodeAddress = AddressSerializerUtil.serialize(
126                            _clusterExecutor.getLocalClusterNodeAddress());
127    
128                    _clusterEventListener = new ClusterMasterTokenClusterEventListener();
129    
130                    _clusterExecutor.addClusterEventListener(_clusterEventListener);
131    
132                    String masterAddressString = getMasterAddressString();
133    
134                    _enabled = true;
135    
136                    notifyMasterTokenTransitionListeners(
137                            _localClusterNodeAddress.equals(masterAddressString));
138            }
139    
140            @Override
141            public boolean isEnabled() {
142                    return _enabled;
143            }
144    
145            @Override
146            public boolean isMaster() {
147                    if (isEnabled()) {
148                            return _master;
149                    }
150    
151                    return true;
152            }
153    
154            @Override
155            public void registerClusterMasterTokenTransitionListener(
156                    ClusterMasterTokenTransitionListener
157                            clusterMasterTokenTransitionListener) {
158    
159                    _clusterMasterTokenTransitionListeners.add(
160                            clusterMasterTokenTransitionListener);
161            }
162    
163            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
164                    _clusterExecutor = clusterExecutor;
165            }
166    
167            public void setClusterMasterTokenTransitionListeners(
168                    Set<ClusterMasterTokenTransitionListener>
169                            clusterMasterTokenTransitionListeners) {
170    
171                    _clusterMasterTokenTransitionListeners.addAll(
172                            clusterMasterTokenTransitionListeners);
173            }
174    
175            @Override
176            public void unregisterClusterMasterTokenTransitionListener(
177                    ClusterMasterTokenTransitionListener
178                            clusterMasterTokenTransitionListener) {
179    
180                    _clusterMasterTokenTransitionListeners.remove(
181                            clusterMasterTokenTransitionListener);
182            }
183    
184            protected String getMasterAddressString() {
185                    String owner = null;
186    
187                    while (true) {
188                            try {
189                                    Lock lock = null;
190    
191                                    if (owner == null) {
192                                            lock = LockLocalServiceUtil.lock(
193                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
194                                                    _localClusterNodeAddress);
195                                    }
196                                    else {
197                                            lock = LockLocalServiceUtil.lock(
198                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
199                                                    _localClusterNodeAddress);
200                                    }
201    
202                                    owner = lock.getOwner();
203    
204                                    Address address = AddressSerializerUtil.deserialize(owner);
205    
206                                    if (_clusterExecutor.isClusterNodeAlive(address)) {
207                                            break;
208                                    }
209                            }
210                            catch (Exception e) {
211                                    if (_log.isWarnEnabled()) {
212                                            _log.warn("Unable to acquire the cluster master lock", e);
213                                    }
214                            }
215    
216                            if (_log.isInfoEnabled()) {
217                                    if (Validator.isNotNull(owner)) {
218                                            _log.info("Lock currently held by " + owner);
219                                    }
220    
221                                    _log.info("Reattempting to acquire the cluster master lock");
222                            }
223                    }
224    
225                    boolean master = _localClusterNodeAddress.equals(owner);
226    
227                    if (master == _master) {
228                            return owner;
229                    }
230    
231                    _master = master;
232    
233                    if (_enabled) {
234                            notifyMasterTokenTransitionListeners(master);
235                    }
236    
237                    return owner;
238            }
239    
240            protected void notifyMasterTokenTransitionListeners(
241                    boolean masterTokenAcquired) {
242    
243                    for (ClusterMasterTokenTransitionListener
244                                    clusterMasterTokenTransitionListener :
245                                            _clusterMasterTokenTransitionListeners) {
246    
247                            if (masterTokenAcquired) {
248                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
249                            }
250                            else {
251                                    clusterMasterTokenTransitionListener.masterTokenReleased();
252                            }
253                    }
254            }
255    
256            private static final String _LOCK_CLASS_NAME =
257                    ClusterMasterExecutorImpl.class.getName();
258    
259            private static final Log _log = LogFactoryUtil.getLog(
260                    ClusterMasterExecutorImpl.class);
261    
262            private static volatile boolean _master;
263    
264            private ClusterEventListener _clusterEventListener;
265            private ClusterExecutor _clusterExecutor;
266            private final Set<ClusterMasterTokenTransitionListener>
267                    _clusterMasterTokenTransitionListeners =
268                            new HashSet<ClusterMasterTokenTransitionListener>();
269            private volatile boolean _enabled;
270            private volatile String _localClusterNodeAddress;
271    
272            private class ClusterMasterTokenClusterEventListener
273                    implements ClusterEventListener {
274    
275                    @Override
276                    public void processClusterEvent(ClusterEvent clusterEvent) {
277                            getMasterAddressString();
278                    }
279    
280            }
281    
282    }