001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.nonblocking;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.BaseIntraBand;
020    import com.liferay.portal.kernel.nio.intraband.ChannelContext;
021    import com.liferay.portal.kernel.nio.intraband.Datagram;
022    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
023    import com.liferay.portal.kernel.util.NamedThreadFactory;
024    
025    import java.io.IOException;
026    
027    import java.nio.channels.CancelledKeyException;
028    import java.nio.channels.Channel;
029    import java.nio.channels.ClosedSelectorException;
030    import java.nio.channels.GatheringByteChannel;
031    import java.nio.channels.ScatteringByteChannel;
032    import java.nio.channels.SelectableChannel;
033    import java.nio.channels.SelectionKey;
034    import java.nio.channels.Selector;
035    
036    import java.util.Iterator;
037    import java.util.Queue;
038    import java.util.Set;
039    import java.util.concurrent.Callable;
040    import java.util.concurrent.ConcurrentLinkedQueue;
041    import java.util.concurrent.FutureTask;
042    import java.util.concurrent.ThreadFactory;
043    
044    /**
045     * @author Shuyang Zhou
046     */
047    public class SelectorIntraBand extends BaseIntraBand {
048    
049            public SelectorIntraBand(long defaultTimeout) throws IOException {
050                    super(defaultTimeout);
051    
052                    pollingThread.start();
053            }
054    
055            @Override
056            public void close() throws InterruptedException, IOException {
057                    selector.close();
058    
059                    pollingThread.interrupt();
060    
061                    pollingThread.join(defaultTimeout);
062    
063                    super.close();
064            }
065    
066            public RegistrationReference registerChannel(Channel channel)
067                    throws IOException {
068    
069                    if (channel == null) {
070                            throw new NullPointerException("Channel is null");
071                    }
072    
073                    if (!(channel instanceof GatheringByteChannel)) {
074                            throw new IllegalArgumentException(
075                                    "Channel is not of type GatheringByteChannel");
076                    }
077    
078                    if (!(channel instanceof ScatteringByteChannel)) {
079                            throw new IllegalArgumentException(
080                                    "Channel is not of type ScatteringByteChannel");
081                    }
082    
083                    if (!(channel instanceof SelectableChannel)) {
084                            throw new IllegalArgumentException(
085                                    "Channel is not of type SelectableChannel");
086                    }
087    
088                    SelectableChannel selectableChannel = (SelectableChannel)channel;
089    
090                    if ((selectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
091                            throw new IllegalArgumentException(
092                                    "Channel is not valid for reading");
093                    }
094    
095                    if ((selectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
096                            throw new IllegalArgumentException(
097                                    "Channel is not valid for writing");
098                    }
099    
100                    ensureOpen();
101    
102                    selectableChannel.configureBlocking(false);
103    
104                    FutureTask<RegistrationReference> registerFutureTask =
105                            new FutureTask<RegistrationReference>(
106                                    new RegisterCallable(selectableChannel, selectableChannel));
107    
108                    registerQueue.offer(registerFutureTask);
109    
110                    selector.wakeup();
111    
112                    try {
113                            return registerFutureTask.get();
114                    }
115                    catch (Exception e) {
116                            throw new IOException(e);
117                    }
118            }
119    
120            public RegistrationReference registerChannel(
121                            ScatteringByteChannel scatteringByteChannel,
122                            GatheringByteChannel gatheringByteChannel)
123                    throws IOException {
124    
125                    if (scatteringByteChannel == null) {
126                            throw new NullPointerException("Scattering byte channel is null");
127                    }
128    
129                    if (gatheringByteChannel == null) {
130                            throw new NullPointerException("Gathering byte channel is null");
131                    }
132    
133                    if (!(scatteringByteChannel instanceof SelectableChannel)) {
134                            throw new IllegalArgumentException(
135                                    "Scattering byte channel is not of type SelectableChannel");
136                    }
137    
138                    if (!(gatheringByteChannel instanceof SelectableChannel)) {
139                            throw new IllegalArgumentException(
140                                    "Gathering byte channel is not of type SelectableChannel");
141                    }
142    
143                    SelectableChannel readSelectableChannel =
144                            (SelectableChannel)scatteringByteChannel;
145                    SelectableChannel writeSelectableChannel =
146                            (SelectableChannel)gatheringByteChannel;
147    
148                    if ((readSelectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
149                            throw new IllegalArgumentException(
150                                    "Scattering byte channel is not valid for reading");
151                    }
152    
153                    if ((writeSelectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
154                            throw new IllegalArgumentException(
155                                    "Gathering byte channel is not valid for writing");
156                    }
157    
158                    ensureOpen();
159    
160                    readSelectableChannel.configureBlocking(false);
161                    writeSelectableChannel.configureBlocking(false);
162    
163                    FutureTask<RegistrationReference> registerFutureTask =
164                            new FutureTask<RegistrationReference>(
165                                    new RegisterCallable(
166                                            readSelectableChannel, writeSelectableChannel));
167    
168                    registerQueue.offer(registerFutureTask);
169    
170                    selector.wakeup();
171    
172                    try {
173                            return registerFutureTask.get();
174                    }
175                    catch (Exception e) {
176                            throw new IOException(e);
177                    }
178            }
179    
180            @Override
181            protected void doSendDatagram(
182                    RegistrationReference registrationReference, Datagram datagram) {
183    
184                    SelectionKeyRegistrationReference selectionKeyRegistrationReference =
185                            (SelectionKeyRegistrationReference)registrationReference;
186    
187                    SelectionKey writeSelectionKey =
188                            selectionKeyRegistrationReference.writeSelectionKey;
189    
190                    ChannelContext channelContext =
191                            (ChannelContext)writeSelectionKey.attachment();
192    
193                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
194    
195                    sendingQueue.offer(datagram);
196    
197                    synchronized (writeSelectionKey) {
198                            int ops = writeSelectionKey.interestOps();
199    
200                            if ((ops & SelectionKey.OP_WRITE) == 0) {
201                                    ops |= SelectionKey.OP_WRITE;
202    
203                                    writeSelectionKey.interestOps(ops);
204    
205                                    selector.wakeup();
206                            }
207                    }
208            }
209    
210            protected void registerChannels() {
211                    FutureTask<RegistrationReference> registerFuturetask = null;
212    
213                    while ((registerFuturetask = registerQueue.poll()) != null) {
214                            registerFuturetask.run();
215                    }
216            }
217    
218            protected static final ThreadFactory threadFactory =
219                    new NamedThreadFactory(
220                            SelectorIntraBand.class + ".threadFactory", Thread.NORM_PRIORITY,
221                            SelectorIntraBand.class.getClassLoader());
222    
223            protected final Thread pollingThread = threadFactory.newThread(
224                    new PollingJob());
225            protected final Queue<FutureTask<RegistrationReference>> registerQueue =
226                    new ConcurrentLinkedQueue<FutureTask<RegistrationReference>>();
227            protected final Selector selector = Selector.open();
228    
229            protected class RegisterCallable
230                    implements Callable<RegistrationReference> {
231    
232                    public RegisterCallable(
233                            SelectableChannel readSelectableChannel,
234                            SelectableChannel writeSelectableChannel) {
235    
236                            _readSelectableChannel = readSelectableChannel;
237                            _writeSelectableChannel = writeSelectableChannel;
238                    }
239    
240                    public RegistrationReference call() throws Exception {
241                            if (_readSelectableChannel == _writeSelectableChannel) {
242    
243                                    // Register channel with zero interest, no dispatch will happen
244                                    // before channel context is ready. This ensures thread safe
245                                    // publication for ChannelContext#_registrationReference.
246    
247                                    SelectionKey selectionKey = _readSelectableChannel.register(
248                                            selector, 0);
249    
250                                    SelectionKeyRegistrationReference
251                                            selectionKeyRegistrationReference =
252                                                    new SelectionKeyRegistrationReference(
253                                                            SelectorIntraBand.this, selectionKey, selectionKey);
254    
255                                    ChannelContext channelContext = new ChannelContext(
256                                            new ConcurrentLinkedQueue<Datagram>());
257    
258                                    channelContext.setRegistrationReference(
259                                            selectionKeyRegistrationReference);
260    
261                                    selectionKey.attach(channelContext);
262    
263                                    // Alter interest ops after preparing the channel context
264    
265                                    selectionKey.interestOps(
266                                            SelectionKey.OP_READ | SelectionKey.OP_WRITE);
267    
268                                    return selectionKeyRegistrationReference;
269                            }
270                            else {
271    
272                                    // Register channels with zero interest, no dispatch will happen
273                                    // before channel contexts are ready. This ensures thread safe
274                                    // publication for ChannelContext#_registrationReference.
275    
276                                    SelectionKey readSelectionKey = _readSelectableChannel.register(
277                                            selector, 0);
278    
279                                    SelectionKey writeSelectionKey =
280                                            _writeSelectableChannel.register(selector, 0);
281    
282                                    SelectionKeyRegistrationReference
283                                            selectionKeyRegistrationReference =
284                                                    new SelectionKeyRegistrationReference(
285                                                            SelectorIntraBand.this, readSelectionKey,
286                                                            writeSelectionKey);
287    
288                                    ChannelContext channelContext = new ChannelContext(
289                                            new ConcurrentLinkedQueue<Datagram>());
290    
291                                    channelContext.setRegistrationReference(
292                                            selectionKeyRegistrationReference);
293    
294                                    readSelectionKey.attach(channelContext);
295                                    writeSelectionKey.attach(channelContext);
296    
297                                    // Alter interest ops after ChannelContexts preparation
298    
299                                    readSelectionKey.interestOps(SelectionKey.OP_READ);
300                                    writeSelectionKey.interestOps(SelectionKey.OP_WRITE);
301    
302                                    return selectionKeyRegistrationReference;
303                            }
304                    }
305    
306                    private final SelectableChannel _readSelectableChannel;
307                    private final SelectableChannel _writeSelectableChannel;
308    
309            }
310    
311            private void _processReading(SelectionKey selectionKey) {
312                    ScatteringByteChannel scatteringByteChannel =
313                            (ScatteringByteChannel)selectionKey.channel();
314    
315                    ChannelContext channelContext =
316                            (ChannelContext)selectionKey.attachment();
317    
318                    handleReading(scatteringByteChannel, channelContext);
319            }
320    
321            private void _processWriting(SelectionKey selectionKey) {
322                    GatheringByteChannel gatheringByteChannel =
323                            (GatheringByteChannel)selectionKey.channel();
324    
325                    ChannelContext channelContext =
326                            (ChannelContext)selectionKey.attachment();
327    
328                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
329    
330                    boolean ready = false;
331    
332                    if (channelContext.getWritingDatagram() == null) {
333                            Datagram datagram = sendingQueue.poll();
334    
335                            if (datagram != null) {
336                                    channelContext.setWritingDatagram(datagram);
337    
338                                    ready = true;
339                            }
340                    }
341                    else {
342                            ready = true;
343                    }
344    
345                    if (ready) {
346                            if (handleWriting(gatheringByteChannel, channelContext)) {
347                                    if (sendingQueue.isEmpty()) {
348    
349                                            // Channel is still writable, but there is nothing to send,
350                                            // back off to prevent unnecessary busy spinning.
351    
352                                            int ops = selectionKey.interestOps();
353    
354                                            ops &= ~SelectionKey.OP_WRITE;
355    
356                                            synchronized (selectionKey) {
357                                                    if (sendingQueue.isEmpty()) {
358                                                            selectionKey.interestOps(ops);
359                                                    }
360                                            }
361                                    }
362                            }
363                    }
364            }
365    
366            private static Log _log = LogFactoryUtil.getLog(SelectorIntraBand.class);
367    
368            private class PollingJob implements Runnable {
369    
370                    public void run() {
371                            try {
372                                    try {
373                                            while (true) {
374                                                    int readyCount = selector.select();
375    
376                                                    if (readyCount > 0) {
377                                                            Set<SelectionKey> selectionKeys =
378                                                                    selector.selectedKeys();
379    
380                                                            Iterator<SelectionKey> iterator =
381                                                                    selectionKeys.iterator();
382    
383                                                            while (iterator.hasNext()) {
384                                                                    SelectionKey selectionKey = iterator.next();
385    
386                                                                    iterator.remove();
387    
388                                                                    try {
389                                                                            if (selectionKey.isReadable()) {
390                                                                                    _processReading(selectionKey);
391                                                                            }
392    
393                                                                            if (selectionKey.isWritable()) {
394                                                                                    _processWriting(selectionKey);
395                                                                            }
396                                                                    }
397                                                                    catch (CancelledKeyException cke) {
398    
399                                                                            // Concurrent cancelling, move to next key
400    
401                                                                    }
402                                                            }
403                                                    }
404                                                    else if (!selector.isOpen()) {
405                                                            break;
406                                                    }
407    
408                                                    registerChannels();
409                                                    cleanUpTimeoutResponseWaitingDatagrams();
410                                            }
411                                    }
412                                    finally {
413                                            selector.close();
414                                    }
415                            }
416                            catch (ClosedSelectorException cse) {
417                                    if (_log.isInfoEnabled()) {
418                                            Thread currentThread = Thread.currentThread();
419    
420                                            _log.info(
421                                                    currentThread.getName() +
422                                                            " exiting gracefully on selector closure");
423                                    }
424                            }
425                            catch (Throwable t) {
426                                    Thread currentThread = Thread.currentThread();
427    
428                                    _log.error(
429                                            currentThread.getName() + " exiting exceptionally", t);
430                            }
431    
432                            // Flush out pending register requests to unblock their invokers,
433                            // this will cause them to receive a ClosedSelectorException
434    
435                            registerChannels();
436    
437                            responseWaitingMap.clear();
438                            timeoutMap.clear();
439                    }
440    
441            }
442    
443    }