001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025    import com.liferay.portal.kernel.util.PropsKeys;
026    import com.liferay.portal.util.PropsUtil;
027    
028    import java.net.InetAddress;
029    
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.List;
033    import java.util.Properties;
034    
035    import org.jgroups.JChannel;
036    
037    /**
038     * @author Shuyang Zhou
039     */
040    @DoPrivileged
041    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
042    
043            @Override
044            public void destroy() {
045                    if (!isEnabled()) {
046                            return;
047                    }
048    
049                    for (JChannel jChannel : _transportChannels) {
050                            jChannel.close();
051                    }
052            }
053    
054            @Override
055            public InetAddress getBindInetAddress() {
056                    return bindInetAddress;
057            }
058    
059            @Override
060            public List<Address> getLocalTransportAddresses() {
061                    if (!isEnabled()) {
062                            return Collections.emptyList();
063                    }
064    
065                    List<Address> addresses = new ArrayList<Address>(
066                            _localTransportAddresses.size());
067    
068                    for (org.jgroups.Address address : _localTransportAddresses) {
069                            addresses.add(new AddressImpl(address));
070                    }
071    
072                    return addresses;
073            }
074    
075            @Override
076            public List<Address> getTransportAddresses(Priority priority) {
077                    if (!isEnabled()) {
078                            return Collections.emptyList();
079                    }
080    
081                    JChannel jChannel = getChannel(priority);
082    
083                    return getAddresses(jChannel);
084            }
085    
086            @Override
087            public void sendMulticastMessage(Message message, Priority priority) {
088                    if (!isEnabled()) {
089                            return;
090                    }
091    
092                    JChannel jChannel = getChannel(priority);
093    
094                    try {
095                            jChannel.send(null, message);
096                    }
097                    catch (Exception e) {
098                            _log.error("Unable to send multicast message " + message, e);
099                    }
100            }
101    
102            @Override
103            public void sendUnicastMessage(
104                    Address address, Message message, Priority priority) {
105    
106                    if (!isEnabled()) {
107                            return;
108                    }
109    
110                    org.jgroups.Address jGroupsAddress =
111                            (org.jgroups.Address)address.getRealAddress();
112    
113                    JChannel jChannel = getChannel(priority);
114    
115                    try {
116                            jChannel.send(jGroupsAddress, message);
117                    }
118                    catch (Exception e) {
119                            _log.error("Unable to send unicast message " + message, e);
120                    }
121            }
122    
123            public void setClusterForwardMessageListener(
124                    ClusterForwardMessageListener clusterForwardMessageListener) {
125    
126                    _clusterForwardMessageListener = clusterForwardMessageListener;
127            }
128    
129            protected JChannel getChannel(Priority priority) {
130                    int channelIndex =
131                            priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
132    
133                    if (_log.isDebugEnabled()) {
134                            _log.debug(
135                                    "Select channel number " + channelIndex + " for priority " +
136                                            priority);
137                    }
138    
139                    return _transportChannels.get(channelIndex);
140            }
141    
142            @Override
143            protected void initChannels() throws Exception {
144                    Properties transportProperties = PropsUtil.getProperties(
145                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
146    
147                    _channelCount = transportProperties.size();
148    
149                    if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
150                            throw new IllegalArgumentException(
151                                    "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
152                    }
153    
154                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
155                            _channelCount);
156                    _transportChannels = new ArrayList<JChannel>(_channelCount);
157    
158                    List<String> keys = new ArrayList<String>(_channelCount);
159    
160                    for (Object key : transportProperties.keySet()) {
161                            keys.add((String)key);
162                    }
163    
164                    Collections.sort(keys);
165    
166                    for (int i = 0; i < keys.size(); i++) {
167                            String customName = keys.get(i);
168    
169                            String value = transportProperties.getProperty(customName);
170    
171                            JChannel jChannel = createJChannel(
172                                    value,
173                                    new ClusterForwardReceiver(
174                                            _localTransportAddresses, _clusterForwardMessageListener),
175                                            _LIFERAY_TRANSPORT_CHANNEL + i);
176    
177                            _localTransportAddresses.add(jChannel.getAddress());
178                            _transportChannels.add(jChannel);
179                    }
180            }
181    
182            private static final String _LIFERAY_TRANSPORT_CHANNEL =
183                    "LIFERAY-TRANSPORT-CHANNEL-";
184    
185            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
186    
187            private int _channelCount;
188            private ClusterForwardMessageListener _clusterForwardMessageListener;
189            private List<org.jgroups.Address> _localTransportAddresses;
190            private List<JChannel> _transportChannels;
191    
192    }