001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025 import com.liferay.portal.kernel.util.PropsKeys;
026 import com.liferay.portal.util.PropsUtil;
027
028 import java.net.InetAddress;
029
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.List;
033 import java.util.Properties;
034
035 import org.jgroups.JChannel;
036
037
040 @DoPrivileged
041 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
042
043 @Override
044 public void destroy() {
045 if (!isEnabled()) {
046 return;
047 }
048
049 for (JChannel jChannel : _transportChannels) {
050 jChannel.close();
051 }
052 }
053
054 @Override
055 public InetAddress getBindInetAddress() {
056 return bindInetAddress;
057 }
058
059 @Override
060 public List<Address> getLocalTransportAddresses() {
061 if (!isEnabled()) {
062 return Collections.emptyList();
063 }
064
065 List<Address> addresses = new ArrayList<Address>(
066 _localTransportAddresses.size());
067
068 for (org.jgroups.Address address : _localTransportAddresses) {
069 addresses.add(new AddressImpl(address));
070 }
071
072 return addresses;
073 }
074
075 @Override
076 public List<Address> getTransportAddresses(Priority priority) {
077 if (!isEnabled()) {
078 return Collections.emptyList();
079 }
080
081 JChannel jChannel = getChannel(priority);
082
083 return getAddresses(jChannel);
084 }
085
086 @Override
087 public void initialize() {
088 if (!isEnabled()) {
089 return;
090 }
091
092 for (JChannel jChannel : _transportChannels) {
093 BaseReceiver baseReceiver = (BaseReceiver)jChannel.getReceiver();
094
095 baseReceiver.openLatch();
096 }
097 }
098
099 @Override
100 public void sendMulticastMessage(Message message, Priority priority) {
101 if (!isEnabled()) {
102 return;
103 }
104
105 JChannel jChannel = getChannel(priority);
106
107 try {
108 jChannel.send(null, message);
109 }
110 catch (Exception e) {
111 _log.error("Unable to send multicast message " + message, e);
112 }
113 }
114
115 @Override
116 public void sendUnicastMessage(
117 Address address, Message message, Priority priority) {
118
119 if (!isEnabled()) {
120 return;
121 }
122
123 org.jgroups.Address jGroupsAddress =
124 (org.jgroups.Address)address.getRealAddress();
125
126 JChannel jChannel = getChannel(priority);
127
128 try {
129 jChannel.send(jGroupsAddress, message);
130 }
131 catch (Exception e) {
132 _log.error("Unable to send unicast message " + message, e);
133 }
134 }
135
136 public void setClusterForwardMessageListener(
137 ClusterForwardMessageListener clusterForwardMessageListener) {
138
139 _clusterForwardMessageListener = clusterForwardMessageListener;
140 }
141
142 protected JChannel getChannel(Priority priority) {
143 int channelIndex =
144 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
145
146 if (_log.isDebugEnabled()) {
147 _log.debug(
148 "Select channel number " + channelIndex + " for priority " +
149 priority);
150 }
151
152 return _transportChannels.get(channelIndex);
153 }
154
155 @Override
156 protected void initChannels() throws Exception {
157 Properties transportProperties = PropsUtil.getProperties(
158 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
159
160 _channelCount = transportProperties.size();
161
162 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
163 throw new IllegalArgumentException(
164 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
165 }
166
167 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
168 _channelCount);
169 _transportChannels = new ArrayList<JChannel>(_channelCount);
170
171 List<String> keys = new ArrayList<String>(_channelCount);
172
173 for (Object key : transportProperties.keySet()) {
174 keys.add((String)key);
175 }
176
177 Collections.sort(keys);
178
179 for (int i = 0; i < keys.size(); i++) {
180 String customName = keys.get(i);
181
182 String value = transportProperties.getProperty(customName);
183
184 JChannel jChannel = createJChannel(
185 value,
186 new ClusterForwardReceiver(
187 _localTransportAddresses, _clusterForwardMessageListener),
188 _LIFERAY_TRANSPORT_CHANNEL + i);
189
190 _localTransportAddresses.add(jChannel.getAddress());
191 _transportChannels.add(jChannel);
192 }
193 }
194
195 private static final String _LIFERAY_TRANSPORT_CHANNEL =
196 "LIFERAY-TRANSPORT-CHANNEL-";
197
198 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
199
200 private int _channelCount;
201 private ClusterForwardMessageListener _clusterForwardMessageListener;
202 private List<org.jgroups.Address> _localTransportAddresses;
203 private List<JChannel> _transportChannels;
204
205 }