001
014
015 package com.liferay.portal.kernel.cluster;
016
017 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
018
019 import java.util.HashSet;
020 import java.util.List;
021 import java.util.Set;
022 import java.util.concurrent.BlockingQueue;
023 import java.util.concurrent.ExecutionException;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.TimeoutException;
026 import java.util.concurrent.atomic.AtomicInteger;
027
028
031 public class FutureClusterResponses
032 extends DefaultNoticeableFuture<ClusterNodeResponses> {
033
034 public FutureClusterResponses(List<Address> addresses) {
035 _clusterNodeResponses = new ClusterNodeResponses();
036
037 int size = addresses.size();
038
039 if (size == 0) {
040 set(_clusterNodeResponses);
041 }
042
043 _counter = new AtomicInteger(size);
044 _expectedReplyAddress = new HashSet<Address>(addresses);
045 }
046
047 public void addClusterNodeResponse(
048 ClusterNodeResponse clusterNodeResponse) {
049
050 _clusterNodeResponses.addClusterResponse(clusterNodeResponse);
051
052 if (_counter.decrementAndGet() == 0) {
053 set(_clusterNodeResponses);
054 }
055 }
056
057 public boolean expectsReply(Address address) {
058 return _expectedReplyAddress.contains(address);
059 }
060
061 @Override
062 public ClusterNodeResponses get() throws InterruptedException {
063 try {
064 return super.get();
065 }
066 catch (ExecutionException ee) {
067 throw new AssertionError(ee);
068 }
069 }
070
071 @Override
072 public ClusterNodeResponses get(long timeout, TimeUnit unit)
073 throws InterruptedException, TimeoutException {
074
075 try {
076 return super.get(timeout, unit);
077 }
078 catch (ExecutionException ee) {
079 throw new AssertionError(ee);
080 }
081 }
082
083 public BlockingQueue<ClusterNodeResponse> getPartialResults() {
084 return _clusterNodeResponses.getClusterResponses();
085 }
086
087 private final ClusterNodeResponses _clusterNodeResponses;
088 private final AtomicInteger _counter;
089 private final Set<Address> _expectedReplyAddress;
090
091 }