001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cache.cluster.clusterlink;
016    
017    import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterChannel;
018    import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
020    import com.liferay.portal.kernel.cluster.Priority;
021    import com.liferay.portal.kernel.io.Serializer;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.Message;
025    
026    import java.nio.ByteBuffer;
027    
028    import java.util.concurrent.atomic.AtomicInteger;
029    import java.util.concurrent.atomic.AtomicLong;
030    
031    /**
032     * @author Shuyang Zhou
033     */
034    public class ClusterLinkPortalCacheClusterChannel
035            implements PortalCacheClusterChannel, Runnable {
036    
037            public ClusterLinkPortalCacheClusterChannel(
038                    String destinationName,
039                    PortalCacheClusterEventQueue portalCacheClusterEventQueue,
040                    Priority priority) {
041    
042                    _destinationName = destinationName;
043                    _portalCacheClusterEventQueue = portalCacheClusterEventQueue;
044                    _priority = priority;
045    
046                    _dispatchThread = new Thread(
047                            this,
048                            "PortalCacheClusterChannel dispatch thread-" +
049                                    _dispatchThreadCounter.getAndIncrement());
050            }
051    
052            @Override
053            public void destroy() {
054                    _destroy = true;
055    
056                    _dispatchThread.interrupt();
057            }
058    
059            public void dispatchEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
060                    Message message = new Message();
061    
062                    message.setDestinationName(_destinationName);
063    
064                    Serializer serializer = new Serializer();
065    
066                    serializer.writeObject(portalCacheClusterEvent);
067    
068                    ByteBuffer byteBuffer = serializer.toByteBuffer();
069    
070                    message.setPayload(byteBuffer.array());
071    
072                    ClusterLinkUtil.sendMulticastMessage(message, _priority);
073            }
074    
075            @Override
076            public long getCoalescedEventNumber() {
077                    return _portalCacheClusterEventQueue.coalescedCount();
078            }
079    
080            @Override
081            public int getPendingEventNumber() {
082                    return _portalCacheClusterEventQueue.pendingCount();
083            }
084    
085            @Override
086            public long getSentEventNumber() {
087                    return _sentEventCounter.get();
088            }
089    
090            @Override
091            public void run() {
092                    while (true) {
093                            try {
094                                    if (_destroy) {
095                                            for (PortalCacheClusterEvent event :
096                                                            _portalCacheClusterEventQueue.takeSnapshot()) {
097    
098                                                    dispatchEvent(event);
099    
100                                                    _sentEventCounter.incrementAndGet();
101                                            }
102    
103                                            break;
104                                    }
105                                    else {
106                                            try {
107                                                    PortalCacheClusterEvent portalCacheClusterEvent =
108                                                            _portalCacheClusterEventQueue.take();
109    
110                                                    dispatchEvent(portalCacheClusterEvent);
111    
112                                                    _sentEventCounter.incrementAndGet();
113                                            }
114                                            catch (InterruptedException ie) {
115                                            }
116                                    }
117                            }
118                            catch (Throwable t) {
119                                    if (_log.isWarnEnabled()) {
120                                            _log.warn("Please fix the unexpected throwable", t);
121                                    }
122                            }
123                    }
124            }
125    
126            @Override
127            public void sendEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
128                    if (_started == false) {
129                            synchronized (this) {
130                                    if (_started == false) {
131                                            _dispatchThread.start();
132    
133                                            _started = true;
134                                    }
135                            }
136                    }
137    
138                    if (_destroy) {
139                            dispatchEvent(portalCacheClusterEvent);
140                    }
141                    else {
142                            try {
143                                    _portalCacheClusterEventQueue.put(portalCacheClusterEvent);
144                            }
145                            catch (InterruptedException ie) {
146                            }
147                    }
148            }
149    
150            private static Log _log = LogFactoryUtil.getLog(
151                    ClusterLinkPortalCacheClusterChannel.class);
152    
153            private static final AtomicInteger _dispatchThreadCounter =
154                    new AtomicInteger(0);
155    
156            private String _destinationName;
157            private volatile boolean _destroy;
158            private final Thread _dispatchThread;
159            private final PortalCacheClusterEventQueue _portalCacheClusterEventQueue;
160            private Priority _priority;
161            private final AtomicLong _sentEventCounter = new AtomicLong(0);
162            private volatile boolean _started;
163    
164    }