001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025    import com.liferay.portal.kernel.util.PropsKeys;
026    import com.liferay.portal.util.PropsUtil;
027    
028    import java.net.InetAddress;
029    
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.List;
033    import java.util.Properties;
034    
035    import org.jgroups.JChannel;
036    
037    /**
038     * @author Shuyang Zhou
039     */
040    @DoPrivileged
041    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
042    
043            @Override
044            public void destroy() {
045                    if (!isEnabled()) {
046                            return;
047                    }
048    
049                    for (JChannel jChannel : _transportChannels) {
050                            jChannel.close();
051                    }
052            }
053    
054            @Override
055            public InetAddress getBindInetAddress() {
056                    return bindInetAddress;
057            }
058    
059            @Override
060            public List<Address> getLocalTransportAddresses() {
061                    if (!isEnabled()) {
062                            return Collections.emptyList();
063                    }
064    
065                    List<Address> addresses = new ArrayList<Address>(
066                            _localTransportAddresses.size());
067    
068                    for (org.jgroups.Address address : _localTransportAddresses) {
069                            addresses.add(new AddressImpl(address));
070                    }
071    
072                    return addresses;
073            }
074    
075            @Override
076            public List<Address> getTransportAddresses(Priority priority) {
077                    if (!isEnabled()) {
078                            return Collections.emptyList();
079                    }
080    
081                    JChannel jChannel = getChannel(priority);
082    
083                    return getAddresses(jChannel);
084            }
085    
086            @Override
087            public void initialize() {
088                    if (!isEnabled()) {
089                            return;
090                    }
091    
092                    for (JChannel jChannel : _transportChannels) {
093                            BaseReceiver baseReceiver = (BaseReceiver)jChannel.getReceiver();
094    
095                            baseReceiver.openLatch();
096                    }
097            }
098    
099            @Override
100            public void sendMulticastMessage(Message message, Priority priority) {
101                    if (!isEnabled()) {
102                            return;
103                    }
104    
105                    JChannel jChannel = getChannel(priority);
106    
107                    try {
108                            jChannel.send(null, message);
109                    }
110                    catch (Exception e) {
111                            _log.error("Unable to send multicast message " + message, e);
112                    }
113            }
114    
115            @Override
116            public void sendUnicastMessage(
117                    Address address, Message message, Priority priority) {
118    
119                    if (!isEnabled()) {
120                            return;
121                    }
122    
123                    org.jgroups.Address jGroupsAddress =
124                            (org.jgroups.Address)address.getRealAddress();
125    
126                    JChannel jChannel = getChannel(priority);
127    
128                    try {
129                            jChannel.send(jGroupsAddress, message);
130                    }
131                    catch (Exception e) {
132                            _log.error("Unable to send unicast message " + message, e);
133                    }
134            }
135    
136            public void setClusterForwardMessageListener(
137                    ClusterForwardMessageListener clusterForwardMessageListener) {
138    
139                    _clusterForwardMessageListener = clusterForwardMessageListener;
140            }
141    
142            protected JChannel getChannel(Priority priority) {
143                    int channelIndex =
144                            priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
145    
146                    if (_log.isDebugEnabled()) {
147                            _log.debug(
148                                    "Select channel number " + channelIndex + " for priority " +
149                                            priority);
150                    }
151    
152                    return _transportChannels.get(channelIndex);
153            }
154    
155            @Override
156            protected void initChannels() throws Exception {
157                    Properties transportProperties = PropsUtil.getProperties(
158                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
159    
160                    _channelCount = transportProperties.size();
161    
162                    if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
163                            throw new IllegalArgumentException(
164                                    "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
165                    }
166    
167                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
168                            _channelCount);
169                    _transportChannels = new ArrayList<JChannel>(_channelCount);
170    
171                    List<String> keys = new ArrayList<String>(_channelCount);
172    
173                    for (Object key : transportProperties.keySet()) {
174                            keys.add((String)key);
175                    }
176    
177                    Collections.sort(keys);
178    
179                    for (int i = 0; i < keys.size(); i++) {
180                            String customName = keys.get(i);
181    
182                            String value = transportProperties.getProperty(customName);
183    
184                            JChannel jChannel = createJChannel(
185                                    value,
186                                    new ClusterForwardReceiver(
187                                            _localTransportAddresses, _clusterForwardMessageListener),
188                                            _LIFERAY_TRANSPORT_CHANNEL + i);
189    
190                            _localTransportAddresses.add(jChannel.getAddress());
191                            _transportChannels.add(jChannel);
192                    }
193            }
194    
195            private static final String _LIFERAY_TRANSPORT_CHANNEL =
196                    "LIFERAY-TRANSPORT-CHANNEL-";
197    
198            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
199    
200            private int _channelCount;
201            private ClusterForwardMessageListener _clusterForwardMessageListener;
202            private List<org.jgroups.Address> _localTransportAddresses;
203            private List<JChannel> _transportChannels;
204    
205    }