001
014
015 package com.liferay.portal.kernel.nio.intraband;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
020
021 import java.io.IOException;
022
023 import java.nio.channels.GatheringByteChannel;
024 import java.nio.channels.ScatteringByteChannel;
025
026 import java.util.EnumSet;
027 import java.util.Iterator;
028 import java.util.Map;
029 import java.util.NavigableMap;
030 import java.util.Set;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.ConcurrentSkipListMap;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.TimeoutException;
036 import java.util.concurrent.atomic.AtomicLong;
037 import java.util.concurrent.atomic.AtomicReference;
038
039
042 public abstract class BaseIntraband implements Intraband {
043
044 public BaseIntraband(long defaultTimeout) {
045 this.defaultTimeout = defaultTimeout;
046 }
047
048 @Override
049 @SuppressWarnings("unused")
050 public void close() throws InterruptedException, IOException {
051 datagramReceiveHandlersReference.set(null);
052
053 open = false;
054 }
055
056 @Override
057 public DatagramReceiveHandler[] getDatagramReceiveHandlers() {
058 ensureOpen();
059
060 DatagramReceiveHandler[] datagramReceiveHandlers =
061 datagramReceiveHandlersReference.get();
062
063 return datagramReceiveHandlers.clone();
064 }
065
066 @Override
067 public boolean isOpen() {
068 return open;
069 }
070
071 @Override
072 public DatagramReceiveHandler registerDatagramReceiveHandler(
073 byte type, DatagramReceiveHandler datagramReceiveHandler) {
074
075 ensureOpen();
076
077 int index = type & 0xFF;
078
079 DatagramReceiveHandler oldDatagramReceiveHandler = null;
080 DatagramReceiveHandler[] datagramReceiveHandlers = null;
081 DatagramReceiveHandler[] copyDatagramReceiveHandlers = null;
082
083 do {
084 datagramReceiveHandlers = datagramReceiveHandlersReference.get();
085
086 copyDatagramReceiveHandlers = datagramReceiveHandlers.clone();
087
088 oldDatagramReceiveHandler = copyDatagramReceiveHandlers[index];
089
090 copyDatagramReceiveHandlers[index] = datagramReceiveHandler;
091 }
092 while (!datagramReceiveHandlersReference.compareAndSet(
093 datagramReceiveHandlers, copyDatagramReceiveHandlers));
094
095 return oldDatagramReceiveHandler;
096 }
097
098 @Override
099 public void sendDatagram(
100 RegistrationReference registrationReference, Datagram datagram) {
101
102 if (registrationReference == null) {
103 throw new NullPointerException("Registration reference is null");
104 }
105
106 if (!registrationReference.isValid()) {
107 throw new IllegalArgumentException(
108 "Registration reference is invalid");
109 }
110
111 if (datagram == null) {
112 throw new NullPointerException("Datagram is null");
113 }
114
115 ensureOpen();
116
117 doSendDatagram(registrationReference, datagram);
118 }
119
120 @Override
121 public <A> void sendDatagram(
122 RegistrationReference registrationReference, Datagram datagram,
123 A attachment, EnumSet<CompletionHandler.CompletionType> completionTypes,
124 CompletionHandler<A> completionHandler) {
125
126 sendDatagram(
127 registrationReference, datagram, attachment, completionTypes,
128 completionHandler, defaultTimeout, TimeUnit.MILLISECONDS);
129 }
130
131 @Override
132 public <A> void sendDatagram(
133 RegistrationReference registrationReference, Datagram datagram,
134 A attachment, EnumSet<CompletionType> completionTypes,
135 CompletionHandler<A> completionHandler, long timeout,
136 TimeUnit timeUnit) {
137
138 if (registrationReference == null) {
139 throw new NullPointerException("Registration reference is null");
140 }
141
142 if (!registrationReference.isValid()) {
143 throw new IllegalArgumentException(
144 "Registration reference is invalid");
145 }
146
147 if (datagram == null) {
148 throw new NullPointerException("Datagram is null");
149 }
150
151 if (completionTypes == null) {
152 throw new NullPointerException("Completion type set is null");
153 }
154
155 if (completionTypes.isEmpty()) {
156 throw new IllegalArgumentException("Completion type set is empty");
157 }
158
159 if (completionHandler == null) {
160 throw new NullPointerException("Complete handler is null");
161 }
162
163 if (timeUnit == null) {
164 throw new NullPointerException("Time unit is null");
165 }
166
167 if (timeout <= 0) {
168 timeout = defaultTimeout;
169 }
170 else {
171 timeout = timeUnit.toMillis(timeout);
172 }
173
174 ensureOpen();
175
176 datagram.attachment = attachment;
177 datagram.completionHandler =
178 (CompletionHandler<Object>)completionHandler;
179 datagram.completionTypes = completionTypes;
180 datagram.timeout = timeout;
181
182 datagram.setAckRequest(
183 completionTypes.contains(CompletionType.DELIVERED));
184
185 if (datagram.getSequenceId() == 0) {
186 datagram.setSequenceId(generateSequenceId());
187 }
188
189 if (completionTypes.contains(CompletionType.DELIVERED) ||
190 completionTypes.contains(CompletionType.REPLIED)) {
191
192 addResponseWaitingDatagram(datagram);
193 }
194
195 doSendDatagram(registrationReference, datagram);
196 }
197
198 @Override
199 public Datagram sendSyncDatagram(
200 RegistrationReference registrationReference, Datagram datagram)
201 throws InterruptedException, IOException, TimeoutException {
202
203 return sendSyncDatagram(
204 registrationReference, datagram, defaultTimeout,
205 TimeUnit.MILLISECONDS);
206 }
207
208 @Override
209 public Datagram sendSyncDatagram(
210 RegistrationReference registrationReference, Datagram datagram,
211 long timeout, TimeUnit timeUnit)
212 throws InterruptedException, IOException, TimeoutException {
213
214 if (registrationReference == null) {
215 throw new NullPointerException("Registration reference is null");
216 }
217
218 if (!registrationReference.isValid()) {
219 throw new IllegalArgumentException(
220 "Registration reference is invalid");
221 }
222
223 if (datagram == null) {
224 throw new NullPointerException("Datagram is null");
225 }
226
227 if (timeUnit == null) {
228 throw new NullPointerException("Time unit is null");
229 }
230
231 if (timeout <= 0) {
232 timeout = defaultTimeout;
233 }
234 else {
235 timeout = timeUnit.toMillis(timeout);
236 }
237
238 ensureOpen();
239
240 return doSendSyncDatagram(registrationReference, datagram, timeout);
241 }
242
243 @Override
244 public DatagramReceiveHandler unregisterDatagramReceiveHandler(byte type) {
245 return registerDatagramReceiveHandler(type, null);
246 }
247
248 protected void addResponseWaitingDatagram(Datagram requestDatagram) {
249 long sequenceId = requestDatagram.getSequenceId();
250
251 long expireTime = System.currentTimeMillis() + requestDatagram.timeout;
252
253 requestDatagram.expireTime = expireTime;
254
255 responseWaitingMap.put(sequenceId, requestDatagram);
256
257 timeoutMap.put(expireTime, sequenceId);
258 }
259
260 protected void cleanUpTimeoutResponseWaitingDatagrams() {
261 Map<Long, Long> map = timeoutMap.headMap(
262 System.currentTimeMillis(), true);
263
264 if (map.isEmpty()) {
265 return;
266 }
267
268 Set<Map.Entry<Long, Long>> set = map.entrySet();
269
270 Iterator<Map.Entry<Long, Long>> iterator = set.iterator();
271
272 while (iterator.hasNext()) {
273 Map.Entry<Long, Long> entry = iterator.next();
274
275 iterator.remove();
276
277 Long sequenceId = entry.getValue();
278
279 Datagram datagram = responseWaitingMap.remove(sequenceId);
280
281 if (_log.isWarnEnabled()) {
282 _log.warn(
283 "Removed timeout response waiting datagram " + datagram);
284 }
285
286 datagram.completionHandler.timedOut(datagram.attachment);
287 }
288 }
289
290 protected abstract void doSendDatagram(
291 RegistrationReference registrationReference, Datagram datagram);
292
293 protected Datagram doSendSyncDatagram(
294 RegistrationReference registrationReference, Datagram datagram,
295 long timeout)
296 throws InterruptedException, IOException, TimeoutException {
297
298 SendSyncDatagramCompletionHandler sendSyncDatagramCompletionHandler =
299 new SendSyncDatagramCompletionHandler();
300
301 datagram.completionHandler = sendSyncDatagramCompletionHandler;
302 datagram.completionTypes = REPLIED_ENUM_SET;
303 datagram.timeout = timeout;
304
305 if (datagram.getSequenceId() == 0) {
306 datagram.setSequenceId(generateSequenceId());
307 }
308
309 addResponseWaitingDatagram(datagram);
310
311 doSendDatagram(registrationReference, datagram);
312
313 return sendSyncDatagramCompletionHandler.waitResult(timeout);
314 }
315
316 protected void ensureOpen() {
317 if (!isOpen()) {
318 throw new ClosedIntrabandException();
319 }
320 }
321
322 protected long generateSequenceId() {
323 long sequenceId = sequenceIdGenerator.getAndIncrement();
324
325 if (sequenceId < 0) {
326
327
328
329
330
331
332
333
334
335
336
337
338
339 sequenceId += Long.MIN_VALUE;
340 }
341
342 return sequenceId;
343 }
344
345 protected void handleReading(
346 ScatteringByteChannel scatteringByteChannel,
347 ChannelContext channelContext) {
348
349 Datagram datagram = channelContext.getReadingDatagram();
350
351 if (datagram == null) {
352 datagram = Datagram.createReceiveDatagram();
353
354 channelContext.setReadingDatagram(datagram);
355 }
356
357 try {
358 if (datagram.readFrom(scatteringByteChannel)) {
359 channelContext.setReadingDatagram(
360 Datagram.createReceiveDatagram());
361
362 if (datagram.isAckResponse()) {
363 Datagram requestDatagram = removeResponseWaitingDatagram(
364 datagram);
365
366 if (requestDatagram == null) {
367 if (_log.isWarnEnabled()) {
368 _log.warn(
369 "Dropped ownerless ACK response " + datagram);
370 }
371 }
372 else {
373 CompletionHandler<Object> completionHandler =
374 requestDatagram.completionHandler;
375
376 completionHandler.delivered(requestDatagram.attachment);
377 }
378 }
379 else if (datagram.isResponse()) {
380 Datagram requestDatagram = removeResponseWaitingDatagram(
381 datagram);
382
383 if (requestDatagram == null) {
384 if (_log.isWarnEnabled()) {
385 _log.warn("Dropped ownerless response " + datagram);
386 }
387 }
388 else {
389 EnumSet<CompletionType> completionTypes =
390 requestDatagram.completionTypes;
391
392 if (completionTypes.contains(CompletionType.REPLIED)) {
393 CompletionHandler<Object> completionHandler =
394 requestDatagram.completionHandler;
395
396 completionHandler.replied(
397 requestDatagram.attachment, datagram);
398 }
399 else if (_log.isWarnEnabled()) {
400 _log.warn(
401 "Dropped unconcerned response " + datagram);
402 }
403 }
404 }
405 else {
406 if (datagram.isAckRequest()) {
407 Datagram ackResponseDatagram =
408 Datagram.createACKResponseDatagram(
409 datagram.getSequenceId());
410
411 doSendDatagram(
412 channelContext.getRegistrationReference(),
413 ackResponseDatagram);
414 }
415
416 int index = datagram.getType() & 0xFF;
417
418 DatagramReceiveHandler datagramReceiveHandler =
419 datagramReceiveHandlersReference.get()[index];
420
421 if (datagramReceiveHandler == null) {
422 if (_log.isWarnEnabled()) {
423 _log.warn("Dropped ownerless request " + datagram);
424 }
425 }
426 else {
427 try {
428 datagramReceiveHandler.receive(
429 channelContext.getRegistrationReference(),
430 datagram);
431 }
432 catch (Throwable t) {
433 _log.error("Unable to dispatch", t);
434 }
435 }
436 }
437 }
438 }
439 catch (IOException ioe) {
440 RegistrationReference registrationReference =
441 channelContext.getRegistrationReference();
442
443 registrationReference.cancelRegistration();
444
445 if (_log.isDebugEnabled()) {
446 _log.debug(
447 "Broken read channel, unregister " + registrationReference,
448 ioe);
449 }
450 else if (_log.isInfoEnabled()) {
451 _log.info(
452 "Broken read channel, unregister " + registrationReference);
453 }
454 }
455 }
456
457 protected boolean handleWriting(
458 GatheringByteChannel gatheringByteChannel,
459 ChannelContext channelContext) {
460
461 Datagram datagram = channelContext.getWritingDatagram();
462
463 try {
464 if (datagram.writeTo(gatheringByteChannel)) {
465 channelContext.setWritingDatagram(null);
466
467 EnumSet<CompletionType> completionTypes =
468 datagram.completionTypes;
469
470 if (completionTypes != null) {
471 if (completionTypes.contains(CompletionType.SUBMITTED)) {
472 CompletionHandler<Object> completeHandler =
473 datagram.completionHandler;
474
475 completeHandler.submitted(datagram.attachment);
476 }
477 }
478
479 return true;
480 }
481 else {
482 return false;
483 }
484 }
485 catch (IOException ioe) {
486 RegistrationReference registrationReference =
487 channelContext.getRegistrationReference();
488
489 registrationReference.cancelRegistration();
490
491 CompletionHandler<Object> completionHandler =
492 datagram.completionHandler;
493
494 if (completionHandler != null) {
495 completionHandler.failed(datagram.attachment, ioe);
496 }
497
498 if (_log.isDebugEnabled()) {
499 _log.debug(
500 "Broken write channel, unregister " + registrationReference,
501 ioe);
502 }
503 else if (_log.isInfoEnabled()) {
504 _log.info(
505 "Broken write channel, unregister " +
506 registrationReference);
507 }
508
509 return false;
510 }
511 }
512
513 protected Datagram removeResponseWaitingDatagram(
514 Datagram responseDatagram) {
515
516 long sequenceId = responseDatagram.getSequenceId();
517
518 Datagram requestDatagram = responseWaitingMap.remove(sequenceId);
519
520 if (requestDatagram != null) {
521 timeoutMap.remove(requestDatagram.expireTime);
522 }
523
524 return requestDatagram;
525 }
526
527 protected static final EnumSet<CompletionType> REPLIED_ENUM_SET =
528 EnumSet.of(CompletionType.REPLIED);
529
530 protected final AtomicReference<DatagramReceiveHandler[]>
531 datagramReceiveHandlersReference = new AtomicReference<>(
532 new DatagramReceiveHandler[256]);
533 protected final long defaultTimeout;
534 protected volatile boolean open = true;
535 protected final Map<Long, Datagram> responseWaitingMap =
536 new ConcurrentHashMap<>();
537 protected final AtomicLong sequenceIdGenerator = new AtomicLong();
538 protected final NavigableMap<Long, Long> timeoutMap =
539 new ConcurrentSkipListMap<>();
540
541 protected static class SendSyncDatagramCompletionHandler
542 implements CompletionHandler<Object> {
543
544 @Override
545 public void delivered(Object attachment) {
546 }
547
548 @Override
549 public void failed(Object attachment, IOException ioe) {
550
551
552
553 _ioe = ioe;
554
555 _countDownLatch.countDown();
556 }
557
558 @Override
559 public void replied(Object attachment, Datagram datagram) {
560
561
562
563 _datagram = datagram;
564
565 _countDownLatch.countDown();
566 }
567
568 @Override
569 public void submitted(Object attachment) {
570 }
571
572 @Override
573 public void timedOut(Object attachment) {
574 }
575
576 public Datagram waitResult(long timeout)
577 throws InterruptedException, IOException, TimeoutException {
578
579 boolean result = _countDownLatch.await(
580 timeout, TimeUnit.MILLISECONDS);
581
582 if (!result) {
583 throw new TimeoutException("Result waiting timeout");
584 }
585
586 if (_ioe != null) {
587 throw _ioe;
588 }
589
590 return _datagram;
591 }
592
593 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
594 private Datagram _datagram;
595 private IOException _ioe;
596
597 }
598
599 private static final Log _log = LogFactoryUtil.getLog(BaseIntraband.class);
600
601 }