001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025 import com.liferay.portal.kernel.util.PropsKeys;
026 import com.liferay.portal.util.PropsUtil;
027
028 import java.net.InetAddress;
029
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.List;
033 import java.util.Properties;
034
035 import org.jgroups.JChannel;
036
037
040 @DoPrivileged
041 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
042
043 @Override
044 public void destroy() {
045 if (!isEnabled()) {
046 return;
047 }
048
049 for (JChannel jChannel : _transportChannels) {
050 jChannel.close();
051 }
052 }
053
054 @Override
055 public InetAddress getBindInetAddress() {
056 return bindInetAddress;
057 }
058
059 @Override
060 public List<Address> getLocalTransportAddresses() {
061 if (!isEnabled()) {
062 return Collections.emptyList();
063 }
064
065 List<Address> addresses = new ArrayList<Address>(
066 _localTransportAddresses.size());
067
068 for (org.jgroups.Address address : _localTransportAddresses) {
069 addresses.add(new AddressImpl(address));
070 }
071
072 return addresses;
073 }
074
075 @Override
076 public List<Address> getTransportAddresses(Priority priority) {
077 if (!isEnabled()) {
078 return Collections.emptyList();
079 }
080
081 JChannel jChannel = getChannel(priority);
082
083 return getAddresses(jChannel);
084 }
085
086 @Override
087 public void sendMulticastMessage(Message message, Priority priority) {
088 if (!isEnabled()) {
089 return;
090 }
091
092 JChannel jChannel = getChannel(priority);
093
094 try {
095 jChannel.send(null, message);
096 }
097 catch (Exception e) {
098 _log.error("Unable to send multicast message " + message, e);
099 }
100 }
101
102 @Override
103 public void sendUnicastMessage(
104 Address address, Message message, Priority priority) {
105
106 if (!isEnabled()) {
107 return;
108 }
109
110 org.jgroups.Address jGroupsAddress =
111 (org.jgroups.Address)address.getRealAddress();
112
113 JChannel jChannel = getChannel(priority);
114
115 try {
116 jChannel.send(jGroupsAddress, message);
117 }
118 catch (Exception e) {
119 _log.error("Unable to send unicast message " + message, e);
120 }
121 }
122
123 public void setClusterForwardMessageListener(
124 ClusterForwardMessageListener clusterForwardMessageListener) {
125
126 _clusterForwardMessageListener = clusterForwardMessageListener;
127 }
128
129 protected JChannel getChannel(Priority priority) {
130 int channelIndex =
131 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
132
133 if (_log.isDebugEnabled()) {
134 _log.debug(
135 "Select channel number " + channelIndex + " for priority " +
136 priority);
137 }
138
139 return _transportChannels.get(channelIndex);
140 }
141
142 @Override
143 protected void initChannels() throws Exception {
144 Properties transportProperties = PropsUtil.getProperties(
145 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
146
147 _channelCount = transportProperties.size();
148
149 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
150 throw new IllegalArgumentException(
151 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
152 }
153
154 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
155 _channelCount);
156 _transportChannels = new ArrayList<JChannel>(_channelCount);
157
158 List<String> keys = new ArrayList<String>(_channelCount);
159
160 for (Object key : transportProperties.keySet()) {
161 keys.add((String)key);
162 }
163
164 Collections.sort(keys);
165
166 for (int i = 0; i < keys.size(); i++) {
167 String customName = keys.get(i);
168
169 String value = transportProperties.getProperty(customName);
170
171 JChannel jChannel = createJChannel(
172 value,
173 new ClusterForwardReceiver(
174 _localTransportAddresses, _clusterForwardMessageListener),
175 _LIFERAY_TRANSPORT_CHANNEL + i);
176
177 _localTransportAddresses.add(jChannel.getAddress());
178 _transportChannels.add(jChannel);
179 }
180 }
181
182 private static final String _LIFERAY_TRANSPORT_CHANNEL =
183 "LIFERAY-TRANSPORT-CHANNEL-";
184
185 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
186
187 private int _channelCount;
188 private ClusterForwardMessageListener _clusterForwardMessageListener;
189 private List<org.jgroups.Address> _localTransportAddresses;
190 private List<JChannel> _transportChannels;
191
192 }