001
014
015 package com.liferay.portal.kernel.nio.intraband.nonblocking;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
020 import com.liferay.portal.kernel.nio.intraband.ChannelContext;
021 import com.liferay.portal.kernel.nio.intraband.Datagram;
022 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
023 import com.liferay.portal.kernel.util.NamedThreadFactory;
024
025 import java.io.IOException;
026
027 import java.nio.channels.CancelledKeyException;
028 import java.nio.channels.Channel;
029 import java.nio.channels.ClosedSelectorException;
030 import java.nio.channels.GatheringByteChannel;
031 import java.nio.channels.ScatteringByteChannel;
032 import java.nio.channels.SelectableChannel;
033 import java.nio.channels.SelectionKey;
034 import java.nio.channels.Selector;
035
036 import java.util.Iterator;
037 import java.util.Queue;
038 import java.util.Set;
039 import java.util.concurrent.Callable;
040 import java.util.concurrent.ConcurrentLinkedQueue;
041 import java.util.concurrent.FutureTask;
042 import java.util.concurrent.ThreadFactory;
043
044
047 public class SelectorIntraband extends BaseIntraband {
048
049 public SelectorIntraband(long defaultTimeout) throws IOException {
050 super(defaultTimeout);
051
052 pollingThread.start();
053 }
054
055 @Override
056 public void close() throws InterruptedException, IOException {
057 selector.close();
058
059 pollingThread.interrupt();
060
061 pollingThread.join(defaultTimeout);
062
063 super.close();
064 }
065
066 @Override
067 public RegistrationReference registerChannel(Channel channel)
068 throws IOException {
069
070 if (channel == null) {
071 throw new NullPointerException("Channel is null");
072 }
073
074 if (!(channel instanceof GatheringByteChannel)) {
075 throw new IllegalArgumentException(
076 "Channel is not of type GatheringByteChannel");
077 }
078
079 if (!(channel instanceof ScatteringByteChannel)) {
080 throw new IllegalArgumentException(
081 "Channel is not of type ScatteringByteChannel");
082 }
083
084 if (!(channel instanceof SelectableChannel)) {
085 throw new IllegalArgumentException(
086 "Channel is not of type SelectableChannel");
087 }
088
089 SelectableChannel selectableChannel = (SelectableChannel)channel;
090
091 if ((selectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
092 throw new IllegalArgumentException(
093 "Channel is not valid for reading");
094 }
095
096 if ((selectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
097 throw new IllegalArgumentException(
098 "Channel is not valid for writing");
099 }
100
101 ensureOpen();
102
103 selectableChannel.configureBlocking(false);
104
105 FutureTask<RegistrationReference> registerFutureTask = new FutureTask<>(
106 new RegisterCallable(selectableChannel, selectableChannel));
107
108 registerQueue.offer(registerFutureTask);
109
110 selector.wakeup();
111
112 try {
113 return registerFutureTask.get();
114 }
115 catch (Exception e) {
116 throw new IOException(e);
117 }
118 }
119
120 @Override
121 public RegistrationReference registerChannel(
122 ScatteringByteChannel scatteringByteChannel,
123 GatheringByteChannel gatheringByteChannel)
124 throws IOException {
125
126 if (scatteringByteChannel == null) {
127 throw new NullPointerException("Scattering byte channel is null");
128 }
129
130 if (gatheringByteChannel == null) {
131 throw new NullPointerException("Gathering byte channel is null");
132 }
133
134 if (!(scatteringByteChannel instanceof SelectableChannel)) {
135 throw new IllegalArgumentException(
136 "Scattering byte channel is not of type SelectableChannel");
137 }
138
139 if (!(gatheringByteChannel instanceof SelectableChannel)) {
140 throw new IllegalArgumentException(
141 "Gathering byte channel is not of type SelectableChannel");
142 }
143
144 SelectableChannel readSelectableChannel =
145 (SelectableChannel)scatteringByteChannel;
146 SelectableChannel writeSelectableChannel =
147 (SelectableChannel)gatheringByteChannel;
148
149 if ((readSelectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
150 throw new IllegalArgumentException(
151 "Scattering byte channel is not valid for reading");
152 }
153
154 if ((writeSelectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
155 throw new IllegalArgumentException(
156 "Gathering byte channel is not valid for writing");
157 }
158
159 ensureOpen();
160
161 readSelectableChannel.configureBlocking(false);
162 writeSelectableChannel.configureBlocking(false);
163
164 FutureTask<RegistrationReference> registerFutureTask =
165 new FutureTask<RegistrationReference>(
166 new RegisterCallable(
167 readSelectableChannel, writeSelectableChannel));
168
169 registerQueue.offer(registerFutureTask);
170
171 selector.wakeup();
172
173 try {
174 return registerFutureTask.get();
175 }
176 catch (Exception e) {
177 throw new IOException(e);
178 }
179 }
180
181 @Override
182 protected void doSendDatagram(
183 RegistrationReference registrationReference, Datagram datagram) {
184
185 SelectionKeyRegistrationReference selectionKeyRegistrationReference =
186 (SelectionKeyRegistrationReference)registrationReference;
187
188 SelectionKey writeSelectionKey =
189 selectionKeyRegistrationReference.writeSelectionKey;
190
191 ChannelContext channelContext =
192 (ChannelContext)writeSelectionKey.attachment();
193
194 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
195
196 sendingQueue.offer(datagram);
197
198 synchronized (writeSelectionKey) {
199 int ops = writeSelectionKey.interestOps();
200
201 if ((ops & SelectionKey.OP_WRITE) == 0) {
202 ops |= SelectionKey.OP_WRITE;
203
204 writeSelectionKey.interestOps(ops);
205
206 selector.wakeup();
207 }
208 }
209 }
210
211 protected void registerChannels() {
212 FutureTask<RegistrationReference> registerFuturetask = null;
213
214 synchronized (selector) {
215 while ((registerFuturetask = registerQueue.poll()) != null) {
216 registerFuturetask.run();
217 }
218 }
219 }
220
221 protected static final ThreadFactory threadFactory = new NamedThreadFactory(
222 SelectorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
223 SelectorIntraband.class.getClassLoader());
224
225 protected final Thread pollingThread = threadFactory.newThread(
226 new PollingJob());
227 protected final Queue<FutureTask<RegistrationReference>> registerQueue =
228 new ConcurrentLinkedQueue<>();
229 protected final Selector selector = Selector.open();
230
231 protected class RegisterCallable
232 implements Callable<RegistrationReference> {
233
234 public RegisterCallable(
235 SelectableChannel readSelectableChannel,
236 SelectableChannel writeSelectableChannel) {
237
238 _readSelectableChannel = readSelectableChannel;
239 _writeSelectableChannel = writeSelectableChannel;
240 }
241
242 @Override
243 public RegistrationReference call() throws Exception {
244 if (_readSelectableChannel == _writeSelectableChannel) {
245
246
247
248
249
250 SelectionKey selectionKey = _readSelectableChannel.register(
251 selector, 0);
252
253 SelectionKeyRegistrationReference
254 selectionKeyRegistrationReference =
255 new SelectionKeyRegistrationReference(
256 SelectorIntraband.this, selectionKey, selectionKey);
257
258 ChannelContext channelContext = new ChannelContext(
259 new ConcurrentLinkedQueue<Datagram>());
260
261 channelContext.setRegistrationReference(
262 selectionKeyRegistrationReference);
263
264 selectionKey.attach(channelContext);
265
266
267
268 selectionKey.interestOps(SelectionKey.OP_READ);
269
270 return selectionKeyRegistrationReference;
271 }
272 else {
273
274
275
276
277
278 SelectionKey readSelectionKey = _readSelectableChannel.register(
279 selector, 0);
280
281 SelectionKey writeSelectionKey =
282 _writeSelectableChannel.register(selector, 0);
283
284 SelectionKeyRegistrationReference
285 selectionKeyRegistrationReference =
286 new SelectionKeyRegistrationReference(
287 SelectorIntraband.this, readSelectionKey,
288 writeSelectionKey);
289
290 ChannelContext channelContext = new ChannelContext(
291 new ConcurrentLinkedQueue<Datagram>());
292
293 channelContext.setRegistrationReference(
294 selectionKeyRegistrationReference);
295
296 readSelectionKey.attach(channelContext);
297 writeSelectionKey.attach(channelContext);
298
299
300
301 readSelectionKey.interestOps(SelectionKey.OP_READ);
302
303 return selectionKeyRegistrationReference;
304 }
305 }
306
307 private final SelectableChannel _readSelectableChannel;
308 private final SelectableChannel _writeSelectableChannel;
309
310 }
311
312 private void _processReading(SelectionKey selectionKey) {
313 ScatteringByteChannel scatteringByteChannel =
314 (ScatteringByteChannel)selectionKey.channel();
315
316 ChannelContext channelContext =
317 (ChannelContext)selectionKey.attachment();
318
319 handleReading(scatteringByteChannel, channelContext);
320 }
321
322 private void _processWriting(SelectionKey selectionKey) {
323 GatheringByteChannel gatheringByteChannel =
324 (GatheringByteChannel)selectionKey.channel();
325
326 ChannelContext channelContext =
327 (ChannelContext)selectionKey.attachment();
328
329 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
330
331 if (channelContext.getWritingDatagram() == null) {
332 channelContext.setWritingDatagram(sendingQueue.poll());
333 }
334
335 boolean backOff = false;
336
337 if (channelContext.getWritingDatagram() != null) {
338 if (handleWriting(gatheringByteChannel, channelContext)) {
339 if (sendingQueue.isEmpty()) {
340 backOff = true;
341 }
342 }
343 }
344 else {
345 backOff = true;
346 }
347
348 if (backOff) {
349
350
351
352
353 int ops = selectionKey.interestOps();
354
355 ops &= ~SelectionKey.OP_WRITE;
356
357 synchronized (selectionKey) {
358 if (sendingQueue.isEmpty()) {
359 selectionKey.interestOps(ops);
360 }
361 }
362 }
363 }
364
365 private static final Log _log = LogFactoryUtil.getLog(
366 SelectorIntraband.class);
367
368 private class PollingJob implements Runnable {
369
370 @Override
371 public void run() {
372 try {
373 try {
374 while (true) {
375 int readyCount = selector.select();
376
377 if (readyCount > 0) {
378 Set<SelectionKey> selectionKeys =
379 selector.selectedKeys();
380
381 Iterator<SelectionKey> iterator =
382 selectionKeys.iterator();
383
384 while (iterator.hasNext()) {
385 SelectionKey selectionKey = iterator.next();
386
387 iterator.remove();
388
389 try {
390 if (selectionKey.isReadable()) {
391 _processReading(selectionKey);
392 }
393
394 if (selectionKey.isWritable()) {
395 _processWriting(selectionKey);
396 }
397 }
398 catch (CancelledKeyException cke) {
399
400
401
402 }
403 }
404 }
405 else if (!selector.isOpen()) {
406 break;
407 }
408
409 registerChannels();
410 cleanUpTimeoutResponseWaitingDatagrams();
411 }
412 }
413 finally {
414 selector.close();
415 }
416 }
417 catch (ClosedSelectorException cse) {
418 if (_log.isInfoEnabled()) {
419 Thread currentThread = Thread.currentThread();
420
421 _log.info(
422 currentThread.getName() +
423 " exiting gracefully on selector closure");
424 }
425 }
426 catch (Throwable t) {
427 Thread currentThread = Thread.currentThread();
428
429 _log.error(
430 currentThread.getName() + " exiting exceptionally", t);
431 }
432
433
434
435
436 registerChannels();
437
438 responseWaitingMap.clear();
439 timeoutMap.clear();
440 }
441
442 }
443
444 }