001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cache.cluster.clusterlink;
016    
017    import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterChannel;
018    import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterChannelFactory;
019    import com.liferay.portal.kernel.cache.cluster.PortalCacheClusterException;
020    import com.liferay.portal.kernel.cluster.Priority;
021    import com.liferay.portal.util.PropsValues;
022    
023    import java.util.Collections;
024    import java.util.List;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    /**
028     * @author Shuyang Zhou
029     */
030    public class ClusterLinkPortalCacheClusterChannelFactory
031            implements PortalCacheClusterChannelFactory {
032    
033            @Override
034            public PortalCacheClusterChannel createPortalCacheClusterChannel()
035                    throws PortalCacheClusterException {
036    
037                    int count = _counter.getAndIncrement();
038    
039                    if (count >= _priorities.size()) {
040                            throw new IllegalStateException(
041                                    "Cannot create more than " + _priorities.size() + " channels");
042                    }
043    
044                    if (PropsValues.CLUSTER_LINK_CACHE_USING_COALESCED_PIPE) {
045                            return new ClusterLinkPortalCacheClusterChannel(
046                                    _destinationName,
047                                    new CoalescedPipePortalCacheClusterEventQueue(),
048                                    _priorities.get(count));
049                    }
050    
051                    return new ClusterLinkPortalCacheClusterChannel(
052                            _destinationName, new BlockingPortalCacheClusterEventQueue(),
053                            _priorities.get(count));
054            }
055    
056            public void setDestinationName(String destinationName) {
057                    _destinationName = destinationName;
058            }
059    
060            public void setPriorities(List<Priority> priorities) {
061                    _priorities = priorities;
062    
063                    Collections.sort(priorities);
064            }
065    
066            private AtomicInteger _counter = new AtomicInteger(0);
067            private String _destinationName;
068            private List<Priority> _priorities;
069    
070    }