001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.mailbox;
016    
017    import com.liferay.portal.kernel.nio.intraband.Datagram;
018    import com.liferay.portal.kernel.nio.intraband.Intraband;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
021    import com.liferay.portal.kernel.util.GetterUtil;
022    import com.liferay.portal.kernel.util.PropsKeys;
023    import com.liferay.portal.kernel.util.PropsUtil;
024    
025    import java.nio.ByteBuffer;
026    
027    import java.util.Map;
028    import java.util.concurrent.BlockingQueue;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.DelayQueue;
031    import java.util.concurrent.Delayed;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicLong;
034    
035    /**
036     * @author Shuyang Zhou
037     */
038    public class MailboxUtil {
039    
040            public static ByteBuffer receiveMail(long receipt) {
041                    ByteBuffer byteBuffer = _mailMap.remove(receipt);
042    
043                    _overdueMailQueue.remove(new ReceiptStub(receipt));
044    
045                    if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
046                            _pollingCleanup();
047                    }
048    
049                    return byteBuffer;
050            }
051    
052            public static long sendMail(
053                            RegistrationReference registrationReference, ByteBuffer byteBuffer)
054                    throws MailboxException {
055    
056                    Intraband intraband = registrationReference.getIntraband();
057    
058                    try {
059                            SystemDataType systemDataType = SystemDataType.MAILBOX;
060    
061                            Datagram responseDatagram = intraband.sendSyncDatagram(
062                                    registrationReference,
063                                    Datagram.createRequestDatagram(
064                                            systemDataType.getValue(), byteBuffer));
065    
066                            byteBuffer = responseDatagram.getDataByteBuffer();
067    
068                            return byteBuffer.getLong();
069                    }
070                    catch (Exception e) {
071                            throw new MailboxException(e);
072                    }
073            }
074    
075            protected static long depositMail(ByteBuffer byteBuffer) {
076                    long receipt = _receiptGenerator.getAndIncrement();
077    
078                    _mailMap.put(receipt, byteBuffer);
079    
080                    _overdueMailQueue.offer(new ReceiptStub(receipt, System.nanoTime()));
081    
082                    if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
083                            _pollingCleanup();
084                    }
085    
086                    return receipt;
087            }
088    
089            private static void _pollingCleanup() {
090                    ReceiptStub receiptStub = null;
091    
092                    while ((receiptStub = _overdueMailQueue.poll()) != null) {
093                            _mailMap.remove(receiptStub.getReceipt());
094                    }
095            }
096    
097            private static final boolean _INTRABAND_MAILBOX_REAPER_THREAD_ENABLED =
098                    GetterUtil.getBoolean(
099                            PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_REAPER_THREAD_ENABLED));
100    
101            private static final long _INTRABAND_MAILBOX_STORAGE_LIFE =
102                    GetterUtil.getLong(
103                            PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_STORAGE_LIFE));
104    
105            private static final Map<Long, ByteBuffer> _mailMap =
106                    new ConcurrentHashMap<>();
107            private static final BlockingQueue<ReceiptStub> _overdueMailQueue =
108                    new DelayQueue<>();
109            private static final AtomicLong _receiptGenerator = new AtomicLong();
110    
111            static {
112                    if (_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
113                            Thread thread = new OverdueMailReaperThread(
114                                    MailboxUtil.class.getName());
115    
116                            thread.setContextClassLoader(MailboxUtil.class.getClassLoader());
117                            thread.setDaemon(true);
118    
119                            thread.start();
120                    }
121            }
122    
123            private static class OverdueMailReaperThread extends Thread {
124    
125                    public OverdueMailReaperThread(String name) {
126                            super(name);
127                    }
128    
129                    @Override
130                    public void run() {
131                            while (true) {
132                                    try {
133                                            ReceiptStub receiptStub = _overdueMailQueue.take();
134    
135                                            _mailMap.remove(receiptStub.getReceipt());
136                                    }
137                                    catch (InterruptedException ie) {
138                                    }
139                            }
140                    }
141    
142            }
143    
144            private static class ReceiptStub implements Delayed {
145    
146                    public ReceiptStub(long receipt) {
147                            this(receipt, -1);
148                    }
149    
150                    public ReceiptStub(long receipt, long currentNanoTime) {
151                            long expireTime = currentNanoTime;
152    
153                            expireTime += TimeUnit.MILLISECONDS.toNanos(
154                                    _INTRABAND_MAILBOX_STORAGE_LIFE);
155    
156                            _expireTime = expireTime;
157    
158                            _receipt = receipt;
159                    }
160    
161                    @Override
162                    public int compareTo(Delayed delayed) {
163                            ReceiptStub receiptStub = (ReceiptStub)delayed;
164    
165                            return (int)(_expireTime - receiptStub._expireTime);
166                    }
167    
168                    @Override
169                    public boolean equals(Object obj) {
170                            ReceiptStub receiptStub = (ReceiptStub)obj;
171    
172                            return _receipt == receiptStub._receipt;
173                    }
174    
175                    @Override
176                    public long getDelay(TimeUnit unit) {
177                            return _expireTime - System.nanoTime();
178                    }
179    
180                    public long getReceipt() {
181                            return _receipt;
182                    }
183    
184                    @Override
185                    public int hashCode() {
186                            return (int)_receipt;
187                    }
188    
189                    private final long _expireTime;
190                    private final long _receipt;
191    
192            }
193    
194    }