001
014
015 package com.liferay.portal.kernel.nio.intraband.blocking;
016
017 import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
018 import com.liferay.portal.kernel.nio.intraband.ChannelContext;
019 import com.liferay.portal.kernel.nio.intraband.Datagram;
020 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021 import com.liferay.portal.kernel.util.NamedThreadFactory;
022
023 import java.io.IOException;
024
025 import java.nio.channels.Channel;
026 import java.nio.channels.GatheringByteChannel;
027 import java.nio.channels.ScatteringByteChannel;
028 import java.nio.channels.SelectableChannel;
029
030 import java.util.Queue;
031 import java.util.concurrent.BlockingQueue;
032 import java.util.concurrent.Callable;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.ExecutorService;
035 import java.util.concurrent.Executors;
036 import java.util.concurrent.Future;
037 import java.util.concurrent.LinkedBlockingQueue;
038 import java.util.concurrent.ThreadFactory;
039 import java.util.concurrent.TimeUnit;
040
041
044 public class ExecutorIntraband extends BaseIntraband {
045
046 public ExecutorIntraband(long defaultTimeout) {
047 super(defaultTimeout);
048 }
049
050 @Override
051 public void close() throws InterruptedException, IOException {
052 executorService.shutdownNow();
053
054 executorService.awaitTermination(defaultTimeout, TimeUnit.MILLISECONDS);
055
056 super.close();
057 }
058
059 @Override
060 public RegistrationReference registerChannel(Channel channel) {
061 if (channel == null) {
062 throw new NullPointerException("Channel is null");
063 }
064
065 if (!(channel instanceof GatheringByteChannel)) {
066 throw new IllegalArgumentException(
067 "Channel is not of type GatheringByteChannel");
068 }
069
070 if (!(channel instanceof ScatteringByteChannel)) {
071 throw new IllegalArgumentException(
072 "Channel is not of type ScatteringByteChannel");
073 }
074
075 if (channel instanceof SelectableChannel) {
076 SelectableChannel selectableChannel = (SelectableChannel)channel;
077
078 if (!selectableChannel.isBlocking()) {
079 throw new IllegalArgumentException(
080 "Channel is of type SelectableChannel and " +
081 "configured in nonblocking mode");
082 }
083 }
084
085 ensureOpen();
086
087 return doRegisterChannel(
088 (ScatteringByteChannel)channel, (GatheringByteChannel)channel);
089 }
090
091 @Override
092 public RegistrationReference registerChannel(
093 ScatteringByteChannel scatteringByteChannel,
094 GatheringByteChannel gatheringByteChannel) {
095
096 if (gatheringByteChannel == null) {
097 throw new NullPointerException("Gathering byte channel is null");
098 }
099
100 if (scatteringByteChannel == null) {
101 throw new NullPointerException("Scattering byte channel is null");
102 }
103
104 if (scatteringByteChannel instanceof SelectableChannel) {
105 SelectableChannel selectableChannel =
106 (SelectableChannel)scatteringByteChannel;
107
108 if (!selectableChannel.isBlocking()) {
109 throw new IllegalArgumentException(
110 "Scattering byte channel is of type SelectableChannel " +
111 "and configured in nonblocking mode");
112 }
113 }
114
115 if (gatheringByteChannel instanceof SelectableChannel) {
116 SelectableChannel selectableChannel =
117 (SelectableChannel)gatheringByteChannel;
118
119 if (!selectableChannel.isBlocking()) {
120 throw new IllegalArgumentException(
121 "Gathering byte channel is of type SelectableChannel and " +
122 "configured in nonblocking mode");
123 }
124 }
125
126 ensureOpen();
127
128 return doRegisterChannel(scatteringByteChannel, gatheringByteChannel);
129 }
130
131 protected RegistrationReference doRegisterChannel(
132 ScatteringByteChannel scatteringByteChannel,
133 GatheringByteChannel gatheringByteChannel) {
134
135 BlockingQueue<Datagram> sendingQueue = new LinkedBlockingQueue<>();
136
137 ChannelContext channelContext = new ChannelContext(sendingQueue);
138
139 ReadingCallable readingCallable = new ReadingCallable(
140 scatteringByteChannel, channelContext);
141 WritingCallable writingCallable = new WritingCallable(
142 gatheringByteChannel, channelContext);
143
144
145
146
147
148 Future<Void> readFuture = executorService.submit(readingCallable);
149 Future<Void> writeFuture = executorService.submit(writingCallable);
150
151 FutureRegistrationReference futureRegistrationReference =
152 new FutureRegistrationReference(
153 this, channelContext, readFuture, writeFuture);
154
155 channelContext.setRegistrationReference(futureRegistrationReference);
156
157 readingCallable.openLatch();
158 writingCallable.openLatch();
159
160 return futureRegistrationReference;
161 }
162
163 @Override
164 protected void doSendDatagram(
165 RegistrationReference registrationReference, Datagram datagram) {
166
167 FutureRegistrationReference futureRegistrationReference =
168 (FutureRegistrationReference)registrationReference;
169
170 ChannelContext channelContext =
171 futureRegistrationReference.channelContext;
172
173 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
174
175 sendingQueue.offer(datagram);
176 }
177
178 protected static final ThreadFactory THREAD_FACTORY =
179 new NamedThreadFactory(
180 ExecutorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
181 ExecutorIntraband.class.getClassLoader());
182
183 protected final ExecutorService executorService =
184 Executors.newCachedThreadPool(THREAD_FACTORY);
185
186 protected class ReadingCallable implements Callable<Void> {
187
188 public ReadingCallable(
189 ScatteringByteChannel scatteringByteChannel,
190 ChannelContext channelContext) {
191
192 _scatteringByteChannel = scatteringByteChannel;
193 _channelContext = channelContext;
194
195 _countDownLatch = new CountDownLatch(1);
196 }
197
198 @Override
199 public Void call() throws Exception {
200 _countDownLatch.await();
201
202 while (_scatteringByteChannel.isOpen()) {
203 handleReading(_scatteringByteChannel, _channelContext);
204 }
205
206 return null;
207 }
208
209 public void openLatch() {
210 _countDownLatch.countDown();
211 }
212
213 private final ChannelContext _channelContext;
214 private final CountDownLatch _countDownLatch;
215 private final ScatteringByteChannel _scatteringByteChannel;
216
217 }
218
219 protected class WritingCallable implements Callable<Void> {
220
221 public WritingCallable(
222 GatheringByteChannel gatheringByteChannel,
223 ChannelContext channelContext) {
224
225 _gatheringByteChannel = gatheringByteChannel;
226 _channelContext = channelContext;
227
228 _countDownLatch = new CountDownLatch(1);
229 }
230
231 @Override
232 public Void call() throws Exception {
233 _countDownLatch.await();
234
235 try {
236 BlockingQueue<Datagram> sendingQueue =
237 (BlockingQueue<Datagram>)_channelContext.getSendingQueue();
238
239 while (true) {
240 Datagram datagram = sendingQueue.take();
241
242 _channelContext.setWritingDatagram(datagram);
243
244 if (!handleWriting(
245 _gatheringByteChannel, _channelContext)) {
246
247 if (_gatheringByteChannel.isOpen()) {
248
249
250
251
252 throw new IllegalStateException(
253 _gatheringByteChannel +
254 " behaved in nonblocking way.");
255 }
256 else {
257 break;
258 }
259 }
260
261 cleanUpTimeoutResponseWaitingDatagrams();
262 }
263 }
264 catch (InterruptedException ie) {
265 }
266
267 return null;
268 }
269
270 public void openLatch() {
271 _countDownLatch.countDown();
272 }
273
274 private final ChannelContext _channelContext;
275 private final CountDownLatch _countDownLatch;
276 private final GatheringByteChannel _gatheringByteChannel;
277
278 }
279
280 }