001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.util.PropsKeys;
025    import com.liferay.portal.util.PropsUtil;
026    import com.liferay.portal.util.PropsValues;
027    
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.List;
031    import java.util.Properties;
032    
033    import org.jgroups.ChannelException;
034    import org.jgroups.JChannel;
035    
036    /**
037     * @author Shuyang Zhou
038     */
039    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
040    
041            public static final int MAX_CHANNEL_COUNT = Priority.values().length;
042    
043            @Override
044            public void destroy() {
045                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
046                            return;
047                    }
048    
049                    for (JChannel jChannel : _transportChannels) {
050                            jChannel.close();
051                    }
052            }
053    
054            public List<Address> getLocalTransportAddresses() {
055                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
056                            return Collections.emptyList();
057                    }
058    
059                    List<Address> addresses = new ArrayList<Address>(
060                            _localTransportAddresses.size());
061    
062                    for (org.jgroups.Address address : _localTransportAddresses) {
063                            addresses.add(new AddressImpl(address));
064                    }
065    
066                    return addresses;
067            }
068    
069            public List<Address> getTransportAddresses(Priority priority) {
070                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
071                            return Collections.emptyList();
072                    }
073    
074                    JChannel jChannel = getChannel(priority);
075    
076                    return getAddresses(jChannel);
077            }
078    
079            public void sendMulticastMessage(Message message, Priority priority) {
080                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
081                            return;
082                    }
083    
084                    JChannel jChannel = getChannel(priority);
085    
086                    try {
087                            jChannel.send(null, null, message);
088                    }
089                    catch (ChannelException ce) {
090                            _log.error("Unable to send multicast message " + message, ce);
091                    }
092            }
093    
094            public void sendUnicastMessage(
095                    Address address, Message message, Priority priority) {
096    
097                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
098                            return;
099                    }
100    
101                    org.jgroups.Address jGroupsAddress =
102                            (org.jgroups.Address)address.getRealAddress();
103    
104                    JChannel jChannel = getChannel(priority);
105    
106                    try {
107                            jChannel.send(jGroupsAddress, null, message);
108                    }
109                    catch (ChannelException ce) {
110                            _log.error("Unable to send unicast message " + message, ce);
111                    }
112            }
113    
114            public void setClusterForwardMessageListener(
115                    ClusterForwardMessageListener clusterForwardMessageListener) {
116    
117                    _clusterForwardMessageListener = clusterForwardMessageListener;
118            }
119    
120            protected JChannel getChannel(Priority priority) {
121                    int channelIndex =
122                            priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
123    
124                    if (_log.isDebugEnabled()) {
125                            _log.debug(
126                                    "Select channel number " + channelIndex + " for priority " +
127                                            priority);
128                    }
129    
130                    return _transportChannels.get(channelIndex);
131            }
132    
133            @Override
134            protected void initChannels() throws ChannelException {
135                    Properties transportProperties = PropsUtil.getProperties(
136                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
137    
138                    _channelCount = transportProperties.size();
139    
140                    if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
141                            throw new IllegalArgumentException(
142                                    "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
143                    }
144    
145                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
146                            _channelCount);
147                    _transportChannels = new ArrayList<JChannel>(_channelCount);
148    
149                    List<String> keys = new ArrayList<String>(_channelCount);
150    
151                    for (Object key : transportProperties.keySet()) {
152                            keys.add((String)key);
153                    }
154    
155                    Collections.sort(keys);
156    
157                    for (int i = 0; i < keys.size(); i++) {
158                            String customName = keys.get(i);
159    
160                            String value = transportProperties.getProperty(customName);
161    
162                            JChannel jChannel = createJChannel(
163                                    value,
164                                    new ClusterForwardReceiver(
165                                            _localTransportAddresses, _clusterForwardMessageListener),
166                                            _LIFERAY_TRANSPORT_CHANNEL + i);
167    
168                            _localTransportAddresses.add(jChannel.getLocalAddress());
169                            _transportChannels.add(jChannel);
170                    }
171            }
172    
173            private static final String _LIFERAY_TRANSPORT_CHANNEL =
174                    "LIFERAY-TRANSPORT-CHANNEL-";
175    
176            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
177    
178            private int _channelCount;
179            private ClusterForwardMessageListener _clusterForwardMessageListener;
180            private List<org.jgroups.Address> _localTransportAddresses;
181            private List<JChannel> _transportChannels;
182    
183    }