/**
 * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
 *
 * This library is free software; you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation; either version 2.1 of the License, or (at your option)
 * any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 * details.
 */
014    
015    package com.liferay.portal.kernel.nio.intraband.blocking;
016    
017    import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
018    import com.liferay.portal.kernel.nio.intraband.ChannelContext;
019    import com.liferay.portal.kernel.nio.intraband.Datagram;
020    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021    import com.liferay.portal.kernel.util.NamedThreadFactory;
022    
023    import java.io.IOException;
024    
025    import java.nio.channels.Channel;
026    import java.nio.channels.GatheringByteChannel;
027    import java.nio.channels.ScatteringByteChannel;
028    import java.nio.channels.SelectableChannel;
029    
030    import java.util.Queue;
031    import java.util.concurrent.BlockingQueue;
032    import java.util.concurrent.Callable;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.ExecutorService;
035    import java.util.concurrent.Executors;
036    import java.util.concurrent.Future;
037    import java.util.concurrent.LinkedBlockingQueue;
038    import java.util.concurrent.ThreadFactory;
039    import java.util.concurrent.TimeUnit;
040    
041    /**
042     * @author Shuyang Zhou
043     */
044    public class ExecutorIntraband extends BaseIntraband {
045    
046            public ExecutorIntraband(long defaultTimeout) {
047                    super(defaultTimeout);
048            }
049    
050            @Override
051            public void close() throws InterruptedException, IOException {
052                    executorService.shutdownNow();
053    
054                    executorService.awaitTermination(defaultTimeout, TimeUnit.MILLISECONDS);
055    
056                    super.close();
057            }
058    
059            @Override
060            public RegistrationReference registerChannel(Channel channel) {
061                    if (channel == null) {
062                            throw new NullPointerException("Channel is null");
063                    }
064    
065                    if (!(channel instanceof GatheringByteChannel)) {
066                            throw new IllegalArgumentException(
067                                    "Channel is not of type GatheringByteChannel");
068                    }
069    
070                    if (!(channel instanceof ScatteringByteChannel)) {
071                            throw new IllegalArgumentException(
072                                    "Channel is not of type ScatteringByteChannel");
073                    }
074    
075                    if (channel instanceof SelectableChannel) {
076                            SelectableChannel selectableChannel = (SelectableChannel)channel;
077    
078                            if (!selectableChannel.isBlocking()) {
079                                    throw new IllegalArgumentException(
080                                            "Channel is of type SelectableChannel and " +
081                                                    "configured in nonblocking mode");
082                            }
083                    }
084    
085                    ensureOpen();
086    
087                    return doRegisterChannel(
088                            (ScatteringByteChannel)channel, (GatheringByteChannel)channel);
089            }
090    
091            @Override
092            public RegistrationReference registerChannel(
093                    ScatteringByteChannel scatteringByteChannel,
094                    GatheringByteChannel gatheringByteChannel) {
095    
096                    if (gatheringByteChannel == null) {
097                            throw new NullPointerException("Gathering byte channel is null");
098                    }
099    
100                    if (scatteringByteChannel == null) {
101                            throw new NullPointerException("Scattering byte channel is null");
102                    }
103    
104                    if (scatteringByteChannel instanceof SelectableChannel) {
105                            SelectableChannel selectableChannel =
106                                    (SelectableChannel)scatteringByteChannel;
107    
108                            if (!selectableChannel.isBlocking()) {
109                                    throw new IllegalArgumentException(
110                                            "Scattering byte channel is of type SelectableChannel " +
111                                                    "and configured in nonblocking mode");
112                            }
113                    }
114    
115                    if (gatheringByteChannel instanceof SelectableChannel) {
116                            SelectableChannel selectableChannel =
117                                    (SelectableChannel)gatheringByteChannel;
118    
119                            if (!selectableChannel.isBlocking()) {
120                                    throw new IllegalArgumentException(
121                                            "Gathering byte channel is of type SelectableChannel and " +
122                                                    "configured in nonblocking mode");
123                            }
124                    }
125    
126                    ensureOpen();
127    
128                    return doRegisterChannel(scatteringByteChannel, gatheringByteChannel);
129            }
130    
131            protected RegistrationReference doRegisterChannel(
132                    ScatteringByteChannel scatteringByteChannel,
133                    GatheringByteChannel gatheringByteChannel) {
134    
135                    BlockingQueue<Datagram> sendingQueue = new LinkedBlockingQueue<>();
136    
137                    ChannelContext channelContext = new ChannelContext(sendingQueue);
138    
139                    ReadingCallable readingCallable = new ReadingCallable(
140                            scatteringByteChannel, channelContext);
141                    WritingCallable writingCallable = new WritingCallable(
142                            gatheringByteChannel, channelContext);
143    
144                    // Submit the polling jobs, no dispatch will happen until latches are
145                    // open. This ensures a thread safe publication of
146                    // ChannelContext#_registrationReference.
147    
148                    Future<Void> readFuture = executorService.submit(readingCallable);
149                    Future<Void> writeFuture = executorService.submit(writingCallable);
150    
151                    FutureRegistrationReference futureRegistrationReference =
152                            new FutureRegistrationReference(
153                                    this, channelContext, readFuture, writeFuture);
154    
155                    channelContext.setRegistrationReference(futureRegistrationReference);
156    
157                    readingCallable.openLatch();
158                    writingCallable.openLatch();
159    
160                    return futureRegistrationReference;
161            }
162    
163            @Override
164            protected void doSendDatagram(
165                    RegistrationReference registrationReference, Datagram datagram) {
166    
167                    FutureRegistrationReference futureRegistrationReference =
168                            (FutureRegistrationReference)registrationReference;
169    
170                    ChannelContext channelContext =
171                            futureRegistrationReference.channelContext;
172    
173                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
174    
175                    sendingQueue.offer(datagram);
176            }
177    
178            protected static final ThreadFactory THREAD_FACTORY =
179                    new NamedThreadFactory(
180                            ExecutorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
181                            ExecutorIntraband.class.getClassLoader());
182    
183            protected final ExecutorService executorService =
184                    Executors.newCachedThreadPool(THREAD_FACTORY);
185    
186            protected class ReadingCallable implements Callable<Void> {
187    
188                    public ReadingCallable(
189                            ScatteringByteChannel scatteringByteChannel,
190                            ChannelContext channelContext) {
191    
192                            _scatteringByteChannel = scatteringByteChannel;
193                            _channelContext = channelContext;
194    
195                            _countDownLatch = new CountDownLatch(1);
196                    }
197    
198                    @Override
199                    public Void call() throws Exception {
200                            _countDownLatch.await();
201    
202                            while (_scatteringByteChannel.isOpen()) {
203                                    handleReading(_scatteringByteChannel, _channelContext);
204                            }
205    
206                            return null;
207                    }
208    
209                    public void openLatch() {
210                            _countDownLatch.countDown();
211                    }
212    
213                    private final ChannelContext _channelContext;
214                    private final CountDownLatch _countDownLatch;
215                    private final ScatteringByteChannel _scatteringByteChannel;
216    
217            }
218    
219            protected class WritingCallable implements Callable<Void> {
220    
221                    public WritingCallable(
222                            GatheringByteChannel gatheringByteChannel,
223                            ChannelContext channelContext) {
224    
225                            _gatheringByteChannel = gatheringByteChannel;
226                            _channelContext = channelContext;
227    
228                            _countDownLatch = new CountDownLatch(1);
229                    }
230    
231                    @Override
232                    public Void call() throws Exception {
233                            _countDownLatch.await();
234    
235                            try {
236                                    BlockingQueue<Datagram> sendingQueue =
237                                            (BlockingQueue<Datagram>)_channelContext.getSendingQueue();
238    
239                                    while (true) {
240                                            Datagram datagram = sendingQueue.take();
241    
242                                            _channelContext.setWritingDatagram(datagram);
243    
244                                            if (!handleWriting(
245                                                            _gatheringByteChannel, _channelContext)) {
246    
247                                                    if (_gatheringByteChannel.isOpen()) {
248    
249                                                            // Still open but no longer writable, typical
250                                                            // behavior of nonblocking channel
251    
252                                                            throw new IllegalStateException(
253                                                                    _gatheringByteChannel +
254                                                                            " behaved in nonblocking way.");
255                                                    }
256                                                    else {
257                                                            break;
258                                                    }
259                                            }
260    
261                                            cleanUpTimeoutResponseWaitingDatagrams();
262                                    }
263                            }
264                            catch (InterruptedException ie) {
265                            }
266    
267                            return null;
268                    }
269    
270                    public void openLatch() {
271                            _countDownLatch.countDown();
272                    }
273    
274                    private final ChannelContext _channelContext;
275                    private final CountDownLatch _countDownLatch;
276                    private final GatheringByteChannel _gatheringByteChannel;
277    
278            }
279    
280    }