001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023    import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.log.Log;
030    import com.liferay.portal.kernel.log.LogFactoryUtil;
031    import com.liferay.portal.kernel.util.MethodHandler;
032    
033    import java.util.HashSet;
034    import java.util.Set;
035    import java.util.concurrent.ExecutionException;
036    import java.util.concurrent.Future;
037    import java.util.concurrent.TimeUnit;
038    import java.util.concurrent.TimeoutException;
039    
040    /**
041     * @author Michael C. Han
042     */
043    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
044    
045            public void destroy() {
046                    if (!_enabled) {
047                            return;
048                    }
049    
050                    _clusterExecutor.removeClusterEventListener(_clusterEventListener);
051            }
052    
053            @Override
054            public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
055                    throws SystemException {
056    
057                    if (!_enabled) {
058                            if (_log.isWarnEnabled()) {
059                                    _log.warn(
060                                            "Executing on the local node because the cluster master " +
061                                                    "executor is disabled");
062                            }
063    
064                            try {
065                                    return new LocalFuture<T>((T)methodHandler.invoke(true));
066                            }
067                            catch (Exception e) {
068                                    throw new SystemException(e);
069                            }
070                    }
071    
072                    Address address = getMasterAddress();
073    
074                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
075                            methodHandler, address);
076    
077                    try {
078                            FutureClusterResponses futureClusterResponses =
079                                    _clusterExecutor.execute(clusterRequest);
080    
081                            return new RemoteFuture<T>(address, futureClusterResponses);
082                    }
083                    catch (Exception e) {
084                            throw new SystemException(
085                                    "Unable to execute on master " + address.getDescription(), e);
086                    }
087            }
088    
089            @Override
090            public void initialize() {
091                    if (!_clusterExecutor.isEnabled()) {
092                            return;
093                    }
094    
095                    try {
096                            _localClusterNodeAddress =
097                                    _clusterExecutor.getLocalClusterNodeAddress();
098    
099                            _clusterEventListener =
100                                    new ClusterMasterTokenClusterEventListener();
101    
102                            _clusterExecutor.addClusterEventListener(_clusterEventListener);
103    
104                            Address masterAddress = getMasterAddress();
105    
106                            _enabled = true;
107    
108                            notifyMasterTokenTransitionListeners(
109                                    _localClusterNodeAddress.equals(masterAddress));
110                    }
111                    catch (Exception e) {
112                            throw new RuntimeException(
113                                    "Unable to initialize cluster master executor", e);
114                    }
115            }
116    
117            @Override
118            public boolean isEnabled() {
119                    return _enabled;
120            }
121    
122            @Override
123            public boolean isMaster() {
124                    if (isEnabled()) {
125                            return _master;
126                    }
127    
128                    return true;
129            }
130    
131            @Override
132            public void registerClusterMasterTokenTransitionListener(
133                    ClusterMasterTokenTransitionListener
134                            clusterMasterTokenTransitionListener) {
135    
136                    _clusterMasterTokenTransitionListeners.add(
137                            clusterMasterTokenTransitionListener);
138            }
139    
140            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
141                    _clusterExecutor = clusterExecutor;
142            }
143    
144            public void setClusterMasterTokenTransitionListeners(
145                    Set<ClusterMasterTokenTransitionListener>
146                            clusterMasterTokenTransitionListeners) {
147    
148                    _clusterMasterTokenTransitionListeners.addAll(
149                            clusterMasterTokenTransitionListeners);
150            }
151    
152            @Override
153            public void unregisterClusterMasterTokenTransitionListener(
154                    ClusterMasterTokenTransitionListener
155                            clusterMasterTokenTransitionListener) {
156    
157                    _clusterMasterTokenTransitionListeners.remove(
158                            clusterMasterTokenTransitionListener);
159            }
160    
161            protected Address getMasterAddress() {
162                    Address coordinator = _clusterExecutor.getCoordinatorAddress();
163    
164                    boolean master = _localClusterNodeAddress.equals(coordinator);
165    
166                    if (master != _master) {
167                            _master = master;
168    
169                            if (_enabled) {
170                                    notifyMasterTokenTransitionListeners(master);
171                            }
172                    }
173    
174                    return coordinator;
175            }
176    
177            protected void notifyMasterTokenTransitionListeners(
178                    boolean masterTokenAcquired) {
179    
180                    for (ClusterMasterTokenTransitionListener
181                                    clusterMasterTokenTransitionListener :
182                                            _clusterMasterTokenTransitionListeners) {
183    
184                            if (masterTokenAcquired) {
185                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
186                            }
187                            else {
188                                    clusterMasterTokenTransitionListener.masterTokenReleased();
189                            }
190                    }
191            }
192    
193            private static Log _log = LogFactoryUtil.getLog(
194                    ClusterMasterExecutorImpl.class);
195    
196            private static volatile boolean _master;
197    
198            private ClusterEventListener _clusterEventListener;
199            private ClusterExecutor _clusterExecutor;
200            private Set<ClusterMasterTokenTransitionListener>
201                    _clusterMasterTokenTransitionListeners =
202                    new HashSet<ClusterMasterTokenTransitionListener>();
203            private volatile boolean _enabled;
204            private volatile Address _localClusterNodeAddress;
205    
206            private class ClusterMasterTokenClusterEventListener
207                    implements ClusterEventListener {
208    
209                    @Override
210                    public void processClusterEvent(ClusterEvent clusterEvent) {
211                            ClusterEventType clusterEventType =
212                                    clusterEvent.getClusterEventType();
213    
214                            if (clusterEventType == ClusterEventType.COORDINATOR_UPDATE) {
215                                    getMasterAddress();
216                            }
217                    }
218            }
219    
220            private class LocalFuture<T> implements Future<T> {
221    
222                    public LocalFuture(T result) {
223                            _result = result;
224                    }
225    
226                    @Override
227                    public boolean cancel(boolean mayInterruptIfRunning) {
228                            return false;
229                    }
230    
231                    @Override
232                    public boolean isCancelled() {
233                            return false;
234                    }
235    
236                    @Override
237                    public boolean isDone() {
238                            return true;
239                    }
240    
241                    @Override
242                    public T get() {
243                            return _result;
244                    }
245    
246                    @Override
247                    public T get(long timeout, TimeUnit unit) {
248                            return _result;
249                    }
250    
251                    private final T _result;
252    
253            }
254    
255            private class RemoteFuture<T> implements Future<T> {
256    
257                    public RemoteFuture(
258                            Address address, FutureClusterResponses futureClusterResponses) {
259    
260                            _address = address;
261                            _futureClusterResponses = futureClusterResponses;
262                    }
263    
264                    @Override
265                    public boolean cancel(boolean mayInterruptIfRunning) {
266                            return _futureClusterResponses.cancel(mayInterruptIfRunning);
267                    }
268    
269                    @Override
270                    public boolean isCancelled() {
271                            return _futureClusterResponses.isCancelled();
272                    }
273    
274                    @Override
275                    public boolean isDone() {
276                            return _futureClusterResponses.isDone();
277                    }
278    
279                    @Override
280                    public T get() throws ExecutionException, InterruptedException {
281                            ClusterNodeResponses clusterNodeResponses =
282                                    _futureClusterResponses.get();
283    
284                            ClusterNodeResponse clusterNodeResponse =
285                                    clusterNodeResponses.getClusterResponse(_address);
286    
287                            try {
288                                    return (T)clusterNodeResponse.getResult();
289                            }
290                            catch (Exception e) {
291                                    throw new ExecutionException(e);
292                            }
293                    }
294    
295                    @Override
296                    public T get(long timeout, TimeUnit unit)
297                            throws ExecutionException, InterruptedException, TimeoutException {
298    
299                            ClusterNodeResponses clusterNodeResponses =
300                                    _futureClusterResponses.get(timeout, unit);
301    
302                            ClusterNodeResponse clusterNodeResponse =
303                                    clusterNodeResponses.getClusterResponse(_address);
304    
305                            try {
306                                    return (T)clusterNodeResponse.getResult();
307                            }
308                            catch (Exception e) {
309                                    throw new ExecutionException(e);
310                            }
311                    }
312    
313                    private final Address _address;
314                    private final FutureClusterResponses _futureClusterResponses;
315    
316            }
317    
318    }