001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.blocking;
016    
017    import com.liferay.portal.kernel.nio.intraband.BaseIntraBand;
018    import com.liferay.portal.kernel.nio.intraband.ChannelContext;
019    import com.liferay.portal.kernel.nio.intraband.Datagram;
020    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021    import com.liferay.portal.kernel.util.NamedThreadFactory;
022    
023    import java.io.IOException;
024    
025    import java.nio.channels.Channel;
026    import java.nio.channels.GatheringByteChannel;
027    import java.nio.channels.ScatteringByteChannel;
028    import java.nio.channels.SelectableChannel;
029    
030    import java.util.Queue;
031    import java.util.concurrent.BlockingQueue;
032    import java.util.concurrent.Callable;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.ExecutorService;
035    import java.util.concurrent.Executors;
036    import java.util.concurrent.Future;
037    import java.util.concurrent.LinkedBlockingQueue;
038    import java.util.concurrent.ThreadFactory;
039    import java.util.concurrent.TimeUnit;
040    
041    /**
042     * @author Shuyang Zhou
043     */
044    public class ExecutorIntraBand extends BaseIntraBand {
045    
046            public ExecutorIntraBand(long defaultTimeout) {
047                    super(defaultTimeout);
048            }
049    
050            @Override
051            public void close() throws InterruptedException, IOException {
052                    executorService.shutdownNow();
053    
054                    executorService.awaitTermination(defaultTimeout, TimeUnit.MILLISECONDS);
055    
056                    super.close();
057            }
058    
059            public RegistrationReference registerChannel(Channel channel) {
060                    if (channel == null) {
061                            throw new NullPointerException("Channel is null");
062                    }
063    
064                    if (!(channel instanceof GatheringByteChannel)) {
065                            throw new IllegalArgumentException(
066                                    "Channel is not of type GatheringByteChannel");
067                    }
068    
069                    if (!(channel instanceof ScatteringByteChannel)) {
070                            throw new IllegalArgumentException(
071                                    "Channel is not of type ScatteringByteChannel");
072                    }
073    
074                    if (channel instanceof SelectableChannel) {
075                            SelectableChannel selectableChannel = (SelectableChannel)channel;
076    
077                            if (!selectableChannel.isBlocking()) {
078                                    throw new IllegalArgumentException(
079                                            "Channel is of type SelectableChannel and " +
080                                                    "configured in nonblocking mode");
081                            }
082                    }
083    
084                    ensureOpen();
085    
086                    return doRegisterChannel(
087                            (ScatteringByteChannel)channel, (GatheringByteChannel)channel);
088            }
089    
090            public RegistrationReference registerChannel(
091                    ScatteringByteChannel scatteringByteChannel,
092                    GatheringByteChannel gatheringByteChannel) {
093    
094                    if (gatheringByteChannel == null) {
095                            throw new NullPointerException("Gathering byte channel is null");
096                    }
097    
098                    if (scatteringByteChannel == null) {
099                            throw new NullPointerException("Scattering byte channel is null");
100                    }
101    
102                    if (scatteringByteChannel instanceof SelectableChannel) {
103                            SelectableChannel selectableChannel =
104                                    (SelectableChannel)scatteringByteChannel;
105    
106                            if (!selectableChannel.isBlocking()) {
107                                    throw new IllegalArgumentException(
108                                            "Scattering byte channel is of type SelectableChannel " +
109                                                    "and configured in nonblocking mode");
110                            }
111                    }
112    
113                    if (gatheringByteChannel instanceof SelectableChannel) {
114                            SelectableChannel selectableChannel =
115                                    (SelectableChannel)gatheringByteChannel;
116    
117                            if (!selectableChannel.isBlocking()) {
118                                    throw new IllegalArgumentException(
119                                            "Gathering byte channel is of type SelectableChannel and " +
120                                                    "configured in nonblocking mode");
121                            }
122                    }
123    
124                    ensureOpen();
125    
126                    return doRegisterChannel(scatteringByteChannel, gatheringByteChannel);
127            }
128    
129            protected RegistrationReference doRegisterChannel(
130                    ScatteringByteChannel scatteringByteChannel,
131                    GatheringByteChannel gatheringByteChannel) {
132    
133                    BlockingQueue<Datagram> sendingQueue =
134                            new LinkedBlockingQueue<Datagram>();
135    
136                    ChannelContext channelContext = new ChannelContext(sendingQueue);
137    
138                    ReadingCallable readingCallable = new ReadingCallable(
139                            scatteringByteChannel, channelContext);
140                    WritingCallable writingCallable = new WritingCallable(
141                            gatheringByteChannel, channelContext);
142    
143                    // Submit the polling jobs, no dispatch will happen until latches are
144                    // open. This ensures a thread safe publication of
145                    // ChannelContext#_registrationReference.
146    
147                    Future<Void> readFuture = executorService.submit(readingCallable);
148                    Future<Void> writeFuture = executorService.submit(writingCallable);
149    
150                    FutureRegistrationReference futureRegistrationReference =
151                            new FutureRegistrationReference(
152                                    this, channelContext, readFuture, writeFuture);
153    
154                    channelContext.setRegistrationReference(futureRegistrationReference);
155    
156                    readingCallable.openLatch();
157                    writingCallable.openLatch();
158    
159                    return futureRegistrationReference;
160            }
161    
162            @Override
163            protected void doSendDatagram(
164                    RegistrationReference registrationReference, Datagram datagram) {
165    
166                    FutureRegistrationReference futureRegistrationReference =
167                            (FutureRegistrationReference)registrationReference;
168    
169                    ChannelContext channelContext =
170                            futureRegistrationReference.channelContext;
171    
172                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
173    
174                    sendingQueue.offer(datagram);
175            }
176    
177            protected static final ThreadFactory THREAD_FACTORY =
178                    new NamedThreadFactory(
179                            ExecutorIntraBand.class + ".threadFactory", Thread.NORM_PRIORITY,
180                            ExecutorIntraBand.class.getClassLoader());
181    
182            protected final ExecutorService executorService =
183                    Executors.newCachedThreadPool(THREAD_FACTORY);
184    
185            protected class ReadingCallable implements Callable<Void> {
186    
187                    public ReadingCallable(
188                            ScatteringByteChannel scatteringByteChannel,
189                            ChannelContext channelContext) {
190    
191                            _scatteringByteChannel = scatteringByteChannel;
192                            _channelContext = channelContext;
193    
194                            _countDownLatch = new CountDownLatch(1);
195                    }
196    
197                    public Void call() throws Exception {
198                            _countDownLatch.await();
199    
200                            while (_scatteringByteChannel.isOpen()) {
201                                    handleReading(_scatteringByteChannel, _channelContext);
202                            }
203    
204                            return null;
205                    }
206    
207                    public void openLatch() {
208                            _countDownLatch.countDown();
209                    }
210    
211                    private final ChannelContext _channelContext;
212                    private final CountDownLatch _countDownLatch;
213                    private final ScatteringByteChannel _scatteringByteChannel;
214    
215            }
216    
217            protected class WritingCallable implements Callable<Void> {
218    
219                    public WritingCallable(
220                            GatheringByteChannel gatheringByteChannel,
221                            ChannelContext channelContext) {
222    
223                            _gatheringByteChannel = gatheringByteChannel;
224                            _channelContext = channelContext;
225    
226                            _countDownLatch = new CountDownLatch(1);
227                    }
228    
229                    public Void call() throws Exception {
230                            _countDownLatch.await();
231    
232                            try {
233                                    BlockingQueue<Datagram> sendingQueue =
234                                            (BlockingQueue<Datagram>)_channelContext.getSendingQueue();
235    
236                                    while (true) {
237                                            Datagram datagram = sendingQueue.take();
238    
239                                            _channelContext.setWritingDatagram(datagram);
240    
241                                            if (!handleWriting(
242                                                            _gatheringByteChannel, _channelContext)) {
243    
244                                                    if (_gatheringByteChannel.isOpen()) {
245    
246                                                            // Still open but no longer writable, typical
247                                                            // behavior of nonblocking channel
248    
249                                                            throw new IllegalStateException(
250                                                                    _gatheringByteChannel +
251                                                                            " behaved in nonblocking way.");
252                                                    }
253                                                    else {
254                                                            break;
255                                                    }
256                                            }
257                                    }
258                            }
259                            catch (InterruptedException ie) {
260                            }
261    
262                            return null;
263                    }
264    
265                    public void openLatch() {
266                            _countDownLatch.countDown();
267                    }
268    
269                    private final ChannelContext _channelContext;
270                    private final CountDownLatch _countDownLatch;
271                    private final GatheringByteChannel _gatheringByteChannel;
272    
273            }
274    
275    }