001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019    import com.liferay.portal.kernel.cluster.ClusterEvent;
020    import com.liferay.portal.kernel.cluster.ClusterEventListener;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
027    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
028    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.log.Log;
031    import com.liferay.portal.kernel.log.LogFactoryUtil;
032    import com.liferay.portal.kernel.util.MethodHandler;
033    import com.liferay.portal.kernel.util.Validator;
034    import com.liferay.portal.model.Lock;
035    import com.liferay.portal.service.LockLocalServiceUtil;
036    
037    import java.util.HashSet;
038    import java.util.Set;
039    
040    /**
041     * @author Michael C. Han
042     */
043    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
044    
045            public void destroy() {
046                    if (!_enabled) {
047                            return;
048                    }
049    
050                    try {
051                            _clusterExecutor.removeClusterEventListener(_clusterEventListener);
052    
053                            LockLocalServiceUtil.unlock(
054                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
055                    }
056                    catch (SystemException se) {
057                            if (_log.isWarnEnabled()) {
058                                    _log.warn("Unable to destroy the cluster master executor", se);
059                            }
060                    }
061            }
062    
063            @Override
064            public <T> NoticeableFuture<T> executeOnMaster(
065                    MethodHandler methodHandler) {
066    
067                    if (!_enabled) {
068                            if (_log.isWarnEnabled()) {
069                                    _log.warn(
070                                            "Executing on the local node because the cluster master " +
071                                                    "executor is disabled");
072                            }
073    
074                            DefaultNoticeableFuture<T> defaultNoticeableFuture =
075                                    new DefaultNoticeableFuture<T>();
076    
077                            try {
078                                    defaultNoticeableFuture.set((T)methodHandler.invoke());
079    
080                                    return defaultNoticeableFuture;
081                            }
082                            catch (Exception e) {
083                                    throw new SystemException(e);
084                            }
085                    }
086    
087                    String masterAddressString = getMasterAddressString();
088    
089                    final Address address = AddressSerializerUtil.deserialize(
090                            masterAddressString);
091    
092                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
093                            methodHandler, address);
094    
095                    try {
096                            return new NoticeableFutureConverter<T, ClusterNodeResponses>(
097                                    _clusterExecutor.execute(clusterRequest)) {
098    
099                                            @Override
100                                            protected T convert(
101                                                    ClusterNodeResponses clusterNodeResponses) {
102    
103                                                    return (T)clusterNodeResponses.getClusterResponse(
104                                                            address);
105                                            }
106    
107                                    };
108                    }
109                    catch (Exception e) {
110                            throw new SystemException(
111                                    "Unable to execute on master " + address.getDescription(), e);
112                    }
113            }
114    
115            @Override
116            public void initialize() {
117                    if (!_clusterExecutor.isEnabled()) {
118                            return;
119                    }
120    
121                    try {
122                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
123                                    _clusterExecutor.getLocalClusterNodeAddress());
124    
125                            _clusterEventListener =
126                                    new ClusterMasterTokenClusterEventListener();
127    
128                            _clusterExecutor.addClusterEventListener(_clusterEventListener);
129    
130                            String masterAddressString = getMasterAddressString();
131    
132                            _enabled = true;
133    
134                            notifyMasterTokenTransitionListeners(
135                                    _localClusterNodeAddress.equals(masterAddressString));
136                    }
137                    catch (Exception e) {
138                            throw new RuntimeException(
139                                    "Unable to initialize cluster master executor", e);
140                    }
141            }
142    
143            @Override
144            public boolean isEnabled() {
145                    return _enabled;
146            }
147    
148            @Override
149            public boolean isMaster() {
150                    return _master;
151            }
152    
153            @Override
154            public void registerClusterMasterTokenTransitionListener(
155                    ClusterMasterTokenTransitionListener
156                            clusterMasterTokenTransitionListener) {
157    
158                    _clusterMasterTokenTransitionListeners.add(
159                            clusterMasterTokenTransitionListener);
160            }
161    
162            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
163                    _clusterExecutor = clusterExecutor;
164            }
165    
166            public void setClusterMasterTokenTransitionListeners(
167                    Set<ClusterMasterTokenTransitionListener>
168                            clusterMasterTokenTransitionListeners) {
169    
170                    _clusterMasterTokenTransitionListeners.addAll(
171                            clusterMasterTokenTransitionListeners);
172            }
173    
174            @Override
175            public void unregisterClusterMasterTokenTransitionListener(
176                    ClusterMasterTokenTransitionListener
177                            clusterMasterTokenTransitionListener) {
178    
179                    _clusterMasterTokenTransitionListeners.remove(
180                            clusterMasterTokenTransitionListener);
181            }
182    
183            protected String getMasterAddressString() {
184                    String owner = null;
185    
186                    while (true) {
187                            try {
188                                    Lock lock = null;
189    
190                                    if (owner == null) {
191                                            lock = LockLocalServiceUtil.lock(
192                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
193                                                    _localClusterNodeAddress);
194                                    }
195                                    else {
196                                            lock = LockLocalServiceUtil.lock(
197                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
198                                                    _localClusterNodeAddress);
199                                    }
200    
201                                    owner = lock.getOwner();
202    
203                                    Address address = AddressSerializerUtil.deserialize(owner);
204    
205                                    if (_clusterExecutor.isClusterNodeAlive(address)) {
206                                            break;
207                                    }
208                            }
209                            catch (Exception e) {
210                                    if (_log.isWarnEnabled()) {
211                                            _log.warn(
212                                                    "Unable to acquire memory scheduler cluster lock", e);
213                                    }
214                            }
215    
216                            if (_log.isInfoEnabled()) {
217                                    if (Validator.isNotNull(owner)) {
218                                            _log.info("Lock currently held by " + owner);
219                                    }
220    
221                                    _log.info(
222                                            "Reattempting to acquire memory scheduler cluster lock");
223                            }
224                    }
225    
226                    boolean master = _localClusterNodeAddress.equals(owner);
227    
228                    if (master == _master) {
229                            return owner;
230                    }
231    
232                    _master = master;
233    
234                    if (_enabled) {
235                            notifyMasterTokenTransitionListeners(master);
236                    }
237    
238                    return owner;
239            }
240    
241            protected void notifyMasterTokenTransitionListeners(
242                    boolean masterTokenAcquired) {
243    
244                    for (ClusterMasterTokenTransitionListener
245                                    clusterMasterTokenTransitionListener :
246                                            _clusterMasterTokenTransitionListeners) {
247    
248                            if (masterTokenAcquired) {
249                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
250                            }
251                            else {
252                                    clusterMasterTokenTransitionListener.masterTokenReleased();
253                            }
254                    }
255            }
256    
257            private static final String _LOCK_CLASS_NAME =
258                    ClusterMasterExecutorImpl.class.getName();
259    
260            private static final Log _log = LogFactoryUtil.getLog(
261                    ClusterMasterExecutorImpl.class);
262    
263            private static volatile boolean _master;
264    
265            private ClusterEventListener _clusterEventListener;
266            private ClusterExecutor _clusterExecutor;
267            private final Set<ClusterMasterTokenTransitionListener>
268                    _clusterMasterTokenTransitionListeners =
269                            new HashSet<ClusterMasterTokenTransitionListener>();
270            private volatile boolean _enabled;
271            private volatile String _localClusterNodeAddress;
272    
273            private class ClusterMasterTokenClusterEventListener
274                    implements ClusterEventListener {
275    
276                    @Override
277                    public void processClusterEvent(ClusterEvent clusterEvent) {
278                            try {
279                                    getMasterAddressString();
280                            }
281                            catch (Exception e) {
282                                    _log.error("Unable to update the cluster master lock", e);
283                            }
284                    }
285            }
286    
287    }