1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ConcurrentHashSet;
28 import com.liferay.portal.kernel.util.NamedThreadFactory;
29 import com.liferay.portal.kernel.util.StringPool;
30
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36
37
42 public abstract class BaseDestination implements Destination {
43
44 public BaseDestination() {
45 }
46
47
50 public BaseDestination(String name) {
51 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
52 }
53
54
57 public BaseDestination(
58 String name, int workersCoreSize, int workersMaxSize) {
59
60 _name = name;
61 _workersCoreSize = workersCoreSize;
62 _workersMaxSize = workersMaxSize;
63
64 open();
65 }
66
67 public void addDestinationEventListener(
68 DestinationEventListener destinationEventListener) {
69
70 _destinationEventListeners.add(destinationEventListener);
71 }
72
73 public void afterPropertiesSet() {
74 open();
75 }
76
77 public synchronized void close() {
78 close(false);
79 }
80
81 public synchronized void close(boolean force) {
82 doClose(force);
83 }
84
85 public void copyDestinationEventListeners(Destination destination) {
86 for (DestinationEventListener destinationEventListener :
87 _destinationEventListeners) {
88
89 destination.addDestinationEventListener(
90 destinationEventListener);
91 }
92 }
93
94 public void copyMessageListeners(Destination destination) {
95 for (MessageListener messageListener : _messageListeners) {
96 InvokerMessageListener invokerMessageListener =
97 (InvokerMessageListener)messageListener;
98
99 destination.register(
100 invokerMessageListener.getMessageListener(),
101 invokerMessageListener.getClassLoader());
102 }
103 }
104
105 public DestinationStatistics getDestinationStatistics() {
106 DestinationStatistics destinationStatistics =
107 new DestinationStatistics();
108
109 destinationStatistics.setActiveThreadCount(
110 _threadPoolExecutor.getActiveCount());
111 destinationStatistics.setCurrentThreadCount(
112 _threadPoolExecutor.getPoolSize());
113 destinationStatistics.setLargestThreadCount(
114 _threadPoolExecutor.getLargestPoolSize());
115 destinationStatistics.setMaxThreadPoolSize(
116 _threadPoolExecutor.getMaximumPoolSize());
117 destinationStatistics.setMinThreadPoolSize(
118 _threadPoolExecutor.getCorePoolSize());
119 destinationStatistics.setPendingMessageCount(
120 _threadPoolExecutor.getQueue().size());
121 destinationStatistics.setSentMessageCount(
122 _threadPoolExecutor.getCompletedTaskCount());
123
124 return destinationStatistics;
125 }
126
127 public int getMessageListenerCount() {
128 return _messageListeners.size();
129 }
130
131 public String getName() {
132 return _name;
133 }
134
135 public int getWorkersCoreSize() {
136 return _workersCoreSize;
137 }
138
139 public int getWorkersMaxSize() {
140 return _workersMaxSize;
141 }
142
143 public boolean isRegistered() {
144 if (getMessageListenerCount() > 0) {
145 return true;
146 }
147 else {
148 return false;
149 }
150 }
151
152 public synchronized void open() {
153 doOpen();
154 }
155
156 public boolean register(MessageListener messageListener) {
157 InvokerMessageListener invokerMessageListener =
158 new InvokerMessageListener(messageListener);
159
160 return registerMessageListener(invokerMessageListener);
161 }
162
163 public boolean register(
164 MessageListener messageListener, ClassLoader classloader) {
165
166 InvokerMessageListener invokerMessageListener =
167 new InvokerMessageListener(messageListener, classloader);
168
169 return registerMessageListener(invokerMessageListener);
170 }
171
172 public void removeDestinationEventListener(
173 DestinationEventListener destinationEventListener) {
174
175 _destinationEventListeners.remove(destinationEventListener);
176 }
177
178 public void removeDestinationEventListeners() {
179 _destinationEventListeners.clear();
180 }
181
182 public void send(Message message) {
183 if (_messageListeners.isEmpty()) {
184 if (_log.isDebugEnabled()) {
185 _log.debug("No message listeners for destination " + getName());
186 }
187
188 return;
189 }
190
191 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
192
193 if (threadPoolExecutor.isShutdown()) {
194 throw new IllegalStateException(
195 "Destination " + getName() + " is shutdown and cannot " +
196 "receive more messages");
197 }
198
199 dispatch(_messageListeners, message);
200 }
201
202 public void setName(String name) {
203 _name = name;
204 }
205
206 public void setWorkersCoreSize(int workersCoreSize) {
207 _workersCoreSize = workersCoreSize;
208 }
209
210 public void setWorkersMaxSize(int workersMaxSize) {
211 _workersMaxSize = workersMaxSize;
212 }
213
214 public boolean unregister(MessageListener messageListener) {
215 InvokerMessageListener invokerMessageListener =
216 new InvokerMessageListener(messageListener);
217
218 return unregisterMessageListener(invokerMessageListener);
219 }
220
221 public boolean unregister(
222 MessageListener messageListener, ClassLoader classloader) {
223
224 InvokerMessageListener invokerMessageListener =
225 new InvokerMessageListener(messageListener, classloader);
226
227 return unregisterMessageListener(invokerMessageListener);
228 }
229
230 public void unregisterMessageListeners() {
231 for (MessageListener messageListener : _messageListeners) {
232 unregisterMessageListener((InvokerMessageListener)messageListener);
233 }
234 }
235
236 protected abstract void dispatch(
237 Set<MessageListener> messageListeners, Message message);
238
239 protected void doClose(boolean force) {
240 if (!_threadPoolExecutor.isShutdown() &&
241 !_threadPoolExecutor.isTerminating()) {
242
243 if (!force) {
244 _threadPoolExecutor.shutdown();
245 }
246 else {
247 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
248
249 if (_log.isInfoEnabled()) {
250 _log.info(
251 "The following " + pendingTasks.size() + " tasks " +
252 "were not executed due to shutown: " +
253 pendingTasks);
254 }
255 }
256 }
257 }
258
259 protected void doOpen() {
260 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
261 _threadPoolExecutor = new ThreadPoolExecutor(
262 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
263 new LinkedBlockingQueue<Runnable>(),
264 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
265 }
266 }
267
268 protected void fireMessageListenerRegisteredEvent(
269 MessageListener messageListener) {
270
271 for (DestinationEventListener destinationEventListener :
272 _destinationEventListeners) {
273
274 destinationEventListener.messageListenerRegistered(
275 getName(), messageListener);
276 }
277 }
278
279 protected void fireMessageListenerUnregisteredEvent(
280 MessageListener messageListener) {
281
282 for (DestinationEventListener listener : _destinationEventListeners) {
283 listener.messageListenerUnregistered(getName(), messageListener);
284 }
285 }
286
287 protected ThreadPoolExecutor getThreadPoolExecutor() {
288 return _threadPoolExecutor;
289 }
290
291 protected boolean registerMessageListener(
292 InvokerMessageListener invokerMessageListener) {
293
294 boolean registered = _messageListeners.add(invokerMessageListener);
295
296 if (registered) {
297 fireMessageListenerRegisteredEvent(
298 invokerMessageListener.getMessageListener());
299 }
300
301 return registered;
302 }
303
304 protected boolean unregisterMessageListener(
305 InvokerMessageListener invokerMessageListener) {
306
307 boolean unregistered = _messageListeners.remove(invokerMessageListener);
308
309 if (unregistered) {
310 fireMessageListenerUnregisteredEvent(
311 invokerMessageListener.getMessageListener());
312 }
313
314 return unregistered;
315 }
316
317 private static final int _WORKERS_CORE_SIZE = 2;
318
319 private static final int _WORKERS_MAX_SIZE = 5;
320
321 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
322
323 private Set<DestinationEventListener> _destinationEventListeners =
324 new ConcurrentHashSet<DestinationEventListener>();
325 private Set<MessageListener> _messageListeners =
326 new ConcurrentHashSet<MessageListener>();
327 private String _name = StringPool.BLANK;
328 private ThreadPoolExecutor _threadPoolExecutor;
329 private int _workersCoreSize = _WORKERS_CORE_SIZE;
330 private int _workersMaxSize = _WORKERS_MAX_SIZE;
331
332 }