001
014
015 package com.liferay.portal.cache.cluster.clusterlink;
016
017 import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterChannel;
018 import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
020 import com.liferay.portal.kernel.cluster.Priority;
021 import com.liferay.portal.kernel.io.Serializer;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Message;
025
026 import java.nio.ByteBuffer;
027
028 import java.util.concurrent.atomic.AtomicInteger;
029 import java.util.concurrent.atomic.AtomicLong;
030
031
034 public class ClusterLinkPortalCacheClusterChannel
035 implements PortalCacheClusterChannel, Runnable {
036
037 public ClusterLinkPortalCacheClusterChannel(
038 String destinationName,
039 PortalCacheClusterEventQueue portalCacheClusterEventQueue,
040 Priority priority) {
041
042 _destinationName = destinationName;
043 _portalCacheClusterEventQueue = portalCacheClusterEventQueue;
044 _priority = priority;
045
046 _dispatchThread = new Thread(
047 this,
048 "PortalCacheClusterChannel dispatch thread-" +
049 _dispatchThreadCounter.getAndIncrement());
050 }
051
052 @Override
053 public void destroy() {
054 _destroy = true;
055
056 _dispatchThread.interrupt();
057 }
058
059 public void dispatchEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
060 Message message = new Message();
061
062 message.setDestinationName(_destinationName);
063
064 Serializer serializer = new Serializer();
065
066 serializer.writeObject(portalCacheClusterEvent);
067
068 ByteBuffer byteBuffer = serializer.toByteBuffer();
069
070 message.setPayload(byteBuffer.array());
071
072 ClusterLinkUtil.sendMulticastMessage(message, _priority);
073 }
074
075 @Override
076 public long getCoalescedEventNumber() {
077 return _portalCacheClusterEventQueue.coalescedCount();
078 }
079
080 @Override
081 public int getPendingEventNumber() {
082 return _portalCacheClusterEventQueue.pendingCount();
083 }
084
085 @Override
086 public long getSentEventNumber() {
087 return _sentEventCounter.get();
088 }
089
090 @Override
091 public void run() {
092 while (true) {
093 try {
094 if (_destroy) {
095 for (PortalCacheClusterEvent event :
096 _portalCacheClusterEventQueue.takeSnapshot()) {
097
098 dispatchEvent(event);
099
100 _sentEventCounter.incrementAndGet();
101 }
102
103 break;
104 }
105 else {
106 try {
107 PortalCacheClusterEvent portalCacheClusterEvent =
108 _portalCacheClusterEventQueue.take();
109
110 dispatchEvent(portalCacheClusterEvent);
111
112 _sentEventCounter.incrementAndGet();
113 }
114 catch (InterruptedException ie) {
115 }
116 }
117 }
118 catch (Throwable t) {
119 if (_log.isWarnEnabled()) {
120 _log.warn("Please fix the unexpected throwable", t);
121 }
122 }
123 }
124 }
125
126 @Override
127 public void sendEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
128 if (_started == false) {
129 synchronized (this) {
130 if (_started == false) {
131 _dispatchThread.start();
132
133 _started = true;
134 }
135 }
136 }
137
138 if (_destroy) {
139 dispatchEvent(portalCacheClusterEvent);
140 }
141 else {
142 try {
143 _portalCacheClusterEventQueue.put(portalCacheClusterEvent);
144 }
145 catch (InterruptedException ie) {
146 }
147 }
148 }
149
150 private static Log _log = LogFactoryUtil.getLog(
151 ClusterLinkPortalCacheClusterChannel.class);
152
153 private static final AtomicInteger _dispatchThreadCounter =
154 new AtomicInteger(0);
155
156 private String _destinationName;
157 private volatile boolean _destroy;
158 private final Thread _dispatchThread;
159 private final PortalCacheClusterEventQueue _portalCacheClusterEventQueue;
160 private Priority _priority;
161 private final AtomicLong _sentEventCounter = new AtomicLong(0);
162 private volatile boolean _started;
163
164 }