001
014
015 package com.liferay.portal.cache.cluster.clusterlink;
016
017 import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterEvent;
018
019 import java.util.HashSet;
020 import java.util.Set;
021 import java.util.concurrent.BlockingQueue;
022 import java.util.concurrent.LinkedBlockingQueue;
023
024
027 public class BlockingPortalCacheClusterEventQueue
028 implements PortalCacheClusterEventQueue {
029
030 public BlockingPortalCacheClusterEventQueue() {
031 _blockingQueue = new LinkedBlockingQueue<PortalCacheClusterEvent>();
032 }
033
034 @Override
035 public long coalescedCount() {
036 return 0;
037 }
038
039 @Override
040 public int pendingCount() {
041 return _blockingQueue.size();
042 }
043
044 @Override
045 public void put(PortalCacheClusterEvent portalCacheClusterEvent)
046 throws InterruptedException {
047
048 _blockingQueue.put(portalCacheClusterEvent);
049 }
050
051 @Override
052 public PortalCacheClusterEvent take() throws InterruptedException {
053 return _blockingQueue.take();
054 }
055
056 @Override
057 public Set<PortalCacheClusterEvent> takeSnapshot() {
058 Set<PortalCacheClusterEvent> portalCacheClusterEvents =
059 new HashSet<PortalCacheClusterEvent>();
060
061 _blockingQueue.drainTo(portalCacheClusterEvents);
062
063 return portalCacheClusterEvents;
064 }
065
066 private final BlockingQueue<PortalCacheClusterEvent> _blockingQueue;
067
068 }