001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
020    
021    import java.io.IOException;
022    
023    import java.nio.channels.GatheringByteChannel;
024    import java.nio.channels.ScatteringByteChannel;
025    
026    import java.util.EnumSet;
027    import java.util.Iterator;
028    import java.util.Map;
029    import java.util.NavigableMap;
030    import java.util.Set;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.ConcurrentSkipListMap;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.TimeoutException;
036    import java.util.concurrent.atomic.AtomicLong;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    /**
040     * @author Shuyang Zhou
041     */
042    public abstract class BaseIntraband implements Intraband {
043    
044            public BaseIntraband(long defaultTimeout) {
045                    this.defaultTimeout = defaultTimeout;
046            }
047    
048            @Override
049            @SuppressWarnings("unused")
050            public void close() throws InterruptedException, IOException {
051                    datagramReceiveHandlersReference.set(null);
052    
053                    open = false;
054            }
055    
056            @Override
057            public DatagramReceiveHandler[] getDatagramReceiveHandlers() {
058                    ensureOpen();
059    
060                    DatagramReceiveHandler[] datagramReceiveHandlers =
061                            datagramReceiveHandlersReference.get();
062    
063                    return datagramReceiveHandlers.clone();
064            }
065    
066            @Override
067            public boolean isOpen() {
068                    return open;
069            }
070    
071            @Override
072            public DatagramReceiveHandler registerDatagramReceiveHandler(
073                    byte type, DatagramReceiveHandler datagramReceiveHandler) {
074    
075                    ensureOpen();
076    
077                    int index = type & 0xFF;
078    
079                    DatagramReceiveHandler oldDatagramReceiveHandler = null;
080                    DatagramReceiveHandler[] datagramReceiveHandlers = null;
081                    DatagramReceiveHandler[] copyDatagramReceiveHandlers = null;
082    
083                    do {
084                            datagramReceiveHandlers = datagramReceiveHandlersReference.get();
085    
086                            copyDatagramReceiveHandlers = datagramReceiveHandlers.clone();
087    
088                            oldDatagramReceiveHandler = copyDatagramReceiveHandlers[index];
089    
090                            copyDatagramReceiveHandlers[index] = datagramReceiveHandler;
091                    }
092                    while (!datagramReceiveHandlersReference.compareAndSet(
093                                            datagramReceiveHandlers, copyDatagramReceiveHandlers));
094    
095                    return oldDatagramReceiveHandler;
096            }
097    
098            @Override
099            public void sendDatagram(
100                    RegistrationReference registrationReference, Datagram datagram) {
101    
102                    if (registrationReference == null) {
103                            throw new NullPointerException("Registration reference is null");
104                    }
105    
106                    if (!registrationReference.isValid()) {
107                            throw new IllegalArgumentException(
108                                    "Registration reference is invalid");
109                    }
110    
111                    if (datagram == null) {
112                            throw new NullPointerException("Datagram is null");
113                    }
114    
115                    ensureOpen();
116    
117                    doSendDatagram(registrationReference, datagram);
118            }
119    
120            @Override
121            public <A> void sendDatagram(
122                    RegistrationReference registrationReference, Datagram datagram,
123                    A attachment, EnumSet<CompletionHandler.CompletionType> completionTypes,
124                    CompletionHandler<A> completionHandler) {
125    
126                    sendDatagram(
127                            registrationReference, datagram, attachment, completionTypes,
128                            completionHandler, defaultTimeout, TimeUnit.MILLISECONDS);
129            }
130    
131            @Override
132            public <A> void sendDatagram(
133                    RegistrationReference registrationReference, Datagram datagram,
134                    A attachment, EnumSet<CompletionType> completionTypes,
135                    CompletionHandler<A> completionHandler, long timeout,
136                    TimeUnit timeUnit) {
137    
138                    if (registrationReference == null) {
139                            throw new NullPointerException("Registration reference is null");
140                    }
141    
142                    if (!registrationReference.isValid()) {
143                            throw new IllegalArgumentException(
144                                    "Registration reference is invalid");
145                    }
146    
147                    if (datagram == null) {
148                            throw new NullPointerException("Datagram is null");
149                    }
150    
151                    if (completionTypes == null) {
152                            throw new NullPointerException("Completion type set is null");
153                    }
154    
155                    if (completionTypes.isEmpty()) {
156                            throw new IllegalArgumentException("Completion type set is empty");
157                    }
158    
159                    if (completionHandler == null) {
160                            throw new NullPointerException("Complete handler is null");
161                    }
162    
163                    if (timeUnit == null) {
164                            throw new NullPointerException("Time unit is null");
165                    }
166    
167                    if (timeout <= 0) {
168                            timeout = defaultTimeout;
169                    }
170                    else {
171                            timeout = timeUnit.toMillis(timeout);
172                    }
173    
174                    ensureOpen();
175    
176                    datagram.attachment = attachment;
177                    datagram.completionHandler =
178                            (CompletionHandler<Object>)completionHandler;
179                    datagram.completionTypes = completionTypes;
180                    datagram.timeout = timeout;
181    
182                    datagram.setAckRequest(
183                            completionTypes.contains(CompletionType.DELIVERED));
184    
185                    if (datagram.getSequenceId() == 0) {
186                            datagram.setSequenceId(generateSequenceId());
187                    }
188    
189                    if (completionTypes.contains(CompletionType.DELIVERED) ||
190                            completionTypes.contains(CompletionType.REPLIED)) {
191    
192                            addResponseWaitingDatagram(datagram);
193                    }
194    
195                    doSendDatagram(registrationReference, datagram);
196            }
197    
198            @Override
199            public Datagram sendSyncDatagram(
200                            RegistrationReference registrationReference, Datagram datagram)
201                    throws InterruptedException, IOException, TimeoutException {
202    
203                    return sendSyncDatagram(
204                            registrationReference, datagram, defaultTimeout,
205                            TimeUnit.MILLISECONDS);
206            }
207    
208            @Override
209            public Datagram sendSyncDatagram(
210                            RegistrationReference registrationReference, Datagram datagram,
211                            long timeout, TimeUnit timeUnit)
212                    throws InterruptedException, IOException, TimeoutException {
213    
214                    if (registrationReference == null) {
215                            throw new NullPointerException("Registration reference is null");
216                    }
217    
218                    if (!registrationReference.isValid()) {
219                            throw new IllegalArgumentException(
220                                    "Registration reference is invalid");
221                    }
222    
223                    if (datagram == null) {
224                            throw new NullPointerException("Datagram is null");
225                    }
226    
227                    if (timeUnit == null) {
228                            throw new NullPointerException("Time unit is null");
229                    }
230    
231                    if (timeout <= 0) {
232                            timeout = defaultTimeout;
233                    }
234                    else {
235                            timeout = timeUnit.toMillis(timeout);
236                    }
237    
238                    ensureOpen();
239    
240                    return doSendSyncDatagram(registrationReference, datagram, timeout);
241            }
242    
243            @Override
244            public DatagramReceiveHandler unregisterDatagramReceiveHandler(byte type) {
245                    return registerDatagramReceiveHandler(type, null);
246            }
247    
248            protected void addResponseWaitingDatagram(Datagram requestDatagram) {
249                    long sequenceId = requestDatagram.getSequenceId();
250    
251                    long expireTime = System.currentTimeMillis() + requestDatagram.timeout;
252    
253                    requestDatagram.expireTime = expireTime;
254    
255                    responseWaitingMap.put(sequenceId, requestDatagram);
256    
257                    timeoutMap.put(expireTime, sequenceId);
258            }
259    
260            protected void cleanUpTimeoutResponseWaitingDatagrams() {
261                    Map<Long, Long> map = timeoutMap.headMap(
262                            System.currentTimeMillis(), true);
263    
264                    if (map.isEmpty()) {
265                            return;
266                    }
267    
268                    Set<Map.Entry<Long, Long>> set = map.entrySet();
269    
270                    Iterator<Map.Entry<Long, Long>> iterator = set.iterator();
271    
272                    while (iterator.hasNext()) {
273                            Map.Entry<Long, Long> entry = iterator.next();
274    
275                            iterator.remove();
276    
277                            Long sequenceId = entry.getValue();
278    
279                            Datagram datagram = responseWaitingMap.remove(sequenceId);
280    
281                            if (_log.isWarnEnabled()) {
282                                    _log.warn(
283                                            "Removed timeout response waiting datagram " + datagram);
284                            }
285    
286                            datagram.completionHandler.timedOut(datagram.attachment);
287                    }
288            }
289    
290            protected abstract void doSendDatagram(
291                    RegistrationReference registrationReference, Datagram datagram);
292    
293            protected Datagram doSendSyncDatagram(
294                            RegistrationReference registrationReference, Datagram datagram,
295                            long timeout)
296                    throws InterruptedException, IOException, TimeoutException {
297    
298                    SendSyncDatagramCompletionHandler sendSyncDatagramCompletionHandler =
299                            new SendSyncDatagramCompletionHandler();
300    
301                    datagram.completionHandler = sendSyncDatagramCompletionHandler;
302                    datagram.completionTypes = REPLIED_ENUM_SET;
303                    datagram.timeout = timeout;
304    
305                    if (datagram.getSequenceId() == 0) {
306                            datagram.setSequenceId(generateSequenceId());
307                    }
308    
309                    addResponseWaitingDatagram(datagram);
310    
311                    doSendDatagram(registrationReference, datagram);
312    
313                    return sendSyncDatagramCompletionHandler.waitResult(timeout);
314            }
315    
316            protected void ensureOpen() {
317                    if (!isOpen()) {
318                            throw new ClosedIntrabandException();
319                    }
320            }
321    
322            protected long generateSequenceId() {
323                    long sequenceId = sequenceIdGenerator.getAndIncrement();
324    
325                    if (sequenceId < 0) {
326    
327                            // We assume a long primitive type can hold enough numbers to keep a
328                            // large window time between the earliest and the latest response
329                            // waiting datagrams. In a real system, we will run out of memory
330                            // long before the latest response waiting datagram's ID can catch
331                            // up to the earliest response waiting datagram's ID to cause an ID
332                            // conflict. Even if the sequence ID generator was the only code to
333                            // use memory (which will never be true), to see an ID conflict, we
334                            // need to hold up 2^63 references. Even if we did not factor in the
335                            // data inside the datagram, and considered just the references
336                            // themselves, we would need 2^65 byte or 32 EB (exbibyte) of
337                            // memory, which is impossible in existing computer systems.
338    
339                            sequenceId += Long.MIN_VALUE;
340                    }
341    
342                    return sequenceId;
343            }
344    
345            protected void handleReading(
346                    ScatteringByteChannel scatteringByteChannel,
347                    ChannelContext channelContext) {
348    
349                    Datagram datagram = channelContext.getReadingDatagram();
350    
351                    if (datagram == null) {
352                            datagram = Datagram.createReceiveDatagram();
353    
354                            channelContext.setReadingDatagram(datagram);
355                    }
356    
357                    try {
358                            if (datagram.readFrom(scatteringByteChannel)) {
359                                    channelContext.setReadingDatagram(
360                                            Datagram.createReceiveDatagram());
361    
362                                    if (datagram.isAckResponse()) {
363                                            Datagram requestDatagram = removeResponseWaitingDatagram(
364                                                    datagram);
365    
366                                            if (requestDatagram == null) {
367                                                    if (_log.isWarnEnabled()) {
368                                                            _log.warn(
369                                                                    "Dropped ownerless ACK response " + datagram);
370                                                    }
371                                            }
372                                            else {
373                                                    CompletionHandler<Object> completionHandler =
374                                                            requestDatagram.completionHandler;
375    
376                                                    completionHandler.delivered(requestDatagram.attachment);
377                                            }
378                                    }
379                                    else if (datagram.isResponse()) {
380                                            Datagram requestDatagram = removeResponseWaitingDatagram(
381                                                    datagram);
382    
383                                            if (requestDatagram == null) {
384                                                    if (_log.isWarnEnabled()) {
385                                                            _log.warn("Dropped ownerless response " + datagram);
386                                                    }
387                                            }
388                                            else {
389                                                    EnumSet<CompletionType> completionTypes =
390                                                            requestDatagram.completionTypes;
391    
392                                                    if (completionTypes.contains(CompletionType.REPLIED)) {
393                                                            CompletionHandler<Object> completionHandler =
394                                                                    requestDatagram.completionHandler;
395    
396                                                            completionHandler.replied(
397                                                                    requestDatagram.attachment, datagram);
398                                                    }
399                                                    else if (_log.isWarnEnabled()) {
400                                                            _log.warn(
401                                                                    "Dropped unconcerned response " + datagram);
402                                                    }
403                                            }
404                                    }
405                                    else {
406                                            if (datagram.isAckRequest()) {
407                                                    Datagram ackResponseDatagram =
408                                                            Datagram.createACKResponseDatagram(
409                                                                    datagram.getSequenceId());
410    
411                                                    doSendDatagram(
412                                                            channelContext.getRegistrationReference(),
413                                                            ackResponseDatagram);
414                                            }
415    
416                                            int index = datagram.getType() & 0xFF;
417    
418                                            DatagramReceiveHandler datagramReceiveHandler =
419                                                    datagramReceiveHandlersReference.get()[index];
420    
421                                            if (datagramReceiveHandler == null) {
422                                                    if (_log.isWarnEnabled()) {
423                                                            _log.warn("Dropped ownerless request " + datagram);
424                                                    }
425                                            }
426                                            else {
427                                                    try {
428                                                            datagramReceiveHandler.receive(
429                                                                    channelContext.getRegistrationReference(),
430                                                                    datagram);
431                                                    }
432                                                    catch (Throwable t) {
433                                                            _log.error("Unable to dispatch", t);
434                                                    }
435                                            }
436                                    }
437                            }
438                    }
439                    catch (IOException ioe) {
440                            RegistrationReference registrationReference =
441                                    channelContext.getRegistrationReference();
442    
443                            registrationReference.cancelRegistration();
444    
445                            if (_log.isDebugEnabled()) {
446                                    _log.debug(
447                                            "Broken read channel, unregister " + registrationReference,
448                                            ioe);
449                            }
450                            else if (_log.isInfoEnabled()) {
451                                    _log.info(
452                                            "Broken read channel, unregister " + registrationReference);
453                            }
454                    }
455            }
456    
457            protected boolean handleWriting(
458                    GatheringByteChannel gatheringByteChannel,
459                    ChannelContext channelContext) {
460    
461                    Datagram datagram = channelContext.getWritingDatagram();
462    
463                    try {
464                            if (datagram.writeTo(gatheringByteChannel)) {
465                                    channelContext.setWritingDatagram(null);
466    
467                                    EnumSet<CompletionType> completionTypes =
468                                            datagram.completionTypes;
469    
470                                    if (completionTypes != null) {
471                                            if (completionTypes.contains(CompletionType.SUBMITTED)) {
472                                                    CompletionHandler<Object> completeHandler =
473                                                            datagram.completionHandler;
474    
475                                                    completeHandler.submitted(datagram.attachment);
476                                            }
477                                    }
478    
479                                    return true;
480                            }
481                            else {
482                                    return false;
483                            }
484                    }
485                    catch (IOException ioe) {
486                            RegistrationReference registrationReference =
487                                    channelContext.getRegistrationReference();
488    
489                            registrationReference.cancelRegistration();
490    
491                            CompletionHandler<Object> completionHandler =
492                                    datagram.completionHandler;
493    
494                            if (completionHandler != null) {
495                                    completionHandler.failed(datagram.attachment, ioe);
496                            }
497    
498                            if (_log.isDebugEnabled()) {
499                                    _log.debug(
500                                            "Broken write channel, unregister " + registrationReference,
501                                            ioe);
502                            }
503                            else if (_log.isInfoEnabled()) {
504                                    _log.info(
505                                            "Broken write channel, unregister " +
506                                                    registrationReference);
507                            }
508    
509                            return false;
510                    }
511            }
512    
513            protected Datagram removeResponseWaitingDatagram(
514                    Datagram responseDatagram) {
515    
516                    long sequenceId = responseDatagram.getSequenceId();
517    
518                    Datagram requestDatagram = responseWaitingMap.remove(sequenceId);
519    
520                    if (requestDatagram != null) {
521                            timeoutMap.remove(requestDatagram.expireTime);
522                    }
523    
524                    return requestDatagram;
525            }
526    
527            protected static final EnumSet<CompletionType> REPLIED_ENUM_SET =
528                    EnumSet.of(CompletionType.REPLIED);
529    
530            protected final AtomicReference<DatagramReceiveHandler[]>
531                    datagramReceiveHandlersReference = new AtomicReference<>(
532                            new DatagramReceiveHandler[256]);
533            protected final long defaultTimeout;
534            protected volatile boolean open = true;
535            protected final Map<Long, Datagram> responseWaitingMap =
536                    new ConcurrentHashMap<>();
537            protected final AtomicLong sequenceIdGenerator = new AtomicLong();
538            protected final NavigableMap<Long, Long> timeoutMap =
539                    new ConcurrentSkipListMap<>();
540    
541            protected static class SendSyncDatagramCompletionHandler
542                    implements CompletionHandler<Object> {
543    
544                    @Override
545                    public void delivered(Object attachment) {
546                    }
547    
548                    @Override
549                    public void failed(Object attachment, IOException ioe) {
550    
551                            // Must set before count down to ensure memory visibility
552    
553                            _ioe = ioe;
554    
555                            _countDownLatch.countDown();
556                    }
557    
558                    @Override
559                    public void replied(Object attachment, Datagram datagram) {
560    
561                            // Must set before count down to ensure memory visibility
562    
563                            _datagram = datagram;
564    
565                            _countDownLatch.countDown();
566                    }
567    
568                    @Override
569                    public void submitted(Object attachment) {
570                    }
571    
572                    @Override
573                    public void timedOut(Object attachment) {
574                    }
575    
576                    public Datagram waitResult(long timeout)
577                            throws InterruptedException, IOException, TimeoutException {
578    
579                            boolean result = _countDownLatch.await(
580                                    timeout, TimeUnit.MILLISECONDS);
581    
582                            if (!result) {
583                                    throw new TimeoutException("Result waiting timeout");
584                            }
585    
586                            if (_ioe != null) {
587                                    throw _ioe;
588                            }
589    
590                            return _datagram;
591                    }
592    
593                    private final CountDownLatch _countDownLatch = new CountDownLatch(1);
594                    private Datagram _datagram;
595                    private IOException _ioe;
596    
597            }
598    
599            private static final Log _log = LogFactoryUtil.getLog(BaseIntraband.class);
600    
601    }