001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
018 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
020 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.util.NamedThreadFactory;
024 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
025
026 import java.util.Set;
027 import java.util.concurrent.TimeUnit;
028
029
033 public abstract class BaseAsyncDestination extends BaseDestination {
034
035 public BaseAsyncDestination() {
036 }
037
038
041 public BaseAsyncDestination(String name) {
042 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
043 }
044
045
048 public BaseAsyncDestination(
049 String name, int workersCoreSize, int workersMaxSize) {
050
051 this.name = name;
052 _workersCoreSize = workersCoreSize;
053 _workersMaxSize = workersMaxSize;
054
055 open();
056 }
057
058 @Override
059 public void close(boolean force) {
060 PortalExecutorManagerUtil.shutdown(getName(), force);
061 }
062
063 public DestinationStatistics getDestinationStatistics() {
064 DestinationStatistics destinationStatistics =
065 new DestinationStatistics();
066
067 destinationStatistics.setActiveThreadCount(
068 _threadPoolExecutor.getActiveCount());
069 destinationStatistics.setCurrentThreadCount(
070 _threadPoolExecutor.getPoolSize());
071 destinationStatistics.setLargestThreadCount(
072 _threadPoolExecutor.getLargestPoolSize());
073 destinationStatistics.setMaxThreadPoolSize(
074 _threadPoolExecutor.getMaxPoolSize());
075 destinationStatistics.setMinThreadPoolSize(
076 _threadPoolExecutor.getCorePoolSize());
077 destinationStatistics.setPendingMessageCount(
078 _threadPoolExecutor.getPendingTaskCount());
079 destinationStatistics.setSentMessageCount(
080 _threadPoolExecutor.getCompletedTaskCount());
081
082 return destinationStatistics;
083 }
084
085 public int getMaximumQueueSize() {
086 return _maximumQueueSize;
087 }
088
089 public int getWorkersCoreSize() {
090 return _workersCoreSize;
091 }
092
093 public int getWorkersMaxSize() {
094 return _workersMaxSize;
095 }
096
097 @Override
098 public void open() {
099 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
100 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
101
102 if (_rejectedExecutionHandler == null) {
103 _rejectedExecutionHandler = createRejectionExecutionHandler();
104 }
105
106 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
107 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
108 _maximumQueueSize, _rejectedExecutionHandler,
109 new NamedThreadFactory(
110 getName(), Thread.NORM_PRIORITY, classLoader),
111 new ThreadPoolHandlerAdapter());
112
113 ThreadPoolExecutor oldThreadPoolExecutor =
114 PortalExecutorManagerUtil.registerPortalExecutor(
115 getName(), threadPoolExecutor);
116
117 if (oldThreadPoolExecutor != null) {
118 if (_log.isWarnEnabled()) {
119 _log.warn(
120 "Abort creating a new thread pool for destination " +
121 getName() + " and reuse previous one");
122 }
123
124 threadPoolExecutor.shutdownNow();
125
126 threadPoolExecutor = oldThreadPoolExecutor;
127 }
128
129 _threadPoolExecutor = threadPoolExecutor;
130 }
131 }
132
133 public void send(Message message) {
134 if (messageListeners.isEmpty()) {
135 if (_log.isDebugEnabled()) {
136 _log.debug("No message listeners for destination " + getName());
137 }
138
139 return;
140 }
141
142 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
143
144 if (threadPoolExecutor.isShutdown()) {
145 throw new IllegalStateException(
146 "Destination " + getName() + " is shutdown and cannot " +
147 "receive more messages");
148 }
149
150 dispatch(messageListeners, message);
151 }
152
153 public void setMaximumQueueSize(int maximumQueueSize) {
154 _maximumQueueSize = maximumQueueSize;
155 }
156
157 public void setRejectedExecutionHandler(
158 RejectedExecutionHandler rejectedExecutionHandler) {
159
160 _rejectedExecutionHandler = rejectedExecutionHandler;
161 }
162
163 public void setWorkersCoreSize(int workersCoreSize) {
164 _workersCoreSize = workersCoreSize;
165 }
166
167 public void setWorkersMaxSize(int workersMaxSize) {
168 _workersMaxSize = workersMaxSize;
169 }
170
171 protected RejectedExecutionHandler createRejectionExecutionHandler() {
172 return new RejectedExecutionHandler() {
173
174 public void rejectedExecution(
175 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
176
177 if (!_log.isWarnEnabled()) {
178 return;
179 }
180
181 MessageRunnable messageRunnable = (MessageRunnable) runnable;
182
183 _log.warn(
184 "Discarding message " + messageRunnable.getMessage() +
185 " because it exceeds the maximum queue size of " +
186 _maximumQueueSize);
187 }
188
189 };
190 }
191
192 protected abstract void dispatch(
193 Set<MessageListener> messageListeners, Message message);
194
195 protected ThreadPoolExecutor getThreadPoolExecutor() {
196 return _threadPoolExecutor;
197 }
198
199 private static final int _WORKERS_CORE_SIZE = 2;
200
201 private static final int _WORKERS_MAX_SIZE = 5;
202
203 private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
204
205 private int _maximumQueueSize = Integer.MAX_VALUE;
206 private RejectedExecutionHandler _rejectedExecutionHandler;
207 private ThreadPoolExecutor _threadPoolExecutor;
208 private int _workersCoreSize = _WORKERS_CORE_SIZE;
209 private int _workersMaxSize = _WORKERS_MAX_SIZE;
210
211 }