001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025 import com.liferay.portal.kernel.util.PropsKeys;
026 import com.liferay.portal.util.PropsUtil;
027
028 import java.net.InetAddress;
029
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.List;
033 import java.util.Properties;
034
035 import org.jgroups.JChannel;
036
037
040 @DoPrivileged
041 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
042
043 @Override
044 public void destroy() {
045 if (!isEnabled()) {
046 return;
047 }
048
049 for (JChannel jChannel : _transportJChannels) {
050 jChannel.close();
051 }
052 }
053
054 @Override
055 public InetAddress getBindInetAddress() {
056 JChannel jChannel = _transportJChannels.get(0);
057
058 return getBindInetAddress(jChannel);
059 }
060
061 @Override
062 public List<Address> getLocalTransportAddresses() {
063 if (!isEnabled()) {
064 return Collections.emptyList();
065 }
066
067 List<Address> addresses = new ArrayList<Address>(
068 _localTransportAddresses.size());
069
070 for (org.jgroups.Address address : _localTransportAddresses) {
071 addresses.add(new AddressImpl(address));
072 }
073
074 return addresses;
075 }
076
077 @Override
078 public List<Address> getTransportAddresses(Priority priority) {
079 if (!isEnabled()) {
080 return Collections.emptyList();
081 }
082
083 JChannel jChannel = getChannel(priority);
084
085 return getAddresses(jChannel);
086 }
087
088 @Override
089 public void sendMulticastMessage(Message message, Priority priority) {
090 if (!isEnabled()) {
091 return;
092 }
093
094 JChannel jChannel = getChannel(priority);
095
096 try {
097 jChannel.send(null, message);
098 }
099 catch (Exception e) {
100 _log.error("Unable to send multicast message " + message, e);
101 }
102 }
103
104 @Override
105 public void sendUnicastMessage(
106 Address address, Message message, Priority priority) {
107
108 if (!isEnabled()) {
109 return;
110 }
111
112 org.jgroups.Address jGroupsAddress =
113 (org.jgroups.Address)address.getRealAddress();
114
115 JChannel jChannel = getChannel(priority);
116
117 try {
118 jChannel.send(jGroupsAddress, message);
119 }
120 catch (Exception e) {
121 _log.error("Unable to send unicast message " + message, e);
122 }
123 }
124
125 public void setClusterForwardMessageListener(
126 ClusterForwardMessageListener clusterForwardMessageListener) {
127
128 _clusterForwardMessageListener = clusterForwardMessageListener;
129 }
130
131 protected JChannel getChannel(Priority priority) {
132 int channelIndex =
133 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
134
135 if (_log.isDebugEnabled()) {
136 _log.debug(
137 "Select channel number " + channelIndex + " for priority " +
138 priority);
139 }
140
141 return _transportJChannels.get(channelIndex);
142 }
143
144 @Override
145 protected void initChannels() throws Exception {
146 Properties transportProperties = PropsUtil.getProperties(
147 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
148
149 _channelCount = transportProperties.size();
150
151 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
152 throw new IllegalArgumentException(
153 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
154 }
155
156 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
157 _channelCount);
158 _transportJChannels = new ArrayList<JChannel>(_channelCount);
159
160 List<String> keys = new ArrayList<String>(_channelCount);
161
162 for (Object key : transportProperties.keySet()) {
163 keys.add((String)key);
164 }
165
166 Collections.sort(keys);
167
168 for (int i = 0; i < keys.size(); i++) {
169 String customName = keys.get(i);
170
171 String value = transportProperties.getProperty(customName);
172
173 JChannel jChannel = createJChannel(
174 value,
175 new ClusterForwardReceiver(
176 _localTransportAddresses, _clusterForwardMessageListener),
177 _LIFERAY_TRANSPORT_CHANNEL + i);
178
179 _localTransportAddresses.add(jChannel.getAddress());
180 _transportJChannels.add(jChannel);
181 }
182 }
183
184 private static final String _LIFERAY_TRANSPORT_CHANNEL =
185 "LIFERAY-TRANSPORT-CHANNEL-";
186
187 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
188
189 private int _channelCount;
190 private ClusterForwardMessageListener _clusterForwardMessageListener;
191 private List<org.jgroups.Address> _localTransportAddresses;
192 private List<JChannel> _transportJChannels;
193
194 }