001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.nonblocking;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
020    import com.liferay.portal.kernel.nio.intraband.ChannelContext;
021    import com.liferay.portal.kernel.nio.intraband.Datagram;
022    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
023    import com.liferay.portal.kernel.util.NamedThreadFactory;
024    
025    import java.io.IOException;
026    
027    import java.nio.channels.CancelledKeyException;
028    import java.nio.channels.Channel;
029    import java.nio.channels.ClosedSelectorException;
030    import java.nio.channels.GatheringByteChannel;
031    import java.nio.channels.ScatteringByteChannel;
032    import java.nio.channels.SelectableChannel;
033    import java.nio.channels.SelectionKey;
034    import java.nio.channels.Selector;
035    
036    import java.util.Iterator;
037    import java.util.Queue;
038    import java.util.Set;
039    import java.util.concurrent.Callable;
040    import java.util.concurrent.ConcurrentLinkedQueue;
041    import java.util.concurrent.FutureTask;
042    import java.util.concurrent.ThreadFactory;
043    
044    /**
045     * @author Shuyang Zhou
046     */
047    public class SelectorIntraband extends BaseIntraband {
048    
049            public SelectorIntraband(long defaultTimeout) throws IOException {
050                    super(defaultTimeout);
051    
052                    pollingThread.start();
053            }
054    
055            @Override
056            public void close() throws InterruptedException, IOException {
057                    selector.close();
058    
059                    pollingThread.interrupt();
060    
061                    pollingThread.join(defaultTimeout);
062    
063                    super.close();
064            }
065    
066            @Override
067            public RegistrationReference registerChannel(Channel channel)
068                    throws IOException {
069    
070                    if (channel == null) {
071                            throw new NullPointerException("Channel is null");
072                    }
073    
074                    if (!(channel instanceof GatheringByteChannel)) {
075                            throw new IllegalArgumentException(
076                                    "Channel is not of type GatheringByteChannel");
077                    }
078    
079                    if (!(channel instanceof ScatteringByteChannel)) {
080                            throw new IllegalArgumentException(
081                                    "Channel is not of type ScatteringByteChannel");
082                    }
083    
084                    if (!(channel instanceof SelectableChannel)) {
085                            throw new IllegalArgumentException(
086                                    "Channel is not of type SelectableChannel");
087                    }
088    
089                    SelectableChannel selectableChannel = (SelectableChannel)channel;
090    
091                    if ((selectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
092                            throw new IllegalArgumentException(
093                                    "Channel is not valid for reading");
094                    }
095    
096                    if ((selectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
097                            throw new IllegalArgumentException(
098                                    "Channel is not valid for writing");
099                    }
100    
101                    ensureOpen();
102    
103                    selectableChannel.configureBlocking(false);
104    
105                    FutureTask<RegistrationReference> registerFutureTask = new FutureTask<>(
106                            new RegisterCallable(selectableChannel, selectableChannel));
107    
108                    registerQueue.offer(registerFutureTask);
109    
110                    selector.wakeup();
111    
112                    try {
113                            return registerFutureTask.get();
114                    }
115                    catch (Exception e) {
116                            throw new IOException(e);
117                    }
118            }
119    
120            @Override
121            public RegistrationReference registerChannel(
122                            ScatteringByteChannel scatteringByteChannel,
123                            GatheringByteChannel gatheringByteChannel)
124                    throws IOException {
125    
126                    if (scatteringByteChannel == null) {
127                            throw new NullPointerException("Scattering byte channel is null");
128                    }
129    
130                    if (gatheringByteChannel == null) {
131                            throw new NullPointerException("Gathering byte channel is null");
132                    }
133    
134                    if (!(scatteringByteChannel instanceof SelectableChannel)) {
135                            throw new IllegalArgumentException(
136                                    "Scattering byte channel is not of type SelectableChannel");
137                    }
138    
139                    if (!(gatheringByteChannel instanceof SelectableChannel)) {
140                            throw new IllegalArgumentException(
141                                    "Gathering byte channel is not of type SelectableChannel");
142                    }
143    
144                    SelectableChannel readSelectableChannel =
145                            (SelectableChannel)scatteringByteChannel;
146                    SelectableChannel writeSelectableChannel =
147                            (SelectableChannel)gatheringByteChannel;
148    
149                    if ((readSelectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
150                            throw new IllegalArgumentException(
151                                    "Scattering byte channel is not valid for reading");
152                    }
153    
154                    if ((writeSelectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
155                            throw new IllegalArgumentException(
156                                    "Gathering byte channel is not valid for writing");
157                    }
158    
159                    ensureOpen();
160    
161                    readSelectableChannel.configureBlocking(false);
162                    writeSelectableChannel.configureBlocking(false);
163    
164                    FutureTask<RegistrationReference> registerFutureTask =
165                            new FutureTask<RegistrationReference>(
166                                    new RegisterCallable(
167                                            readSelectableChannel, writeSelectableChannel));
168    
169                    registerQueue.offer(registerFutureTask);
170    
171                    selector.wakeup();
172    
173                    try {
174                            return registerFutureTask.get();
175                    }
176                    catch (Exception e) {
177                            throw new IOException(e);
178                    }
179            }
180    
181            @Override
182            protected void doSendDatagram(
183                    RegistrationReference registrationReference, Datagram datagram) {
184    
185                    SelectionKeyRegistrationReference selectionKeyRegistrationReference =
186                            (SelectionKeyRegistrationReference)registrationReference;
187    
188                    SelectionKey writeSelectionKey =
189                            selectionKeyRegistrationReference.writeSelectionKey;
190    
191                    ChannelContext channelContext =
192                            (ChannelContext)writeSelectionKey.attachment();
193    
194                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
195    
196                    sendingQueue.offer(datagram);
197    
198                    synchronized (writeSelectionKey) {
199                            int ops = writeSelectionKey.interestOps();
200    
201                            if ((ops & SelectionKey.OP_WRITE) == 0) {
202                                    ops |= SelectionKey.OP_WRITE;
203    
204                                    writeSelectionKey.interestOps(ops);
205    
206                                    selector.wakeup();
207                            }
208                    }
209            }
210    
211            protected void registerChannels() {
212                    FutureTask<RegistrationReference> registerFuturetask = null;
213    
214                    synchronized (selector) {
215                            while ((registerFuturetask = registerQueue.poll()) != null) {
216                                    registerFuturetask.run();
217                            }
218                    }
219            }
220    
221            protected static final ThreadFactory threadFactory = new NamedThreadFactory(
222                    SelectorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
223                    SelectorIntraband.class.getClassLoader());
224    
225            protected final Thread pollingThread = threadFactory.newThread(
226                    new PollingJob());
227            protected final Queue<FutureTask<RegistrationReference>> registerQueue =
228                    new ConcurrentLinkedQueue<>();
229            protected final Selector selector = Selector.open();
230    
231            protected class RegisterCallable
232                    implements Callable<RegistrationReference> {
233    
234                    public RegisterCallable(
235                            SelectableChannel readSelectableChannel,
236                            SelectableChannel writeSelectableChannel) {
237    
238                            _readSelectableChannel = readSelectableChannel;
239                            _writeSelectableChannel = writeSelectableChannel;
240                    }
241    
242                    @Override
243                    public RegistrationReference call() throws Exception {
244                            if (_readSelectableChannel == _writeSelectableChannel) {
245    
246                                    // Register channel with zero interest, no dispatch will happen
247                                    // before channel context is ready. This ensures thread safe
248                                    // publication for ChannelContext#_registrationReference.
249    
250                                    SelectionKey selectionKey = _readSelectableChannel.register(
251                                            selector, 0);
252    
253                                    SelectionKeyRegistrationReference
254                                            selectionKeyRegistrationReference =
255                                                    new SelectionKeyRegistrationReference(
256                                                            SelectorIntraband.this, selectionKey, selectionKey);
257    
258                                    ChannelContext channelContext = new ChannelContext(
259                                            new ConcurrentLinkedQueue<Datagram>());
260    
261                                    channelContext.setRegistrationReference(
262                                            selectionKeyRegistrationReference);
263    
264                                    selectionKey.attach(channelContext);
265    
266                                    // Alter interest ops after preparing the channel context
267    
268                                    selectionKey.interestOps(SelectionKey.OP_READ);
269    
270                                    return selectionKeyRegistrationReference;
271                            }
272                            else {
273    
274                                    // Register channels with zero interest, no dispatch will happen
275                                    // before channel contexts are ready. This ensures thread safe
276                                    // publication for ChannelContext#_registrationReference.
277    
278                                    SelectionKey readSelectionKey = _readSelectableChannel.register(
279                                            selector, 0);
280    
281                                    SelectionKey writeSelectionKey =
282                                            _writeSelectableChannel.register(selector, 0);
283    
284                                    SelectionKeyRegistrationReference
285                                            selectionKeyRegistrationReference =
286                                                    new SelectionKeyRegistrationReference(
287                                                            SelectorIntraband.this, readSelectionKey,
288                                                            writeSelectionKey);
289    
290                                    ChannelContext channelContext = new ChannelContext(
291                                            new ConcurrentLinkedQueue<Datagram>());
292    
293                                    channelContext.setRegistrationReference(
294                                            selectionKeyRegistrationReference);
295    
296                                    readSelectionKey.attach(channelContext);
297                                    writeSelectionKey.attach(channelContext);
298    
299                                    // Alter interest ops after ChannelContexts preparation
300    
301                                    readSelectionKey.interestOps(SelectionKey.OP_READ);
302    
303                                    return selectionKeyRegistrationReference;
304                            }
305                    }
306    
307                    private final SelectableChannel _readSelectableChannel;
308                    private final SelectableChannel _writeSelectableChannel;
309    
310            }
311    
312            private void _processReading(SelectionKey selectionKey) {
313                    ScatteringByteChannel scatteringByteChannel =
314                            (ScatteringByteChannel)selectionKey.channel();
315    
316                    ChannelContext channelContext =
317                            (ChannelContext)selectionKey.attachment();
318    
319                    handleReading(scatteringByteChannel, channelContext);
320            }
321    
322            private void _processWriting(SelectionKey selectionKey) {
323                    GatheringByteChannel gatheringByteChannel =
324                            (GatheringByteChannel)selectionKey.channel();
325    
326                    ChannelContext channelContext =
327                            (ChannelContext)selectionKey.attachment();
328    
329                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
330    
331                    if (channelContext.getWritingDatagram() == null) {
332                            channelContext.setWritingDatagram(sendingQueue.poll());
333                    }
334    
335                    boolean backOff = false;
336    
337                    if (channelContext.getWritingDatagram() != null) {
338                            if (handleWriting(gatheringByteChannel, channelContext)) {
339                                    if (sendingQueue.isEmpty()) {
340                                            backOff = true;
341                                    }
342                            }
343                    }
344                    else {
345                            backOff = true;
346                    }
347    
348                    if (backOff) {
349    
350                            // Channel is still writable, but there is nothing to send, back off
351                            // to prevent unnecessary busy spinning.
352    
353                            int ops = selectionKey.interestOps();
354    
355                            ops &= ~SelectionKey.OP_WRITE;
356    
357                            synchronized (selectionKey) {
358                                    if (sendingQueue.isEmpty()) {
359                                            selectionKey.interestOps(ops);
360                                    }
361                            }
362                    }
363            }
364    
365            private static final Log _log = LogFactoryUtil.getLog(
366                    SelectorIntraband.class);
367    
368            private class PollingJob implements Runnable {
369    
370                    @Override
371                    public void run() {
372                            try {
373                                    try {
374                                            while (true) {
375                                                    int readyCount = selector.select();
376    
377                                                    if (readyCount > 0) {
378                                                            Set<SelectionKey> selectionKeys =
379                                                                    selector.selectedKeys();
380    
381                                                            Iterator<SelectionKey> iterator =
382                                                                    selectionKeys.iterator();
383    
384                                                            while (iterator.hasNext()) {
385                                                                    SelectionKey selectionKey = iterator.next();
386    
387                                                                    iterator.remove();
388    
389                                                                    try {
390                                                                            if (selectionKey.isReadable()) {
391                                                                                    _processReading(selectionKey);
392                                                                            }
393    
394                                                                            if (selectionKey.isWritable()) {
395                                                                                    _processWriting(selectionKey);
396                                                                            }
397                                                                    }
398                                                                    catch (CancelledKeyException cke) {
399    
400                                                                            // Concurrent cancelling, move to next key
401    
402                                                                    }
403                                                            }
404                                                    }
405                                                    else if (!selector.isOpen()) {
406                                                            break;
407                                                    }
408    
409                                                    registerChannels();
410                                                    cleanUpTimeoutResponseWaitingDatagrams();
411                                            }
412                                    }
413                                    finally {
414                                            selector.close();
415                                    }
416                            }
417                            catch (ClosedSelectorException cse) {
418                                    if (_log.isInfoEnabled()) {
419                                            Thread currentThread = Thread.currentThread();
420    
421                                            _log.info(
422                                                    currentThread.getName() +
423                                                            " exiting gracefully on selector closure");
424                                    }
425                            }
426                            catch (Throwable t) {
427                                    Thread currentThread = Thread.currentThread();
428    
429                                    _log.error(
430                                            currentThread.getName() + " exiting exceptionally", t);
431                            }
432    
433                            // Flush out pending register requests to unblock their invokers,
434                            // this will cause them to receive a ClosedSelectorException
435    
436                            registerChannels();
437    
438                            responseWaitingMap.clear();
439                            timeoutMap.clear();
440                    }
441    
442            }
443    
444    }