001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.executor;
016    
017    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018    import com.liferay.portal.kernel.executor.PortalExecutorFactory;
019    import com.liferay.portal.kernel.executor.PortalExecutorManager;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
023    
024    import java.util.Map;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.ExecutionException;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.TimeoutException;
031    
032    /**
033     * @author Shuyang Zhou
034     */
035    @DoPrivileged
036    public class PortalExecutorManagerImpl implements PortalExecutorManager {
037    
038            public void afterPropertiesSet() {
039                    if (_portalExecutorFactory == null) {
040                            throw new IllegalArgumentException(
041                                    "Portal executor factory is null");
042                    }
043            }
044    
045            public <T> Future<T> execute(String name, Callable<T> callable) {
046                    ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
047    
048                    return threadPoolExecutor.submit(callable);
049            }
050    
051            public <T> T execute(
052                            String name, Callable<T> callable, long timeout, TimeUnit timeUnit)
053                    throws ExecutionException, InterruptedException, TimeoutException {
054    
055                    ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
056    
057                    Future<T> future = threadPoolExecutor.submit(callable);
058    
059                    return future.get(timeout, timeUnit);
060            }
061    
062            public ThreadPoolExecutor getPortalExecutor(String name) {
063                    return getPortalExecutor(name, true);
064            }
065    
066            public ThreadPoolExecutor getPortalExecutor(
067                    String name, boolean createIfAbsent) {
068    
069                    ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.get(name);
070    
071                    if ((threadPoolExecutor == null) && createIfAbsent) {
072                            synchronized (_threadPoolExecutors) {
073                                    threadPoolExecutor = _threadPoolExecutors.get(name);
074    
075                                    if (threadPoolExecutor == null) {
076                                            threadPoolExecutor =
077                                                    _portalExecutorFactory.createPortalExecutor(name);
078    
079                                            _threadPoolExecutors.put(name, threadPoolExecutor);
080                                    }
081                            }
082                    }
083    
084                    return threadPoolExecutor;
085            }
086    
087            public ThreadPoolExecutor registerPortalExecutor(
088                    String name, ThreadPoolExecutor threadPoolExecutor) {
089    
090                    ThreadPoolExecutor oldThreadPoolExecutor = _threadPoolExecutors.get(
091                            name);
092    
093                    if (oldThreadPoolExecutor == null) {
094                            synchronized (_threadPoolExecutors) {
095                                    oldThreadPoolExecutor = _threadPoolExecutors.get(name);
096    
097                                    if (oldThreadPoolExecutor == null) {
098                                            oldThreadPoolExecutor = _threadPoolExecutors.put(
099                                                    name, threadPoolExecutor);
100                                    }
101                            }
102                    }
103    
104                    return oldThreadPoolExecutor;
105            }
106    
107            public void setPortalExecutorFactory(
108                    PortalExecutorFactory portalExecutorFactory) {
109    
110                    _portalExecutorFactory = portalExecutorFactory;
111            }
112    
113            public void setPortalExecutors(
114                    Map<String, ThreadPoolExecutor> threadPoolExecutors) {
115    
116                    if (threadPoolExecutors != null) {
117                            _threadPoolExecutors =
118                                    new ConcurrentHashMap<String, ThreadPoolExecutor>(
119                                            threadPoolExecutors);
120                    }
121            }
122    
123            public void shutdown() {
124                    shutdown(false);
125            }
126    
127            public void shutdown(boolean interrupt) {
128                    for (Map.Entry<String, ThreadPoolExecutor> entry :
129                                    _threadPoolExecutors.entrySet()) {
130    
131                            ThreadPoolExecutor threadPoolExecutor = entry.getValue();
132    
133                            if (interrupt) {
134                                    threadPoolExecutor.shutdownNow();
135                            }
136                            else {
137                                    threadPoolExecutor.shutdown();
138                            }
139                    }
140    
141                    _threadPoolExecutors.clear();
142            }
143    
144            public void shutdown(String name) {
145                    shutdown(name, false);
146            }
147    
148            public void shutdown(String name, boolean interrupt) {
149                    ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.remove(
150                            name);
151    
152                    if (threadPoolExecutor == null) {
153                            if (_log.isDebugEnabled()) {
154                                    _log.debug("No portal executor found for name " + name);
155                            }
156    
157                            return;
158                    }
159    
160                    if (interrupt) {
161                            threadPoolExecutor.shutdownNow();
162                    }
163                    else {
164                            threadPoolExecutor.shutdown();
165                    }
166            }
167    
168            private static Log _log = LogFactoryUtil.getLog(
169                    PortalExecutorManagerImpl.class);
170    
171            private PortalExecutorFactory _portalExecutorFactory;
172            private Map<String, ThreadPoolExecutor> _threadPoolExecutors =
173                    new ConcurrentHashMap<String, ThreadPoolExecutor>();
174    
175    }