001
014
015 package com.liferay.portal.kernel.nio.intraband;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
020
021 import java.io.IOException;
022
023 import java.nio.channels.GatheringByteChannel;
024 import java.nio.channels.ScatteringByteChannel;
025
026 import java.util.EnumSet;
027 import java.util.Iterator;
028 import java.util.Map;
029 import java.util.NavigableMap;
030 import java.util.Set;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.ConcurrentSkipListMap;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.TimeoutException;
036 import java.util.concurrent.atomic.AtomicLong;
037 import java.util.concurrent.atomic.AtomicReference;
038
039
042 public abstract class BaseIntraBand implements IntraBand {
043
044 public BaseIntraBand(long defaultTimeout) {
045 this.defaultTimeout = defaultTimeout;
046 }
047
048 @SuppressWarnings("unused")
049 public void close() throws InterruptedException, IOException {
050 datagramReceiveHandlersReference.set(null);
051
052 open = false;
053 }
054
055 public DatagramReceiveHandler[] getDatagramReceiveHandlers() {
056 ensureOpen();
057
058 DatagramReceiveHandler[] datagramReceiveHandlers =
059 datagramReceiveHandlersReference.get();
060
061 return datagramReceiveHandlers.clone();
062 }
063
064 public boolean isOpen() {
065 return open;
066 }
067
068 public DatagramReceiveHandler registerDatagramReceiveHandler(
069 byte type, DatagramReceiveHandler datagramReceiveHandler) {
070
071 ensureOpen();
072
073 int index = type & 0xFF;
074
075 DatagramReceiveHandler oldDatagramReceiveHandler = null;
076 DatagramReceiveHandler[] datagramReceiveHandlers = null;
077 DatagramReceiveHandler[] copyDatagramReceiveHandlers = null;
078
079 do {
080 datagramReceiveHandlers = datagramReceiveHandlersReference.get();
081
082 copyDatagramReceiveHandlers = datagramReceiveHandlers.clone();
083
084 oldDatagramReceiveHandler = copyDatagramReceiveHandlers[index];
085
086 copyDatagramReceiveHandlers[index] = datagramReceiveHandler;
087 }
088 while (
089 !datagramReceiveHandlersReference.compareAndSet(
090 datagramReceiveHandlers, copyDatagramReceiveHandlers));
091
092 return oldDatagramReceiveHandler;
093 }
094
095 public void sendDatagram(
096 RegistrationReference registrationReference, Datagram datagram) {
097
098 if (registrationReference == null) {
099 throw new NullPointerException("Registration reference is null");
100 }
101
102 if (!registrationReference.isValid()) {
103 throw new IllegalArgumentException(
104 "Registration reference is invalid");
105 }
106
107 if (datagram == null) {
108 throw new NullPointerException("Datagram is null");
109 }
110
111 ensureOpen();
112
113 doSendDatagram(registrationReference, datagram);
114 }
115
116 public <A> void sendDatagram(
117 RegistrationReference registrationReference, Datagram datagram,
118 A attachment, EnumSet<CompletionHandler.CompletionType> completionTypes,
119 CompletionHandler<A> completionHandler) {
120
121 sendDatagram(
122 registrationReference, datagram, attachment, completionTypes,
123 completionHandler, defaultTimeout, TimeUnit.MILLISECONDS);
124 }
125
126 public <A> void sendDatagram(
127 RegistrationReference registrationReference, Datagram datagram,
128 A attachment, EnumSet<CompletionType> completionTypes,
129 CompletionHandler<A> completionHandler, long timeout,
130 TimeUnit timeUnit) {
131
132 if (registrationReference == null) {
133 throw new NullPointerException("Registration reference is null");
134 }
135
136 if (!registrationReference.isValid()) {
137 throw new IllegalArgumentException(
138 "Registration reference is invalid");
139 }
140
141 if (datagram == null) {
142 throw new NullPointerException("Datagram is null");
143 }
144
145 if (completionTypes == null) {
146 throw new NullPointerException("Completion type set is null");
147 }
148
149 if (completionTypes.isEmpty()) {
150 throw new IllegalArgumentException("Completion type set is empty");
151 }
152
153 if (completionHandler == null) {
154 throw new NullPointerException("Complete handler is null");
155 }
156
157 if (timeUnit == null) {
158 throw new NullPointerException("Time unit is null");
159 }
160
161 if (timeout <= 0) {
162 timeout = defaultTimeout;
163 }
164 else {
165 timeout = timeUnit.toMillis(timeout);
166 }
167
168 ensureOpen();
169
170 datagram.attachment = attachment;
171 datagram.completionHandler =
172 (CompletionHandler<Object>)completionHandler;
173 datagram.completionTypes = completionTypes;
174 datagram.timeout = timeout;
175
176 datagram.setAckRequest(
177 completionTypes.contains(CompletionType.DELIVERED));
178
179 if (datagram.getSequenceId() == 0) {
180 datagram.setSequenceId(generateSequenceId());
181 }
182
183 if (completionTypes.contains(CompletionType.DELIVERED) ||
184 completionTypes.contains(CompletionType.REPLIED)) {
185
186 addResponseWaitingDatagram(datagram);
187 }
188
189 doSendDatagram(registrationReference, datagram);
190 }
191
192 public Datagram sendSyncDatagram(
193 RegistrationReference registrationReference, Datagram datagram)
194 throws InterruptedException, IOException, TimeoutException {
195
196 return sendSyncDatagram(
197 registrationReference, datagram, defaultTimeout,
198 TimeUnit.MILLISECONDS);
199 }
200
201 public Datagram sendSyncDatagram(
202 RegistrationReference registrationReference, Datagram datagram,
203 long timeout, TimeUnit timeUnit)
204 throws InterruptedException, IOException, TimeoutException {
205
206 if (registrationReference == null) {
207 throw new NullPointerException("Registration reference is null");
208 }
209
210 if (!registrationReference.isValid()) {
211 throw new IllegalArgumentException(
212 "Registration reference is invalid");
213 }
214
215 if (datagram == null) {
216 throw new NullPointerException("Datagram is null");
217 }
218
219 if (timeUnit == null) {
220 throw new NullPointerException("Time unit is null");
221 }
222
223 if (timeout <= 0) {
224 timeout = defaultTimeout;
225 }
226 else {
227 timeout = timeUnit.toMillis(timeout);
228 }
229
230 ensureOpen();
231
232 return doSendSyncDatagram(registrationReference, datagram, timeout);
233 }
234
235 public DatagramReceiveHandler unregisterDatagramReceiveHandler(byte type) {
236 return registerDatagramReceiveHandler(type, null);
237 }
238
239 protected void addResponseWaitingDatagram(Datagram requestDatagram) {
240 long sequenceId = requestDatagram.getSequenceId();
241
242 long expireTime = System.currentTimeMillis() + requestDatagram.timeout;
243
244 requestDatagram.expireTime = expireTime;
245
246 responseWaitingMap.put(sequenceId, requestDatagram);
247
248 timeoutMap.put(expireTime, sequenceId);
249 }
250
251 protected void cleanUpTimeoutResponseWaitingDatagrams() {
252 Map<Long, Long> map = timeoutMap.headMap(
253 System.currentTimeMillis(), true);
254
255 if (map.isEmpty()) {
256 return;
257 }
258
259 Set<Map.Entry<Long, Long>> set = map.entrySet();
260
261 Iterator<Map.Entry<Long, Long>> iterator = set.iterator();
262
263 while (iterator.hasNext()) {
264 Map.Entry<Long, Long> entry = iterator.next();
265
266 iterator.remove();
267
268 Long sequenceId = entry.getValue();
269
270 Datagram datagram = responseWaitingMap.remove(sequenceId);
271
272 if (_log.isWarnEnabled()) {
273 _log.warn(
274 "Removed timeout response waiting datagram " + datagram);
275 }
276
277 datagram.completionHandler.timeouted(datagram.attachment);
278 }
279 }
280
281 protected abstract void doSendDatagram(
282 RegistrationReference registrationReference, Datagram datagram);
283
284 protected Datagram doSendSyncDatagram(
285 RegistrationReference registrationReference, Datagram datagram,
286 long timeout)
287 throws InterruptedException, IOException, TimeoutException {
288
289 SendSyncDatagramCompletionHandler sendSyncDatagramCompletionHandler =
290 new SendSyncDatagramCompletionHandler();
291
292 datagram.completionHandler = sendSyncDatagramCompletionHandler;
293 datagram.completionTypes = REPLIED_ENUM_SET;
294 datagram.timeout = timeout;
295
296 if (datagram.getSequenceId() == 0) {
297 datagram.setSequenceId(generateSequenceId());
298 }
299
300 addResponseWaitingDatagram(datagram);
301
302 doSendDatagram(registrationReference, datagram);
303
304 return sendSyncDatagramCompletionHandler.waitResult(timeout);
305 }
306
307 protected void ensureOpen() {
308 if (!isOpen()) {
309 throw new ClosedIntraBandException();
310 }
311 }
312
313 protected long generateSequenceId() {
314 long sequenceId = sequenceIdGenerator.incrementAndGet();
315
316 if (sequenceId <= 0) {
317
318
319
320
321
322
323
324
325
326
327
328
329
330 while (true) {
331 if (sequenceIdGenerator.compareAndSet(sequenceId, 1)) {
332 return 1;
333 }
334
335 sequenceId = sequenceIdGenerator.incrementAndGet();
336
337
338
339 if (sequenceId > 0) {
340 return sequenceId;
341 }
342
343 }
344 }
345
346 return sequenceId;
347 }
348
349 protected void handleReading(
350 ScatteringByteChannel scatteringByteChannel,
351 ChannelContext channelContext) {
352
353 Datagram datagram = channelContext.getReadingDatagram();
354
355 if (datagram == null) {
356 datagram = Datagram.createReceiveDatagram();
357
358 channelContext.setReadingDatagram(datagram);
359 }
360
361 try {
362 if (datagram.readFrom(scatteringByteChannel)) {
363 channelContext.setReadingDatagram(
364 Datagram.createReceiveDatagram());
365
366 if (datagram.isAckResponse()) {
367 Datagram requestDatagram = removeResponseWaitingDatagram(
368 datagram);
369
370 if (requestDatagram == null) {
371 if (_log.isWarnEnabled()) {
372 _log.warn(
373 "Dropped ownerless ACK response " + datagram);
374 }
375 }
376 else {
377 CompletionHandler<Object> completionHandler =
378 requestDatagram.completionHandler;
379
380 completionHandler.delivered(requestDatagram.attachment);
381 }
382 }
383 else if (datagram.isResponse()) {
384 Datagram requestDatagram = removeResponseWaitingDatagram(
385 datagram);
386
387 if (requestDatagram == null) {
388 if (_log.isWarnEnabled()) {
389 _log.warn("Dropped ownerless response " + datagram);
390 }
391 }
392 else {
393 EnumSet<CompletionType> completionTypes =
394 requestDatagram.completionTypes;
395
396 if (completionTypes.contains(CompletionType.REPLIED)) {
397 CompletionHandler<Object> completionHandler =
398 requestDatagram.completionHandler;
399
400 completionHandler.replied(
401 requestDatagram.attachment, datagram);
402 }
403 else if (_log.isWarnEnabled()) {
404 _log.warn(
405 "Dropped unconcerned response " + datagram);
406 }
407 }
408 }
409 else {
410 if (datagram.isAckRequest()) {
411 Datagram ackResponseDatagram =
412 Datagram.createACKResponseDatagram(
413 datagram.getSequenceId());
414
415 doSendDatagram(
416 channelContext.getRegistrationReference(),
417 ackResponseDatagram);
418 }
419
420 int index = datagram.getType() & 0xFF;
421
422 DatagramReceiveHandler datagramReceiveHandler =
423 datagramReceiveHandlersReference.get()[index];
424
425 if (datagramReceiveHandler == null) {
426 if (_log.isWarnEnabled()) {
427 _log.warn("Dropped ownerless request " + datagram);
428 }
429 }
430 else {
431 try {
432 datagramReceiveHandler.receive(
433 channelContext.getRegistrationReference(),
434 datagram);
435 }
436 catch (Throwable t) {
437 _log.error("Unable to dispatch", t);
438 }
439 }
440 }
441 }
442 }
443 catch (IOException ioe) {
444 RegistrationReference registrationReference =
445 channelContext.getRegistrationReference();
446
447 registrationReference.cancelRegistration();
448
449 if (_log.isDebugEnabled()) {
450 _log.debug(
451 "Broken read channel, unregister " + registrationReference,
452 ioe);
453 }
454 else if (_log.isInfoEnabled()) {
455 _log.info(
456 "Broken read channel, unregister " + registrationReference);
457 }
458 }
459 }
460
461 protected boolean handleWriting(
462 GatheringByteChannel gatheringByteChannel,
463 ChannelContext channelContext) {
464
465 Datagram datagram = channelContext.getWritingDatagram();
466
467 try {
468 if (datagram.writeTo(gatheringByteChannel)) {
469 channelContext.setWritingDatagram(null);
470
471 EnumSet<CompletionType> completionTypes =
472 datagram.completionTypes;
473
474 if (completionTypes != null) {
475 if (completionTypes.contains(CompletionType.SUBMITTED)) {
476 CompletionHandler<Object> completeHandler =
477 datagram.completionHandler;
478
479 completeHandler.submitted(datagram.attachment);
480 }
481 }
482
483 return true;
484 }
485 else {
486 return false;
487 }
488 }
489 catch (IOException ioe) {
490 RegistrationReference registrationReference =
491 channelContext.getRegistrationReference();
492
493 registrationReference.cancelRegistration();
494
495 CompletionHandler<Object> completionHandler =
496 datagram.completionHandler;
497
498 if (completionHandler != null) {
499 completionHandler.failed(datagram.attachment, ioe);
500 }
501
502 if (_log.isDebugEnabled()) {
503 _log.debug(
504 "Broken write channel, unregister " + registrationReference,
505 ioe);
506 }
507 else if (_log.isInfoEnabled()) {
508 _log.info(
509 "Broken write channel, unregister " +
510 registrationReference);
511 }
512
513 return false;
514 }
515 }
516
517 protected Datagram removeResponseWaitingDatagram(
518 Datagram responseDatagram) {
519
520 long sequenceId = responseDatagram.getSequenceId();
521
522 Datagram requestDatagram = responseWaitingMap.remove(sequenceId);
523
524 if (requestDatagram != null) {
525 timeoutMap.remove(requestDatagram.expireTime);
526 }
527
528 return requestDatagram;
529 }
530
531 protected static final EnumSet<CompletionType> REPLIED_ENUM_SET =
532 EnumSet.of(CompletionType.REPLIED);
533
534 protected final long defaultTimeout;
535 protected final AtomicReference<DatagramReceiveHandler[]>
536 datagramReceiveHandlersReference =
537 new AtomicReference<DatagramReceiveHandler[]>(
538 new DatagramReceiveHandler[256]);
539 protected volatile boolean open = true;
540 protected final Map<Long, Datagram> responseWaitingMap =
541 new ConcurrentHashMap<Long, Datagram>();
542 protected final AtomicLong sequenceIdGenerator = new AtomicLong();
543 protected final NavigableMap<Long, Long> timeoutMap =
544 new ConcurrentSkipListMap<Long, Long>();
545
546 protected static class SendSyncDatagramCompletionHandler
547 implements CompletionHandler<Object> {
548
549 public void delivered(Object attachment) {
550 }
551
552 public void failed(Object attachment, IOException ioe) {
553
554
555
556 _ioe = ioe;
557
558 _countDownLatch.countDown();
559 }
560
561 public void replied(Object attachment, Datagram datagram) {
562
563
564
565 _datagram = datagram;
566
567 _countDownLatch.countDown();
568 }
569
570 public void submitted(Object attachment) {
571 }
572
573 public void timeouted(Object attachment) {
574 }
575
576 public Datagram waitResult(long timeout)
577 throws InterruptedException, IOException, TimeoutException {
578
579 boolean result = _countDownLatch.await(
580 timeout, TimeUnit.MILLISECONDS);
581
582 if (!result) {
583 throw new TimeoutException("Result waiting timeout");
584 }
585
586 if (_ioe != null) {
587 throw _ioe;
588 }
589
590 return _datagram;
591 }
592
593 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
594 private Datagram _datagram;
595 private IOException _ioe;
596
597 }
598
599 private static Log _log = LogFactoryUtil.getLog(BaseIntraBand.class);
600
601 }