001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.executor;
016    
017    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
018    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
019    import com.liferay.portal.kernel.executor.PortalExecutorFactory;
020    import com.liferay.portal.kernel.executor.PortalExecutorManager;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
024    
025    import java.util.Map;
026    import java.util.concurrent.Callable;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.ExecutionException;
029    import java.util.concurrent.Future;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.TimeoutException;
032    
033    /**
034     * @author Shuyang Zhou
035     */
036    @DoPrivileged
037    public class PortalExecutorManagerImpl implements PortalExecutorManager {
038    
039            public void afterPropertiesSet() {
040                    if (_portalExecutorFactory == null) {
041                            throw new IllegalArgumentException(
042                                    "Portal executor factory is null");
043                    }
044            }
045    
046            @Override
047            public <T> NoticeableFuture<T> execute(String name, Callable<T> callable) {
048                    ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
049    
050                    return threadPoolExecutor.submit(callable);
051            }
052    
053            @Override
054            public <T> T execute(
055                            String name, Callable<T> callable, long timeout, TimeUnit timeUnit)
056                    throws ExecutionException, InterruptedException, TimeoutException {
057    
058                    ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
059    
060                    Future<T> future = threadPoolExecutor.submit(callable);
061    
062                    return future.get(timeout, timeUnit);
063            }
064    
065            @Override
066            public ThreadPoolExecutor getPortalExecutor(String name) {
067                    return getPortalExecutor(name, true);
068            }
069    
070            @Override
071            public ThreadPoolExecutor getPortalExecutor(
072                    String name, boolean createIfAbsent) {
073    
074                    ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.get(name);
075    
076                    if ((threadPoolExecutor == null) && createIfAbsent) {
077                            synchronized (_threadPoolExecutors) {
078                                    threadPoolExecutor = _threadPoolExecutors.get(name);
079    
080                                    if (threadPoolExecutor == null) {
081                                            threadPoolExecutor =
082                                                    _portalExecutorFactory.createPortalExecutor(name);
083    
084                                            _threadPoolExecutors.put(name, threadPoolExecutor);
085                                    }
086                            }
087                    }
088    
089                    return threadPoolExecutor;
090            }
091    
092            @Override
093            public ThreadPoolExecutor registerPortalExecutor(
094                    String name, ThreadPoolExecutor threadPoolExecutor) {
095    
096                    ThreadPoolExecutor oldThreadPoolExecutor = _threadPoolExecutors.get(
097                            name);
098    
099                    if (oldThreadPoolExecutor == null) {
100                            synchronized (_threadPoolExecutors) {
101                                    oldThreadPoolExecutor = _threadPoolExecutors.get(name);
102    
103                                    if (oldThreadPoolExecutor == null) {
104                                            oldThreadPoolExecutor = _threadPoolExecutors.put(
105                                                    name, threadPoolExecutor);
106                                    }
107                            }
108                    }
109    
110                    return oldThreadPoolExecutor;
111            }
112    
113            public void setPortalExecutorFactory(
114                    PortalExecutorFactory portalExecutorFactory) {
115    
116                    _portalExecutorFactory = portalExecutorFactory;
117            }
118    
119            public void setPortalExecutors(
120                    Map<String, ThreadPoolExecutor> threadPoolExecutors) {
121    
122                    if (threadPoolExecutors != null) {
123                            _threadPoolExecutors =
124                                    new ConcurrentHashMap<String, ThreadPoolExecutor>(
125                                            threadPoolExecutors);
126                    }
127            }
128    
129            @Override
130            public void shutdown() {
131                    shutdown(false);
132            }
133    
134            @Override
135            public void shutdown(boolean interrupt) {
136                    for (Map.Entry<String, ThreadPoolExecutor> entry :
137                                    _threadPoolExecutors.entrySet()) {
138    
139                            ThreadPoolExecutor threadPoolExecutor = entry.getValue();
140    
141                            if (interrupt) {
142                                    threadPoolExecutor.shutdownNow();
143                            }
144                            else {
145                                    threadPoolExecutor.shutdown();
146                            }
147                    }
148    
149                    _threadPoolExecutors.clear();
150            }
151    
152            @Override
153            public void shutdown(String name) {
154                    shutdown(name, false);
155            }
156    
157            @Override
158            public void shutdown(String name, boolean interrupt) {
159                    ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.remove(
160                            name);
161    
162                    if (threadPoolExecutor == null) {
163                            if (_log.isDebugEnabled()) {
164                                    _log.debug("No portal executor found for name " + name);
165                            }
166    
167                            return;
168                    }
169    
170                    if (interrupt) {
171                            threadPoolExecutor.shutdownNow();
172                    }
173                    else {
174                            threadPoolExecutor.shutdown();
175                    }
176            }
177    
178            private static final Log _log = LogFactoryUtil.getLog(
179                    PortalExecutorManagerImpl.class);
180    
181            private PortalExecutorFactory _portalExecutorFactory;
182            private Map<String, ThreadPoolExecutor> _threadPoolExecutors =
183                    new ConcurrentHashMap<String, ThreadPoolExecutor>();
184    
185    }