001
014
015 package com.liferay.portal.kernel.nio.intraband.blocking;
016
017 import com.liferay.portal.kernel.nio.intraband.BaseIntraBand;
018 import com.liferay.portal.kernel.nio.intraband.ChannelContext;
019 import com.liferay.portal.kernel.nio.intraband.Datagram;
020 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021 import com.liferay.portal.kernel.util.NamedThreadFactory;
022
023 import java.io.IOException;
024
025 import java.nio.channels.Channel;
026 import java.nio.channels.GatheringByteChannel;
027 import java.nio.channels.ScatteringByteChannel;
028 import java.nio.channels.SelectableChannel;
029
030 import java.util.Queue;
031 import java.util.concurrent.BlockingQueue;
032 import java.util.concurrent.Callable;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.ExecutorService;
035 import java.util.concurrent.Executors;
036 import java.util.concurrent.Future;
037 import java.util.concurrent.LinkedBlockingQueue;
038 import java.util.concurrent.ThreadFactory;
039 import java.util.concurrent.TimeUnit;
040
041
044 public class ExecutorIntraBand extends BaseIntraBand {
045
046 public ExecutorIntraBand(long defaultTimeout) {
047 super(defaultTimeout);
048 }
049
050 @Override
051 public void close() throws InterruptedException, IOException {
052 executorService.shutdownNow();
053
054 executorService.awaitTermination(defaultTimeout, TimeUnit.MILLISECONDS);
055
056 super.close();
057 }
058
059 public RegistrationReference registerChannel(Channel channel) {
060 if (channel == null) {
061 throw new NullPointerException("Channel is null");
062 }
063
064 if (!(channel instanceof GatheringByteChannel)) {
065 throw new IllegalArgumentException(
066 "Channel is not of type GatheringByteChannel");
067 }
068
069 if (!(channel instanceof ScatteringByteChannel)) {
070 throw new IllegalArgumentException(
071 "Channel is not of type ScatteringByteChannel");
072 }
073
074 if (channel instanceof SelectableChannel) {
075 SelectableChannel selectableChannel = (SelectableChannel)channel;
076
077 if (!selectableChannel.isBlocking()) {
078 throw new IllegalArgumentException(
079 "Channel is of type SelectableChannel and " +
080 "configured in nonblocking mode");
081 }
082 }
083
084 ensureOpen();
085
086 return doRegisterChannel(
087 (ScatteringByteChannel)channel, (GatheringByteChannel)channel);
088 }
089
090 public RegistrationReference registerChannel(
091 ScatteringByteChannel scatteringByteChannel,
092 GatheringByteChannel gatheringByteChannel) {
093
094 if (gatheringByteChannel == null) {
095 throw new NullPointerException("Gathering byte channel is null");
096 }
097
098 if (scatteringByteChannel == null) {
099 throw new NullPointerException("Scattering byte channel is null");
100 }
101
102 if (scatteringByteChannel instanceof SelectableChannel) {
103 SelectableChannel selectableChannel =
104 (SelectableChannel)scatteringByteChannel;
105
106 if (!selectableChannel.isBlocking()) {
107 throw new IllegalArgumentException(
108 "Scattering byte channel is of type SelectableChannel " +
109 "and configured in nonblocking mode");
110 }
111 }
112
113 if (gatheringByteChannel instanceof SelectableChannel) {
114 SelectableChannel selectableChannel =
115 (SelectableChannel)gatheringByteChannel;
116
117 if (!selectableChannel.isBlocking()) {
118 throw new IllegalArgumentException(
119 "Gathering byte channel is of type SelectableChannel and " +
120 "configured in nonblocking mode");
121 }
122 }
123
124 ensureOpen();
125
126 return doRegisterChannel(scatteringByteChannel, gatheringByteChannel);
127 }
128
129 protected RegistrationReference doRegisterChannel(
130 ScatteringByteChannel scatteringByteChannel,
131 GatheringByteChannel gatheringByteChannel) {
132
133 BlockingQueue<Datagram> sendingQueue =
134 new LinkedBlockingQueue<Datagram>();
135
136 ChannelContext channelContext = new ChannelContext(sendingQueue);
137
138 ReadingCallable readingCallable = new ReadingCallable(
139 scatteringByteChannel, channelContext);
140 WritingCallable writingCallable = new WritingCallable(
141 gatheringByteChannel, channelContext);
142
143
144
145
146
147 Future<Void> readFuture = executorService.submit(readingCallable);
148 Future<Void> writeFuture = executorService.submit(writingCallable);
149
150 FutureRegistrationReference futureRegistrationReference =
151 new FutureRegistrationReference(
152 this, channelContext, readFuture, writeFuture);
153
154 channelContext.setRegistrationReference(futureRegistrationReference);
155
156 readingCallable.openLatch();
157 writingCallable.openLatch();
158
159 return futureRegistrationReference;
160 }
161
162 @Override
163 protected void doSendDatagram(
164 RegistrationReference registrationReference, Datagram datagram) {
165
166 FutureRegistrationReference futureRegistrationReference =
167 (FutureRegistrationReference)registrationReference;
168
169 ChannelContext channelContext =
170 futureRegistrationReference.channelContext;
171
172 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
173
174 sendingQueue.offer(datagram);
175 }
176
177 protected static final ThreadFactory THREAD_FACTORY =
178 new NamedThreadFactory(
179 ExecutorIntraBand.class + ".threadFactory", Thread.NORM_PRIORITY,
180 ExecutorIntraBand.class.getClassLoader());
181
182 protected final ExecutorService executorService =
183 Executors.newCachedThreadPool(THREAD_FACTORY);
184
185 protected class ReadingCallable implements Callable<Void> {
186
187 public ReadingCallable(
188 ScatteringByteChannel scatteringByteChannel,
189 ChannelContext channelContext) {
190
191 _scatteringByteChannel = scatteringByteChannel;
192 _channelContext = channelContext;
193
194 _countDownLatch = new CountDownLatch(1);
195 }
196
197 public Void call() throws Exception {
198 _countDownLatch.await();
199
200 while (_scatteringByteChannel.isOpen()) {
201 handleReading(_scatteringByteChannel, _channelContext);
202 }
203
204 return null;
205 }
206
207 public void openLatch() {
208 _countDownLatch.countDown();
209 }
210
211 private final ChannelContext _channelContext;
212 private final CountDownLatch _countDownLatch;
213 private final ScatteringByteChannel _scatteringByteChannel;
214
215 }
216
217 protected class WritingCallable implements Callable<Void> {
218
219 public WritingCallable(
220 GatheringByteChannel gatheringByteChannel,
221 ChannelContext channelContext) {
222
223 _gatheringByteChannel = gatheringByteChannel;
224 _channelContext = channelContext;
225
226 _countDownLatch = new CountDownLatch(1);
227 }
228
229 public Void call() throws Exception {
230 _countDownLatch.await();
231
232 try {
233 BlockingQueue<Datagram> sendingQueue =
234 (BlockingQueue<Datagram>)_channelContext.getSendingQueue();
235
236 while (true) {
237 Datagram datagram = sendingQueue.take();
238
239 _channelContext.setWritingDatagram(datagram);
240
241 if (!handleWriting(
242 _gatheringByteChannel, _channelContext)) {
243
244 if (_gatheringByteChannel.isOpen()) {
245
246
247
248
249 throw new IllegalStateException(
250 _gatheringByteChannel +
251 " behaved in nonblocking way.");
252 }
253 else {
254 break;
255 }
256 }
257 }
258 }
259 catch (InterruptedException ie) {
260 }
261
262 return null;
263 }
264
265 public void openLatch() {
266 _countDownLatch.countDown();
267 }
268
269 private final ChannelContext _channelContext;
270 private final CountDownLatch _countDownLatch;
271 private final GatheringByteChannel _gatheringByteChannel;
272
273 }
274
275 }