001
014
015 package com.liferay.portal.cache.cluster.clusterlink;
016
017 import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterEvent;
018 import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterEventCoalesceComparator;
019 import com.liferay.portal.kernel.concurrent.CoalescedPipe;
020
021 import java.util.HashSet;
022 import java.util.Set;
023
024
027 public class CoalescedPipePortalCacheClusterEventQueue
028 implements PortalCacheClusterEventQueue {
029
030 public CoalescedPipePortalCacheClusterEventQueue() {
031 _coalescedPipe = new CoalescedPipe<PortalCacheClusterEvent>(
032 new PortalCacheClusterEventCoalesceComparator());
033 }
034
035 @Override
036 public long coalescedCount() {
037 return _coalescedPipe.coalescedCount();
038 }
039
040 @Override
041 public int pendingCount() {
042 return _coalescedPipe.pendingCount();
043 }
044
045 @Override
046 public void put(PortalCacheClusterEvent portalCacheClusterEvent)
047 throws InterruptedException {
048
049 _coalescedPipe.put(portalCacheClusterEvent);
050 }
051
052 @Override
053 public PortalCacheClusterEvent take() throws InterruptedException {
054 return _coalescedPipe.take();
055 }
056
057 @Override
058 public Set<PortalCacheClusterEvent> takeSnapshot() {
059 Set<PortalCacheClusterEvent> portalCacheClusterEvents =
060 new HashSet<PortalCacheClusterEvent>();
061
062 for (Object object : _coalescedPipe.takeSnapshot()) {
063 portalCacheClusterEvents.add((PortalCacheClusterEvent)object);
064 }
065
066 return portalCacheClusterEvents;
067 }
068
069 private final CoalescedPipe<PortalCacheClusterEvent> _coalescedPipe;
070
071 }