001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.cluster;
016    
017    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
018    
019    import java.util.Set;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.ExecutionException;
022    import java.util.concurrent.TimeUnit;
023    import java.util.concurrent.TimeoutException;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    /**
027     * @author Tina Tian
028     */
029    public class FutureClusterResponses
030            extends DefaultNoticeableFuture<ClusterNodeResponses> {
031    
032            public FutureClusterResponses(Set<String> clusterNodeIds) {
033                    _clusterNodeResponses = new ClusterNodeResponses(clusterNodeIds);
034    
035                    int size = clusterNodeIds.size();
036    
037                    if (size == 0) {
038                            set(_clusterNodeResponses);
039                    }
040    
041                    _counter = new AtomicInteger(size);
042            }
043    
044            public boolean addClusterNodeResponse(
045                    ClusterNodeResponse clusterNodeResponse) {
046    
047                    if (_clusterNodeResponses.addClusterResponse(clusterNodeResponse)) {
048                            if (_counter.decrementAndGet() == 0) {
049                                    set(_clusterNodeResponses);
050                            }
051    
052                            return true;
053                    }
054    
055                    return false;
056            }
057    
058            @Override
059            public ClusterNodeResponses get() throws InterruptedException {
060                    try {
061                            return super.get();
062                    }
063                    catch (ExecutionException ee) {
064                            throw new AssertionError(ee);
065                    }
066            }
067    
068            @Override
069            public ClusterNodeResponses get(long timeout, TimeUnit unit)
070                    throws InterruptedException, TimeoutException {
071    
072                    try {
073                            return super.get(timeout, unit);
074                    }
075                    catch (ExecutionException ee) {
076                            throw new AssertionError(ee);
077                    }
078            }
079    
080            public BlockingQueue<ClusterNodeResponse> getPartialResults() {
081                    return _clusterNodeResponses.getClusterResponses();
082            }
083    
084            private final ClusterNodeResponses _clusterNodeResponses;
085            private final AtomicInteger _counter;
086    
087    }