001
014
015 package com.liferay.portal.executor;
016
017 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
018 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
019 import com.liferay.portal.kernel.executor.PortalExecutorFactory;
020 import com.liferay.portal.kernel.executor.PortalExecutorManager;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
024
025 import java.util.Map;
026 import java.util.concurrent.Callable;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.ExecutionException;
029 import java.util.concurrent.Future;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.TimeoutException;
032
033
036 @DoPrivileged
037 public class PortalExecutorManagerImpl implements PortalExecutorManager {
038
039 public void afterPropertiesSet() {
040 if (_portalExecutorFactory == null) {
041 throw new IllegalArgumentException(
042 "Portal executor factory is null");
043 }
044 }
045
046 @Override
047 public <T> NoticeableFuture<T> execute(String name, Callable<T> callable) {
048 ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
049
050 return threadPoolExecutor.submit(callable);
051 }
052
053 @Override
054 public <T> T execute(
055 String name, Callable<T> callable, long timeout, TimeUnit timeUnit)
056 throws ExecutionException, InterruptedException, TimeoutException {
057
058 ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
059
060 Future<T> future = threadPoolExecutor.submit(callable);
061
062 return future.get(timeout, timeUnit);
063 }
064
065 @Override
066 public ThreadPoolExecutor getPortalExecutor(String name) {
067 return getPortalExecutor(name, true);
068 }
069
070 @Override
071 public ThreadPoolExecutor getPortalExecutor(
072 String name, boolean createIfAbsent) {
073
074 ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.get(name);
075
076 if ((threadPoolExecutor == null) && createIfAbsent) {
077 synchronized (_threadPoolExecutors) {
078 threadPoolExecutor = _threadPoolExecutors.get(name);
079
080 if (threadPoolExecutor == null) {
081 threadPoolExecutor =
082 _portalExecutorFactory.createPortalExecutor(name);
083
084 _threadPoolExecutors.put(name, threadPoolExecutor);
085 }
086 }
087 }
088
089 return threadPoolExecutor;
090 }
091
092 @Override
093 public ThreadPoolExecutor registerPortalExecutor(
094 String name, ThreadPoolExecutor threadPoolExecutor) {
095
096 ThreadPoolExecutor oldThreadPoolExecutor = _threadPoolExecutors.get(
097 name);
098
099 if (oldThreadPoolExecutor == null) {
100 synchronized (_threadPoolExecutors) {
101 oldThreadPoolExecutor = _threadPoolExecutors.get(name);
102
103 if (oldThreadPoolExecutor == null) {
104 oldThreadPoolExecutor = _threadPoolExecutors.put(
105 name, threadPoolExecutor);
106 }
107 }
108 }
109
110 return oldThreadPoolExecutor;
111 }
112
113 public void setPortalExecutorFactory(
114 PortalExecutorFactory portalExecutorFactory) {
115
116 _portalExecutorFactory = portalExecutorFactory;
117 }
118
119 public void setPortalExecutors(
120 Map<String, ThreadPoolExecutor> threadPoolExecutors) {
121
122 if (threadPoolExecutors != null) {
123 _threadPoolExecutors =
124 new ConcurrentHashMap<String, ThreadPoolExecutor>(
125 threadPoolExecutors);
126 }
127 }
128
129 @Override
130 public void shutdown() {
131 shutdown(false);
132 }
133
134 @Override
135 public void shutdown(boolean interrupt) {
136 for (Map.Entry<String, ThreadPoolExecutor> entry :
137 _threadPoolExecutors.entrySet()) {
138
139 ThreadPoolExecutor threadPoolExecutor = entry.getValue();
140
141 if (interrupt) {
142 threadPoolExecutor.shutdownNow();
143 }
144 else {
145 threadPoolExecutor.shutdown();
146 }
147 }
148
149 _threadPoolExecutors.clear();
150 }
151
152 @Override
153 public void shutdown(String name) {
154 shutdown(name, false);
155 }
156
157 @Override
158 public void shutdown(String name, boolean interrupt) {
159 ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.remove(
160 name);
161
162 if (threadPoolExecutor == null) {
163 if (_log.isDebugEnabled()) {
164 _log.debug("No portal executor found for name " + name);
165 }
166
167 return;
168 }
169
170 if (interrupt) {
171 threadPoolExecutor.shutdownNow();
172 }
173 else {
174 threadPoolExecutor.shutdown();
175 }
176 }
177
178 private static final Log _log = LogFactoryUtil.getLog(
179 PortalExecutorManagerImpl.class);
180
181 private PortalExecutorFactory _portalExecutorFactory;
182 private Map<String, ThreadPoolExecutor> _threadPoolExecutors =
183 new ConcurrentHashMap<String, ThreadPoolExecutor>();
184
185 }