001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.cluster;
016    
017    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
018    
019    import java.util.HashSet;
020    import java.util.List;
021    import java.util.Set;
022    import java.util.concurrent.BlockingQueue;
023    import java.util.concurrent.ExecutionException;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.TimeoutException;
026    import java.util.concurrent.atomic.AtomicInteger;
027    
028    /**
029     * @author Tina Tian
030     */
031    public class FutureClusterResponses
032            extends DefaultNoticeableFuture<ClusterNodeResponses> {
033    
034            public FutureClusterResponses(List<Address> addresses) {
035                    _clusterNodeResponses = new ClusterNodeResponses();
036    
037                    int size = addresses.size();
038    
039                    if (size == 0) {
040                            set(_clusterNodeResponses);
041                    }
042    
043                    _counter = new AtomicInteger(size);
044                    _expectedReplyAddress = new HashSet<Address>(addresses);
045            }
046    
047            public void addClusterNodeResponse(
048                    ClusterNodeResponse clusterNodeResponse) {
049    
050                    _clusterNodeResponses.addClusterResponse(clusterNodeResponse);
051    
052                    if (_counter.decrementAndGet() == 0) {
053                            set(_clusterNodeResponses);
054                    }
055            }
056    
057            public boolean expectsReply(Address address) {
058                    return _expectedReplyAddress.contains(address);
059            }
060    
061            @Override
062            public ClusterNodeResponses get() throws InterruptedException {
063                    try {
064                            return super.get();
065                    }
066                    catch (ExecutionException ee) {
067                            throw new AssertionError(ee);
068                    }
069            }
070    
071            @Override
072            public ClusterNodeResponses get(long timeout, TimeUnit unit)
073                    throws InterruptedException, TimeoutException {
074    
075                    try {
076                            return super.get(timeout, unit);
077                    }
078                    catch (ExecutionException ee) {
079                            throw new AssertionError(ee);
080                    }
081            }
082    
083            public BlockingQueue<ClusterNodeResponse> getPartialResults() {
084                    return _clusterNodeResponses.getClusterResponses();
085            }
086    
087            private final ClusterNodeResponses _clusterNodeResponses;
088            private final AtomicInteger _counter;
089            private final Set<Address> _expectedReplyAddress;
090    
091    }