1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.NamedThreadFactory;
28
29 import java.util.List;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34
40 public abstract class BaseDestination implements Destination {
41
42 public BaseDestination(String name) {
43 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
44 }
45
46 public BaseDestination(
47 String name, int workersCoreSize, int workersMaxSize) {
48
49 _name = name;
50 _workersCoreSize = workersCoreSize;
51 _workersMaxSize = workersMaxSize;
52
53 open();
54 }
55
56 public synchronized void close() {
57 close(false);
58 }
59
60 public synchronized void close(boolean force) {
61 doClose(force);
62 }
63
64 public DestinationStatistics getStatistics() {
65 DestinationStatistics statistics = new DestinationStatistics();
66
67 statistics.setActiveThreadCount(_threadPoolExecutor.getActiveCount());
68 statistics.setCurrentThreadCount(_threadPoolExecutor.getPoolSize());
69 statistics.setLargestThreadCount(
70 _threadPoolExecutor.getLargestPoolSize());
71 statistics.setMaxThreadPoolSize(
72 _threadPoolExecutor.getMaximumPoolSize());
73 statistics.setMinThreadPoolSize(_threadPoolExecutor.getCorePoolSize());
74 statistics.setPendingMessageCount(_threadPoolExecutor.getTaskCount());
75 statistics.setSentMessageCount(
76 _threadPoolExecutor.getCompletedTaskCount());
77
78 return statistics;
79 }
80
81 public int getListenerCount() {
82 return _listenerCount;
83 }
84
85 public String getName() {
86 return _name;
87 }
88
89 public boolean isRegistered() {
90 if (_listenerCount > 0) {
91 return true;
92 }
93 else {
94 return false;
95 }
96 }
97
98 public synchronized void open() {
99 doOpen();
100 }
101
102 protected void doClose(boolean force) {
103 if (!_threadPoolExecutor.isShutdown() &&
104 !_threadPoolExecutor.isTerminating()) {
105
106 if (!force) {
107 _threadPoolExecutor.shutdown();
108 }
109 else {
110 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
111
112 if (_log.isInfoEnabled()) {
113 _log.info(
114 "The following " + pendingTasks.size() + " tasks " +
115 "were not executed due to shutown: " +
116 pendingTasks);
117 }
118 }
119 }
120 }
121
122 protected void doOpen() {
123 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
124 _threadPoolExecutor = new ThreadPoolExecutor(
125 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
126 new LinkedBlockingQueue<Runnable>(),
127 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
128 }
129 }
130
131 protected ThreadPoolExecutor getThreadPoolExecutor() {
132 return _threadPoolExecutor;
133 }
134
135 protected int getWorkersCoreSize() {
136 return _workersCoreSize;
137 }
138
139 protected int getWorkersMaxSize() {
140 return _workersMaxSize;
141 }
142
143 protected void setListenerCount(int listenerCount) {
144 _listenerCount = listenerCount;
145 }
146
147 private static final int _WORKERS_CORE_SIZE = 5;
148
149 private static final int _WORKERS_MAX_SIZE = 10;
150
151 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
152
153 private String _name;
154 private ThreadPoolExecutor _threadPoolExecutor;
155 private int _workersCoreSize;
156 private int _workersMaxSize;
157 private int _listenerCount;
158
159 }