001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
020    
021    import java.io.IOException;
022    
023    import java.nio.channels.GatheringByteChannel;
024    import java.nio.channels.ScatteringByteChannel;
025    
026    import java.util.EnumSet;
027    import java.util.Iterator;
028    import java.util.Map;
029    import java.util.NavigableMap;
030    import java.util.Set;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.ConcurrentSkipListMap;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.TimeoutException;
036    import java.util.concurrent.atomic.AtomicLong;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    /**
040     * @author Shuyang Zhou
041     */
042    public abstract class BaseIntraBand implements IntraBand {
043    
044            public BaseIntraBand(long defaultTimeout) {
045                    this.defaultTimeout = defaultTimeout;
046            }
047    
048            @SuppressWarnings("unused")
049            public void close() throws InterruptedException, IOException {
050                    datagramReceiveHandlersReference.set(null);
051    
052                    open = false;
053            }
054    
055            public DatagramReceiveHandler[] getDatagramReceiveHandlers() {
056                    ensureOpen();
057    
058                    DatagramReceiveHandler[] datagramReceiveHandlers =
059                            datagramReceiveHandlersReference.get();
060    
061                    return datagramReceiveHandlers.clone();
062            }
063    
064            public boolean isOpen() {
065                    return open;
066            }
067    
068            public DatagramReceiveHandler registerDatagramReceiveHandler(
069                    byte type, DatagramReceiveHandler datagramReceiveHandler) {
070    
071                    ensureOpen();
072    
073                    int index = type & 0xFF;
074    
075                    DatagramReceiveHandler oldDatagramReceiveHandler = null;
076                    DatagramReceiveHandler[] datagramReceiveHandlers = null;
077                    DatagramReceiveHandler[] copyDatagramReceiveHandlers = null;
078    
079                    do {
080                            datagramReceiveHandlers = datagramReceiveHandlersReference.get();
081    
082                            copyDatagramReceiveHandlers = datagramReceiveHandlers.clone();
083    
084                            oldDatagramReceiveHandler = copyDatagramReceiveHandlers[index];
085    
086                            copyDatagramReceiveHandlers[index] = datagramReceiveHandler;
087                    }
088                    while (
089                            !datagramReceiveHandlersReference.compareAndSet(
090                                    datagramReceiveHandlers, copyDatagramReceiveHandlers));
091    
092                    return oldDatagramReceiveHandler;
093            }
094    
095            public void sendDatagram(
096                    RegistrationReference registrationReference, Datagram datagram) {
097    
098                    if (registrationReference == null) {
099                            throw new NullPointerException("Registration reference is null");
100                    }
101    
102                    if (!registrationReference.isValid()) {
103                            throw new IllegalArgumentException(
104                                    "Registration reference is invalid");
105                    }
106    
107                    if (datagram == null) {
108                            throw new NullPointerException("Datagram is null");
109                    }
110    
111                    ensureOpen();
112    
113                    doSendDatagram(registrationReference, datagram);
114            }
115    
116            public <A> void sendDatagram(
117                    RegistrationReference registrationReference, Datagram datagram,
118                    A attachment, EnumSet<CompletionHandler.CompletionType> completionTypes,
119                    CompletionHandler<A> completionHandler) {
120    
121                    sendDatagram(
122                            registrationReference, datagram, attachment, completionTypes,
123                            completionHandler, defaultTimeout, TimeUnit.MILLISECONDS);
124            }
125    
126            public <A> void sendDatagram(
127                    RegistrationReference registrationReference, Datagram datagram,
128                    A attachment, EnumSet<CompletionType> completionTypes,
129                    CompletionHandler<A> completionHandler, long timeout,
130                    TimeUnit timeUnit) {
131    
132                    if (registrationReference == null) {
133                            throw new NullPointerException("Registration reference is null");
134                    }
135    
136                    if (!registrationReference.isValid()) {
137                            throw new IllegalArgumentException(
138                                    "Registration reference is invalid");
139                    }
140    
141                    if (datagram == null) {
142                            throw new NullPointerException("Datagram is null");
143                    }
144    
145                    if (completionTypes == null) {
146                            throw new NullPointerException("Completion type set is null");
147                    }
148    
149                    if (completionTypes.isEmpty()) {
150                            throw new IllegalArgumentException("Completion type set is empty");
151                    }
152    
153                    if (completionHandler == null) {
154                            throw new NullPointerException("Complete handler is null");
155                    }
156    
157                    if (timeUnit == null) {
158                            throw new NullPointerException("Time unit is null");
159                    }
160    
161                    if (timeout <= 0) {
162                            timeout = defaultTimeout;
163                    }
164                    else {
165                            timeout = timeUnit.toMillis(timeout);
166                    }
167    
168                    ensureOpen();
169    
170                    datagram.attachment = attachment;
171                    datagram.completionHandler =
172                            (CompletionHandler<Object>)completionHandler;
173                    datagram.completionTypes = completionTypes;
174                    datagram.timeout = timeout;
175    
176                    datagram.setAckRequest(
177                            completionTypes.contains(CompletionType.DELIVERED));
178    
179                    if (datagram.getSequenceId() == 0) {
180                            datagram.setSequenceId(generateSequenceId());
181                    }
182    
183                    if (completionTypes.contains(CompletionType.DELIVERED) ||
184                            completionTypes.contains(CompletionType.REPLIED)) {
185    
186                            addResponseWaitingDatagram(datagram);
187                    }
188    
189                    doSendDatagram(registrationReference, datagram);
190            }
191    
192            public Datagram sendSyncDatagram(
193                            RegistrationReference registrationReference, Datagram datagram)
194                    throws InterruptedException, IOException, TimeoutException {
195    
196                    return sendSyncDatagram(
197                            registrationReference, datagram, defaultTimeout,
198                            TimeUnit.MILLISECONDS);
199            }
200    
201            public Datagram sendSyncDatagram(
202                            RegistrationReference registrationReference, Datagram datagram,
203                            long timeout, TimeUnit timeUnit)
204                    throws InterruptedException, IOException, TimeoutException {
205    
206                    if (registrationReference == null) {
207                            throw new NullPointerException("Registration reference is null");
208                    }
209    
210                    if (!registrationReference.isValid()) {
211                            throw new IllegalArgumentException(
212                                    "Registration reference is invalid");
213                    }
214    
215                    if (datagram == null) {
216                            throw new NullPointerException("Datagram is null");
217                    }
218    
219                    if (timeUnit == null) {
220                            throw new NullPointerException("Time unit is null");
221                    }
222    
223                    if (timeout <= 0) {
224                            timeout = defaultTimeout;
225                    }
226                    else {
227                            timeout = timeUnit.toMillis(timeout);
228                    }
229    
230                    ensureOpen();
231    
232                    return doSendSyncDatagram(registrationReference, datagram, timeout);
233            }
234    
235            public DatagramReceiveHandler unregisterDatagramReceiveHandler(byte type) {
236                    return registerDatagramReceiveHandler(type, null);
237            }
238    
239            protected void addResponseWaitingDatagram(Datagram requestDatagram) {
240                    long sequenceId = requestDatagram.getSequenceId();
241    
242                    long expireTime = System.currentTimeMillis() + requestDatagram.timeout;
243    
244                    requestDatagram.expireTime = expireTime;
245    
246                    responseWaitingMap.put(sequenceId, requestDatagram);
247    
248                    timeoutMap.put(expireTime, sequenceId);
249            }
250    
251            protected void cleanUpTimeoutResponseWaitingDatagrams() {
252                    Map<Long, Long> map = timeoutMap.headMap(
253                            System.currentTimeMillis(), true);
254    
255                    if (map.isEmpty()) {
256                            return;
257                    }
258    
259                    Set<Map.Entry<Long, Long>> set = map.entrySet();
260    
261                    Iterator<Map.Entry<Long, Long>> iterator = set.iterator();
262    
263                    while (iterator.hasNext()) {
264                            Map.Entry<Long, Long> entry = iterator.next();
265    
266                            iterator.remove();
267    
268                            Long sequenceId = entry.getValue();
269    
270                            Datagram datagram = responseWaitingMap.remove(sequenceId);
271    
272                            if (_log.isWarnEnabled()) {
273                                    _log.warn(
274                                            "Removed timeout response waiting datagram " + datagram);
275                            }
276    
277                            datagram.completionHandler.timeouted(datagram.attachment);
278                    }
279            }
280    
281            protected abstract void doSendDatagram(
282                    RegistrationReference registrationReference, Datagram datagram);
283    
284            protected Datagram doSendSyncDatagram(
285                            RegistrationReference registrationReference, Datagram datagram,
286                            long timeout)
287                    throws InterruptedException, IOException, TimeoutException {
288    
289                    SendSyncDatagramCompletionHandler sendSyncDatagramCompletionHandler =
290                            new SendSyncDatagramCompletionHandler();
291    
292                    datagram.completionHandler = sendSyncDatagramCompletionHandler;
293                    datagram.completionTypes = REPLIED_ENUM_SET;
294                    datagram.timeout = timeout;
295    
296                    if (datagram.getSequenceId() == 0) {
297                            datagram.setSequenceId(generateSequenceId());
298                    }
299    
300                    addResponseWaitingDatagram(datagram);
301    
302                    doSendDatagram(registrationReference, datagram);
303    
304                    return sendSyncDatagramCompletionHandler.waitResult(timeout);
305            }
306    
307            protected void ensureOpen() {
308                    if (!isOpen()) {
309                            throw new ClosedIntraBandException();
310                    }
311            }
312    
313            protected long generateSequenceId() {
314                    long sequenceId = sequenceIdGenerator.incrementAndGet();
315    
316                    if (sequenceId <= 0) {
317    
318                            // We assume a long primitive type can hold enough numbers to keep a
319                            // large window time between the earliest and the latest response
320                            // waiting datagrams. In a real system, we will run out of memory
321                            // long before the latest response waiting datagram's ID can catch
322                            // up to the earliest response waiting datagram's ID to cause an ID
323                            // conflict. Even if the sequence ID generator was the only code to
324                            // use memory (which will never be true), to see an ID conflict, we
325                            // need to hold up 2^63 references. Even if we did not factor in the
326                            // data inside the datagram, and considered just the references
327                            // themselves, we would need 2^65 byte or 32 EB (exbibyte) of
328                            // memory, which is impossible in existing computer systems.
329    
330                            while (true) {
331                                    if (sequenceIdGenerator.compareAndSet(sequenceId, 1)) {
332                                            return 1;
333                                    }
334    
335                                    sequenceId = sequenceIdGenerator.incrementAndGet();
336    
337                                    // Another concurrent reset just happened
338    
339                                    if (sequenceId > 0) {
340                                            return sequenceId;
341                                    }
342    
343                            }
344                    }
345    
346                    return sequenceId;
347            }
348    
349            protected void handleReading(
350                    ScatteringByteChannel scatteringByteChannel,
351                    ChannelContext channelContext) {
352    
353                    Datagram datagram = channelContext.getReadingDatagram();
354    
355                    if (datagram == null) {
356                            datagram = Datagram.createReceiveDatagram();
357    
358                            channelContext.setReadingDatagram(datagram);
359                    }
360    
361                    try {
362                            if (datagram.readFrom(scatteringByteChannel)) {
363                                    channelContext.setReadingDatagram(
364                                            Datagram.createReceiveDatagram());
365    
366                                    if (datagram.isAckResponse()) {
367                                            Datagram requestDatagram = removeResponseWaitingDatagram(
368                                                    datagram);
369    
370                                            if (requestDatagram == null) {
371                                                    if (_log.isWarnEnabled()) {
372                                                            _log.warn(
373                                                                    "Dropped ownerless ACK response " + datagram);
374                                                    }
375                                            }
376                                            else {
377                                                    CompletionHandler<Object> completionHandler =
378                                                            requestDatagram.completionHandler;
379    
380                                                    completionHandler.delivered(requestDatagram.attachment);
381                                            }
382                                    }
383                                    else if (datagram.isResponse()) {
384                                            Datagram requestDatagram = removeResponseWaitingDatagram(
385                                                    datagram);
386    
387                                            if (requestDatagram == null) {
388                                                    if (_log.isWarnEnabled()) {
389                                                            _log.warn("Dropped ownerless response " + datagram);
390                                                    }
391                                            }
392                                            else {
393                                                    EnumSet<CompletionType> completionTypes =
394                                                            requestDatagram.completionTypes;
395    
396                                                    if (completionTypes.contains(CompletionType.REPLIED)) {
397                                                            CompletionHandler<Object> completionHandler =
398                                                                    requestDatagram.completionHandler;
399    
400                                                            completionHandler.replied(
401                                                                    requestDatagram.attachment, datagram);
402                                                    }
403                                                    else if (_log.isWarnEnabled()) {
404                                                            _log.warn(
405                                                                    "Dropped unconcerned response " + datagram);
406                                                    }
407                                            }
408                                    }
409                                    else {
410                                            if (datagram.isAckRequest()) {
411                                                    Datagram ackResponseDatagram =
412                                                            Datagram.createACKResponseDatagram(
413                                                                    datagram.getSequenceId());
414    
415                                                    doSendDatagram(
416                                                            channelContext.getRegistrationReference(),
417                                                            ackResponseDatagram);
418                                            }
419    
420                                            int index = datagram.getType() & 0xFF;
421    
422                                            DatagramReceiveHandler datagramReceiveHandler =
423                                                    datagramReceiveHandlersReference.get()[index];
424    
425                                            if (datagramReceiveHandler == null) {
426                                                    if (_log.isWarnEnabled()) {
427                                                            _log.warn("Dropped ownerless request " + datagram);
428                                                    }
429                                            }
430                                            else {
431                                                    try {
432                                                            datagramReceiveHandler.receive(
433                                                                    channelContext.getRegistrationReference(),
434                                                                    datagram);
435                                                    }
436                                                    catch (Throwable t) {
437                                                            _log.error("Unable to dispatch", t);
438                                                    }
439                                            }
440                                    }
441                            }
442                    }
443                    catch (IOException ioe) {
444                            RegistrationReference registrationReference =
445                                    channelContext.getRegistrationReference();
446    
447                            registrationReference.cancelRegistration();
448    
449                            if (_log.isDebugEnabled()) {
450                                    _log.debug(
451                                            "Broken read channel, unregister " + registrationReference,
452                                            ioe);
453                            }
454                            else if (_log.isInfoEnabled()) {
455                                    _log.info(
456                                            "Broken read channel, unregister " + registrationReference);
457                            }
458                    }
459            }
460    
461            protected boolean handleWriting(
462                    GatheringByteChannel gatheringByteChannel,
463                    ChannelContext channelContext) {
464    
465                    Datagram datagram = channelContext.getWritingDatagram();
466    
467                    try {
468                            if (datagram.writeTo(gatheringByteChannel)) {
469                                    channelContext.setWritingDatagram(null);
470    
471                                    EnumSet<CompletionType> completionTypes =
472                                            datagram.completionTypes;
473    
474                                    if (completionTypes != null) {
475                                            if (completionTypes.contains(CompletionType.SUBMITTED)) {
476                                                    CompletionHandler<Object> completeHandler =
477                                                            datagram.completionHandler;
478    
479                                                    completeHandler.submitted(datagram.attachment);
480                                            }
481                                    }
482    
483                                    return true;
484                            }
485                            else {
486                                    return false;
487                            }
488                    }
489                    catch (IOException ioe) {
490                            RegistrationReference registrationReference =
491                                    channelContext.getRegistrationReference();
492    
493                            registrationReference.cancelRegistration();
494    
495                            CompletionHandler<Object> completionHandler =
496                                    datagram.completionHandler;
497    
498                            if (completionHandler != null) {
499                                    completionHandler.failed(datagram.attachment, ioe);
500                            }
501    
502                            if (_log.isDebugEnabled()) {
503                                    _log.debug(
504                                            "Broken write channel, unregister " + registrationReference,
505                                            ioe);
506                            }
507                            else if (_log.isInfoEnabled()) {
508                                    _log.info(
509                                            "Broken write channel, unregister " +
510                                                    registrationReference);
511                            }
512    
513                            return false;
514                    }
515            }
516    
517            protected Datagram removeResponseWaitingDatagram(
518                    Datagram responseDatagram) {
519    
520                    long sequenceId = responseDatagram.getSequenceId();
521    
522                    Datagram requestDatagram = responseWaitingMap.remove(sequenceId);
523    
524                    if (requestDatagram != null) {
525                            timeoutMap.remove(requestDatagram.expireTime);
526                    }
527    
528                    return requestDatagram;
529            }
530    
531            protected static final EnumSet<CompletionType> REPLIED_ENUM_SET =
532                    EnumSet.of(CompletionType.REPLIED);
533    
534            protected final long defaultTimeout;
535            protected final AtomicReference<DatagramReceiveHandler[]>
536                    datagramReceiveHandlersReference =
537                            new AtomicReference<DatagramReceiveHandler[]>(
538                                    new DatagramReceiveHandler[256]);
539            protected volatile boolean open = true;
540            protected final Map<Long, Datagram> responseWaitingMap =
541                    new ConcurrentHashMap<Long, Datagram>();
542            protected final AtomicLong sequenceIdGenerator = new AtomicLong();
543            protected final NavigableMap<Long, Long> timeoutMap =
544                    new ConcurrentSkipListMap<Long, Long>();
545    
546            protected static class SendSyncDatagramCompletionHandler
547                    implements CompletionHandler<Object> {
548    
549                    public void delivered(Object attachment) {
550                    }
551    
552                    public void failed(Object attachment, IOException ioe) {
553    
554                            // Must set before count down to ensure memory visibility
555    
556                            _ioe = ioe;
557    
558                            _countDownLatch.countDown();
559                    }
560    
561                    public void replied(Object attachment, Datagram datagram) {
562    
563                            // Must set before count down to ensure memory visibility
564    
565                            _datagram = datagram;
566    
567                            _countDownLatch.countDown();
568                    }
569    
570                    public void submitted(Object attachment) {
571                    }
572    
573                    public void timeouted(Object attachment) {
574                    }
575    
576                    public Datagram waitResult(long timeout)
577                            throws InterruptedException, IOException, TimeoutException {
578    
579                            boolean result = _countDownLatch.await(
580                                    timeout, TimeUnit.MILLISECONDS);
581    
582                            if (!result) {
583                                    throw new TimeoutException("Result waiting timeout");
584                            }
585    
586                            if (_ioe != null) {
587                                    throw _ioe;
588                            }
589    
590                            return _datagram;
591                    }
592    
593                    private final CountDownLatch _countDownLatch = new CountDownLatch(1);
594                    private Datagram _datagram;
595                    private IOException _ioe;
596    
597            }
598    
599            private static Log _log = LogFactoryUtil.getLog(BaseIntraBand.class);
600    
601    }