001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.messaging.Message;
023    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
024    import com.liferay.portal.kernel.util.PropsKeys;
025    import com.liferay.portal.util.PropsUtil;
026    
027    import java.net.InetAddress;
028    
029    import java.util.ArrayList;
030    import java.util.Collections;
031    import java.util.List;
032    import java.util.Properties;
033    
034    import org.jgroups.JChannel;
035    
036    /**
037     * @author Shuyang Zhou
038     */
039    @DoPrivileged
040    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
041    
042            @Override
043            public void destroy() {
044                    if (!isEnabled()) {
045                            return;
046                    }
047    
048                    for (JChannel jChannel : _transportJChannels) {
049                            jChannel.close();
050                    }
051            }
052    
053            @Override
054            public InetAddress getBindInetAddress() {
055                    JChannel jChannel = _transportJChannels.get(0);
056    
057                    return getBindInetAddress(jChannel);
058            }
059    
060            @Override
061            public List<Address> getLocalTransportAddresses() {
062                    if (!isEnabled()) {
063                            return Collections.emptyList();
064                    }
065    
066                    List<Address> addresses = new ArrayList<Address>(
067                            _localTransportAddresses.size());
068    
069                    for (org.jgroups.Address address : _localTransportAddresses) {
070                            addresses.add(new AddressImpl(address));
071                    }
072    
073                    return addresses;
074            }
075    
076            @Override
077            public List<Address> getTransportAddresses(Priority priority) {
078                    if (!isEnabled()) {
079                            return Collections.emptyList();
080                    }
081    
082                    JChannel jChannel = getChannel(priority);
083    
084                    return getAddresses(jChannel);
085            }
086    
087            @Override
088            public void initialize() {
089                    if (!isEnabled()) {
090                            return;
091                    }
092    
093                    try {
094                            initChannels();
095                    }
096                    catch (Exception e) {
097                            if (_log.isErrorEnabled()) {
098                                    _log.error("Unable to initialize channels", e);
099                            }
100    
101                            throw new IllegalStateException(e);
102                    }
103    
104                    for (JChannel jChannel : _transportJChannels) {
105                            BaseReceiver baseReceiver = (BaseReceiver)jChannel.getReceiver();
106    
107                            baseReceiver.openLatch();
108                    }
109            }
110    
111            @Override
112            public void sendMulticastMessage(Message message, Priority priority) {
113                    if (!isEnabled()) {
114                            return;
115                    }
116    
117                    JChannel jChannel = getChannel(priority);
118    
119                    try {
120                            jChannel.send(null, message);
121                    }
122                    catch (Exception e) {
123                            _log.error("Unable to send multicast message " + message, e);
124                    }
125            }
126    
127            @Override
128            public void sendUnicastMessage(
129                    Address address, Message message, Priority priority) {
130    
131                    if (!isEnabled()) {
132                            return;
133                    }
134    
135                    org.jgroups.Address jGroupsAddress =
136                            (org.jgroups.Address)address.getRealAddress();
137    
138                    JChannel jChannel = getChannel(priority);
139    
140                    try {
141                            jChannel.send(jGroupsAddress, message);
142                    }
143                    catch (Exception e) {
144                            _log.error("Unable to send unicast message " + message, e);
145                    }
146            }
147    
148            protected JChannel getChannel(Priority priority) {
149                    int channelIndex =
150                            priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
151    
152                    if (_log.isDebugEnabled()) {
153                            _log.debug(
154                                    "Select channel number " + channelIndex + " for priority " +
155                                            priority);
156                    }
157    
158                    return _transportJChannels.get(channelIndex);
159            }
160    
161            protected void initChannels() throws Exception {
162                    Properties transportProperties = PropsUtil.getProperties(
163                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
164    
165                    _channelCount = transportProperties.size();
166    
167                    if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
168                            throw new IllegalArgumentException(
169                                    "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
170                    }
171    
172                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
173                            _channelCount);
174                    _transportJChannels = new ArrayList<JChannel>(_channelCount);
175    
176                    List<String> keys = new ArrayList<String>(_channelCount);
177    
178                    for (Object key : transportProperties.keySet()) {
179                            keys.add((String)key);
180                    }
181    
182                    Collections.sort(keys);
183    
184                    for (int i = 0; i < keys.size(); i++) {
185                            String customName = keys.get(i);
186    
187                            String value = transportProperties.getProperty(customName);
188    
189                            JChannel jChannel = createJChannel(
190                                    value, new ClusterForwardReceiver(_localTransportAddresses),
191                                    _LIFERAY_TRANSPORT_CHANNEL + i);
192    
193                            _localTransportAddresses.add(jChannel.getAddress());
194                            _transportJChannels.add(jChannel);
195                    }
196            }
197    
198            private static final String _LIFERAY_TRANSPORT_CHANNEL =
199                    "LIFERAY-TRANSPORT-CHANNEL-";
200    
201            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
202    
203            private int _channelCount;
204            private List<org.jgroups.Address> _localTransportAddresses;
205            private List<JChannel> _transportJChannels;
206    
207    }