001
014
015 package com.liferay.portal.kernel.nio.intraband.mailbox;
016
017 import com.liferay.portal.kernel.nio.intraband.Datagram;
018 import com.liferay.portal.kernel.nio.intraband.Intraband;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
021 import com.liferay.portal.kernel.util.GetterUtil;
022 import com.liferay.portal.kernel.util.PropsKeys;
023 import com.liferay.portal.kernel.util.PropsUtil;
024
025 import java.nio.ByteBuffer;
026
027 import java.util.Map;
028 import java.util.concurrent.BlockingQueue;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.DelayQueue;
031 import java.util.concurrent.Delayed;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicLong;
034
035
038 public class MailboxUtil {
039
040 public static ByteBuffer receiveMail(long receipt) {
041 ByteBuffer byteBuffer = _mailMap.remove(receipt);
042
043 _overdueMailQueue.remove(new ReceiptStub(receipt));
044
045 if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
046 _pollingCleanup();
047 }
048
049 return byteBuffer;
050 }
051
052 public static long sendMail(
053 RegistrationReference registrationReference, ByteBuffer byteBuffer)
054 throws MailboxException {
055
056 Intraband intraband = registrationReference.getIntraband();
057
058 try {
059 SystemDataType systemDataType = SystemDataType.MAILBOX;
060
061 Datagram responseDatagram = intraband.sendSyncDatagram(
062 registrationReference,
063 Datagram.createRequestDatagram(
064 systemDataType.getValue(), byteBuffer));
065
066 byteBuffer = responseDatagram.getDataByteBuffer();
067
068 return byteBuffer.getLong();
069 }
070 catch (Exception e) {
071 throw new MailboxException(e);
072 }
073 }
074
075 protected static long depositMail(ByteBuffer byteBuffer) {
076 long receipt = _receiptGenerator.getAndIncrement();
077
078 _mailMap.put(receipt, byteBuffer);
079
080 _overdueMailQueue.offer(new ReceiptStub(receipt, System.nanoTime()));
081
082 if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
083 _pollingCleanup();
084 }
085
086 return receipt;
087 }
088
089 private static void _pollingCleanup() {
090 ReceiptStub receiptStub = null;
091
092 while ((receiptStub = _overdueMailQueue.poll()) != null) {
093 _mailMap.remove(receiptStub.getReceipt());
094 }
095 }
096
097 private static final boolean _INTRABAND_MAILBOX_REAPER_THREAD_ENABLED =
098 GetterUtil.getBoolean(
099 PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_REAPER_THREAD_ENABLED));
100
101 private static final long _INTRABAND_MAILBOX_STORAGE_LIFE =
102 GetterUtil.getLong(
103 PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_STORAGE_LIFE));
104
105 private static final Map<Long, ByteBuffer> _mailMap =
106 new ConcurrentHashMap<>();
107 private static final BlockingQueue<ReceiptStub> _overdueMailQueue =
108 new DelayQueue<>();
109 private static final AtomicLong _receiptGenerator = new AtomicLong();
110
111 static {
112 if (_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
113 Thread thread = new OverdueMailReaperThread(
114 MailboxUtil.class.getName());
115
116 thread.setContextClassLoader(MailboxUtil.class.getClassLoader());
117 thread.setDaemon(true);
118
119 thread.start();
120 }
121 }
122
123 private static class OverdueMailReaperThread extends Thread {
124
125 public OverdueMailReaperThread(String name) {
126 super(name);
127 }
128
129 @Override
130 public void run() {
131 while (true) {
132 try {
133 ReceiptStub receiptStub = _overdueMailQueue.take();
134
135 _mailMap.remove(receiptStub.getReceipt());
136 }
137 catch (InterruptedException ie) {
138 }
139 }
140 }
141
142 }
143
144 private static class ReceiptStub implements Delayed {
145
146 public ReceiptStub(long receipt) {
147 this(receipt, -1);
148 }
149
150 public ReceiptStub(long receipt, long currentNanoTime) {
151 long expireTime = currentNanoTime;
152
153 expireTime += TimeUnit.MILLISECONDS.toNanos(
154 _INTRABAND_MAILBOX_STORAGE_LIFE);
155
156 _expireTime = expireTime;
157
158 _receipt = receipt;
159 }
160
161 @Override
162 public int compareTo(Delayed delayed) {
163 ReceiptStub receiptStub = (ReceiptStub)delayed;
164
165 return (int)(_expireTime - receiptStub._expireTime);
166 }
167
168 @Override
169 public boolean equals(Object obj) {
170 ReceiptStub receiptStub = (ReceiptStub)obj;
171
172 return _receipt == receiptStub._receipt;
173 }
174
175 @Override
176 public long getDelay(TimeUnit unit) {
177 return _expireTime - System.nanoTime();
178 }
179
180 public long getReceipt() {
181 return _receipt;
182 }
183
184 @Override
185 public int hashCode() {
186 return (int)_receipt;
187 }
188
189 private final long _expireTime;
190 private final long _receipt;
191
192 }
193
194 }