001
014
015 package com.liferay.portal.kernel.nio.intraband.nonblocking;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.BaseIntraBand;
020 import com.liferay.portal.kernel.nio.intraband.ChannelContext;
021 import com.liferay.portal.kernel.nio.intraband.Datagram;
022 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
023 import com.liferay.portal.kernel.util.NamedThreadFactory;
024
025 import java.io.IOException;
026
027 import java.nio.channels.CancelledKeyException;
028 import java.nio.channels.Channel;
029 import java.nio.channels.ClosedSelectorException;
030 import java.nio.channels.GatheringByteChannel;
031 import java.nio.channels.ScatteringByteChannel;
032 import java.nio.channels.SelectableChannel;
033 import java.nio.channels.SelectionKey;
034 import java.nio.channels.Selector;
035
036 import java.util.Iterator;
037 import java.util.Queue;
038 import java.util.Set;
039 import java.util.concurrent.Callable;
040 import java.util.concurrent.ConcurrentLinkedQueue;
041 import java.util.concurrent.FutureTask;
042 import java.util.concurrent.ThreadFactory;
043
044
047 public class SelectorIntraBand extends BaseIntraBand {
048
049 public SelectorIntraBand(long defaultTimeout) throws IOException {
050 super(defaultTimeout);
051
052 pollingThread.start();
053 }
054
055 @Override
056 public void close() throws InterruptedException, IOException {
057 selector.close();
058
059 pollingThread.interrupt();
060
061 pollingThread.join(defaultTimeout);
062
063 super.close();
064 }
065
066 public RegistrationReference registerChannel(Channel channel)
067 throws IOException {
068
069 if (channel == null) {
070 throw new NullPointerException("Channel is null");
071 }
072
073 if (!(channel instanceof GatheringByteChannel)) {
074 throw new IllegalArgumentException(
075 "Channel is not of type GatheringByteChannel");
076 }
077
078 if (!(channel instanceof ScatteringByteChannel)) {
079 throw new IllegalArgumentException(
080 "Channel is not of type ScatteringByteChannel");
081 }
082
083 if (!(channel instanceof SelectableChannel)) {
084 throw new IllegalArgumentException(
085 "Channel is not of type SelectableChannel");
086 }
087
088 SelectableChannel selectableChannel = (SelectableChannel)channel;
089
090 if ((selectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
091 throw new IllegalArgumentException(
092 "Channel is not valid for reading");
093 }
094
095 if ((selectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
096 throw new IllegalArgumentException(
097 "Channel is not valid for writing");
098 }
099
100 ensureOpen();
101
102 selectableChannel.configureBlocking(false);
103
104 FutureTask<RegistrationReference> registerFutureTask =
105 new FutureTask<RegistrationReference>(
106 new RegisterCallable(selectableChannel, selectableChannel));
107
108 registerQueue.offer(registerFutureTask);
109
110 selector.wakeup();
111
112 try {
113 return registerFutureTask.get();
114 }
115 catch (Exception e) {
116 throw new IOException(e);
117 }
118 }
119
120 public RegistrationReference registerChannel(
121 ScatteringByteChannel scatteringByteChannel,
122 GatheringByteChannel gatheringByteChannel)
123 throws IOException {
124
125 if (scatteringByteChannel == null) {
126 throw new NullPointerException("Scattering byte channel is null");
127 }
128
129 if (gatheringByteChannel == null) {
130 throw new NullPointerException("Gathering byte channel is null");
131 }
132
133 if (!(scatteringByteChannel instanceof SelectableChannel)) {
134 throw new IllegalArgumentException(
135 "Scattering byte channel is not of type SelectableChannel");
136 }
137
138 if (!(gatheringByteChannel instanceof SelectableChannel)) {
139 throw new IllegalArgumentException(
140 "Gathering byte channel is not of type SelectableChannel");
141 }
142
143 SelectableChannel readSelectableChannel =
144 (SelectableChannel)scatteringByteChannel;
145 SelectableChannel writeSelectableChannel =
146 (SelectableChannel)gatheringByteChannel;
147
148 if ((readSelectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
149 throw new IllegalArgumentException(
150 "Scattering byte channel is not valid for reading");
151 }
152
153 if ((writeSelectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
154 throw new IllegalArgumentException(
155 "Gathering byte channel is not valid for writing");
156 }
157
158 ensureOpen();
159
160 readSelectableChannel.configureBlocking(false);
161 writeSelectableChannel.configureBlocking(false);
162
163 FutureTask<RegistrationReference> registerFutureTask =
164 new FutureTask<RegistrationReference>(
165 new RegisterCallable(
166 readSelectableChannel, writeSelectableChannel));
167
168 registerQueue.offer(registerFutureTask);
169
170 selector.wakeup();
171
172 try {
173 return registerFutureTask.get();
174 }
175 catch (Exception e) {
176 throw new IOException(e);
177 }
178 }
179
180 @Override
181 protected void doSendDatagram(
182 RegistrationReference registrationReference, Datagram datagram) {
183
184 SelectionKeyRegistrationReference selectionKeyRegistrationReference =
185 (SelectionKeyRegistrationReference)registrationReference;
186
187 SelectionKey writeSelectionKey =
188 selectionKeyRegistrationReference.writeSelectionKey;
189
190 ChannelContext channelContext =
191 (ChannelContext)writeSelectionKey.attachment();
192
193 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
194
195 sendingQueue.offer(datagram);
196
197 synchronized (writeSelectionKey) {
198 int ops = writeSelectionKey.interestOps();
199
200 if ((ops & SelectionKey.OP_WRITE) == 0) {
201 ops |= SelectionKey.OP_WRITE;
202
203 writeSelectionKey.interestOps(ops);
204
205 selector.wakeup();
206 }
207 }
208 }
209
210 protected void registerChannels() {
211 FutureTask<RegistrationReference> registerFuturetask = null;
212
213 while ((registerFuturetask = registerQueue.poll()) != null) {
214 registerFuturetask.run();
215 }
216 }
217
218 protected static final ThreadFactory threadFactory =
219 new NamedThreadFactory(
220 SelectorIntraBand.class + ".threadFactory", Thread.NORM_PRIORITY,
221 SelectorIntraBand.class.getClassLoader());
222
223 protected final Thread pollingThread = threadFactory.newThread(
224 new PollingJob());
225 protected final Queue<FutureTask<RegistrationReference>> registerQueue =
226 new ConcurrentLinkedQueue<FutureTask<RegistrationReference>>();
227 protected final Selector selector = Selector.open();
228
229 protected class RegisterCallable
230 implements Callable<RegistrationReference> {
231
232 public RegisterCallable(
233 SelectableChannel readSelectableChannel,
234 SelectableChannel writeSelectableChannel) {
235
236 _readSelectableChannel = readSelectableChannel;
237 _writeSelectableChannel = writeSelectableChannel;
238 }
239
240 public RegistrationReference call() throws Exception {
241 if (_readSelectableChannel == _writeSelectableChannel) {
242
243
244
245
246
247 SelectionKey selectionKey = _readSelectableChannel.register(
248 selector, 0);
249
250 SelectionKeyRegistrationReference
251 selectionKeyRegistrationReference =
252 new SelectionKeyRegistrationReference(
253 SelectorIntraBand.this, selectionKey, selectionKey);
254
255 ChannelContext channelContext = new ChannelContext(
256 new ConcurrentLinkedQueue<Datagram>());
257
258 channelContext.setRegistrationReference(
259 selectionKeyRegistrationReference);
260
261 selectionKey.attach(channelContext);
262
263
264
265 selectionKey.interestOps(
266 SelectionKey.OP_READ | SelectionKey.OP_WRITE);
267
268 return selectionKeyRegistrationReference;
269 }
270 else {
271
272
273
274
275
276 SelectionKey readSelectionKey = _readSelectableChannel.register(
277 selector, 0);
278
279 SelectionKey writeSelectionKey =
280 _writeSelectableChannel.register(selector, 0);
281
282 SelectionKeyRegistrationReference
283 selectionKeyRegistrationReference =
284 new SelectionKeyRegistrationReference(
285 SelectorIntraBand.this, readSelectionKey,
286 writeSelectionKey);
287
288 ChannelContext channelContext = new ChannelContext(
289 new ConcurrentLinkedQueue<Datagram>());
290
291 channelContext.setRegistrationReference(
292 selectionKeyRegistrationReference);
293
294 readSelectionKey.attach(channelContext);
295 writeSelectionKey.attach(channelContext);
296
297
298
299 readSelectionKey.interestOps(SelectionKey.OP_READ);
300 writeSelectionKey.interestOps(SelectionKey.OP_WRITE);
301
302 return selectionKeyRegistrationReference;
303 }
304 }
305
306 private final SelectableChannel _readSelectableChannel;
307 private final SelectableChannel _writeSelectableChannel;
308
309 }
310
311 private void _processReading(SelectionKey selectionKey) {
312 ScatteringByteChannel scatteringByteChannel =
313 (ScatteringByteChannel)selectionKey.channel();
314
315 ChannelContext channelContext =
316 (ChannelContext)selectionKey.attachment();
317
318 handleReading(scatteringByteChannel, channelContext);
319 }
320
321 private void _processWriting(SelectionKey selectionKey) {
322 GatheringByteChannel gatheringByteChannel =
323 (GatheringByteChannel)selectionKey.channel();
324
325 ChannelContext channelContext =
326 (ChannelContext)selectionKey.attachment();
327
328 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
329
330 boolean ready = false;
331
332 if (channelContext.getWritingDatagram() == null) {
333 Datagram datagram = sendingQueue.poll();
334
335 if (datagram != null) {
336 channelContext.setWritingDatagram(datagram);
337
338 ready = true;
339 }
340 }
341 else {
342 ready = true;
343 }
344
345 if (ready) {
346 if (handleWriting(gatheringByteChannel, channelContext)) {
347 if (sendingQueue.isEmpty()) {
348
349
350
351
352 int ops = selectionKey.interestOps();
353
354 ops &= ~SelectionKey.OP_WRITE;
355
356 synchronized (selectionKey) {
357 if (sendingQueue.isEmpty()) {
358 selectionKey.interestOps(ops);
359 }
360 }
361 }
362 }
363 }
364 }
365
366 private static Log _log = LogFactoryUtil.getLog(SelectorIntraBand.class);
367
368 private class PollingJob implements Runnable {
369
370 public void run() {
371 try {
372 try {
373 while (true) {
374 int readyCount = selector.select();
375
376 if (readyCount > 0) {
377 Set<SelectionKey> selectionKeys =
378 selector.selectedKeys();
379
380 Iterator<SelectionKey> iterator =
381 selectionKeys.iterator();
382
383 while (iterator.hasNext()) {
384 SelectionKey selectionKey = iterator.next();
385
386 iterator.remove();
387
388 try {
389 if (selectionKey.isReadable()) {
390 _processReading(selectionKey);
391 }
392
393 if (selectionKey.isWritable()) {
394 _processWriting(selectionKey);
395 }
396 }
397 catch (CancelledKeyException cke) {
398
399
400
401 }
402 }
403 }
404 else if (!selector.isOpen()) {
405 break;
406 }
407
408 registerChannels();
409 cleanUpTimeoutResponseWaitingDatagrams();
410 }
411 }
412 finally {
413 selector.close();
414 }
415 }
416 catch (ClosedSelectorException cse) {
417 if (_log.isInfoEnabled()) {
418 Thread currentThread = Thread.currentThread();
419
420 _log.info(
421 currentThread.getName() +
422 " exiting gracefully on selector closure");
423 }
424 }
425 catch (Throwable t) {
426 Thread currentThread = Thread.currentThread();
427
428 _log.error(
429 currentThread.getName() + " exiting exceptionally", t);
430 }
431
432
433
434
435 registerChannels();
436
437 responseWaitingMap.clear();
438 timeoutMap.clear();
439 }
440
441 }
442
443 }