001
014
015 package com.liferay.portal.kernel.cluster;
016
017 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
018
019 import java.util.Set;
020 import java.util.concurrent.BlockingQueue;
021 import java.util.concurrent.ExecutionException;
022 import java.util.concurrent.TimeUnit;
023 import java.util.concurrent.TimeoutException;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026
029 public class FutureClusterResponses
030 extends DefaultNoticeableFuture<ClusterNodeResponses> {
031
032 public FutureClusterResponses(Set<String> clusterNodeIds) {
033 _clusterNodeResponses = new ClusterNodeResponses(clusterNodeIds);
034
035 int size = clusterNodeIds.size();
036
037 if (size == 0) {
038 set(_clusterNodeResponses);
039 }
040
041 _counter = new AtomicInteger(size);
042 }
043
044 public boolean addClusterNodeResponse(
045 ClusterNodeResponse clusterNodeResponse) {
046
047 if (_clusterNodeResponses.addClusterResponse(clusterNodeResponse)) {
048 if (_counter.decrementAndGet() == 0) {
049 set(_clusterNodeResponses);
050 }
051
052 return true;
053 }
054
055 return false;
056 }
057
058 @Override
059 public ClusterNodeResponses get() throws InterruptedException {
060 try {
061 return super.get();
062 }
063 catch (ExecutionException ee) {
064 throw new AssertionError(ee);
065 }
066 }
067
068 @Override
069 public ClusterNodeResponses get(long timeout, TimeUnit unit)
070 throws InterruptedException, TimeoutException {
071
072 try {
073 return super.get(timeout, unit);
074 }
075 catch (ExecutionException ee) {
076 throw new AssertionError(ee);
077 }
078 }
079
080 public BlockingQueue<ClusterNodeResponse> getPartialResults() {
081 return _clusterNodeResponses.getClusterResponses();
082 }
083
084 private final ClusterNodeResponses _clusterNodeResponses;
085 private final AtomicInteger _counter;
086
087 }