001
014
015 package com.liferay.portal.kernel.cluster;
016
017 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018
019 import java.io.Serializable;
020
021 import java.util.Map;
022 import java.util.Set;
023 import java.util.concurrent.BlockingQueue;
024 import java.util.concurrent.ConcurrentHashMap;
025 import java.util.concurrent.LinkedBlockingQueue;
026
027
030 public class ClusterNodeResponses implements Serializable {
031
032 public ClusterNodeResponses(Set<String> expectedReplyNodeIds) {
033 _expectedReplyNodeIds = new ConcurrentHashSet<>(expectedReplyNodeIds);
034 }
035
036 public boolean addClusterResponse(ClusterNodeResponse clusterNodeResponse) {
037 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
038
039 String clusterNodeId = clusterNode.getClusterNodeId();
040
041 if (_expectedReplyNodeIds.remove(clusterNodeId)) {
042 _clusterResponsesByClusterNode.put(
043 clusterNodeId, clusterNodeResponse);
044
045 _clusterResponsesQueue.offer(clusterNodeResponse);
046
047 return true;
048 }
049
050 return false;
051 }
052
053 public ClusterNodeResponse getClusterResponse(String clusterNodeId) {
054 return _clusterResponsesByClusterNode.get(clusterNodeId);
055 }
056
057 public BlockingQueue<ClusterNodeResponse> getClusterResponses() {
058 return _clusterResponsesQueue;
059 }
060
061 public int size() {
062 return _clusterResponsesByClusterNode.size();
063 }
064
065 private final Map<String, ClusterNodeResponse>
066 _clusterResponsesByClusterNode = new ConcurrentHashMap<>();
067 private final BlockingQueue<ClusterNodeResponse> _clusterResponsesQueue =
068 new LinkedBlockingQueue<>();
069 private final Set<String> _expectedReplyNodeIds;
070
071 }