001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Message;
023 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
024 import com.liferay.portal.kernel.util.PropsKeys;
025 import com.liferay.portal.util.PropsUtil;
026
027 import java.net.InetAddress;
028
029 import java.util.ArrayList;
030 import java.util.Collections;
031 import java.util.List;
032 import java.util.Properties;
033
034 import org.jgroups.JChannel;
035
036
039 @DoPrivileged
040 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
041
042 @Override
043 public void destroy() {
044 if (!isEnabled()) {
045 return;
046 }
047
048 for (JChannel jChannel : _transportJChannels) {
049 jChannel.close();
050 }
051 }
052
053 @Override
054 public InetAddress getBindInetAddress() {
055 JChannel jChannel = _transportJChannels.get(0);
056
057 return getBindInetAddress(jChannel);
058 }
059
060 @Override
061 public List<Address> getLocalTransportAddresses() {
062 if (!isEnabled()) {
063 return Collections.emptyList();
064 }
065
066 List<Address> addresses = new ArrayList<Address>(
067 _localTransportAddresses.size());
068
069 for (org.jgroups.Address address : _localTransportAddresses) {
070 addresses.add(new AddressImpl(address));
071 }
072
073 return addresses;
074 }
075
076 @Override
077 public List<Address> getTransportAddresses(Priority priority) {
078 if (!isEnabled()) {
079 return Collections.emptyList();
080 }
081
082 JChannel jChannel = getChannel(priority);
083
084 return getAddresses(jChannel);
085 }
086
087 @Override
088 public void initialize() {
089 if (!isEnabled()) {
090 return;
091 }
092
093 try {
094 initChannels();
095 }
096 catch (Exception e) {
097 if (_log.isErrorEnabled()) {
098 _log.error("Unable to initialize channels", e);
099 }
100
101 throw new IllegalStateException(e);
102 }
103
104 for (JChannel jChannel : _transportJChannels) {
105 BaseReceiver baseReceiver = (BaseReceiver)jChannel.getReceiver();
106
107 baseReceiver.openLatch();
108 }
109 }
110
111 @Override
112 public void sendMulticastMessage(Message message, Priority priority) {
113 if (!isEnabled()) {
114 return;
115 }
116
117 JChannel jChannel = getChannel(priority);
118
119 try {
120 jChannel.send(null, message);
121 }
122 catch (Exception e) {
123 _log.error("Unable to send multicast message " + message, e);
124 }
125 }
126
127 @Override
128 public void sendUnicastMessage(
129 Address address, Message message, Priority priority) {
130
131 if (!isEnabled()) {
132 return;
133 }
134
135 org.jgroups.Address jGroupsAddress =
136 (org.jgroups.Address)address.getRealAddress();
137
138 JChannel jChannel = getChannel(priority);
139
140 try {
141 jChannel.send(jGroupsAddress, message);
142 }
143 catch (Exception e) {
144 _log.error("Unable to send unicast message " + message, e);
145 }
146 }
147
148 protected JChannel getChannel(Priority priority) {
149 int channelIndex =
150 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
151
152 if (_log.isDebugEnabled()) {
153 _log.debug(
154 "Select channel number " + channelIndex + " for priority " +
155 priority);
156 }
157
158 return _transportJChannels.get(channelIndex);
159 }
160
161 protected void initChannels() throws Exception {
162 Properties transportProperties = PropsUtil.getProperties(
163 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
164
165 _channelCount = transportProperties.size();
166
167 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
168 throw new IllegalArgumentException(
169 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
170 }
171
172 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
173 _channelCount);
174 _transportJChannels = new ArrayList<JChannel>(_channelCount);
175
176 List<String> keys = new ArrayList<String>(_channelCount);
177
178 for (Object key : transportProperties.keySet()) {
179 keys.add((String)key);
180 }
181
182 Collections.sort(keys);
183
184 for (int i = 0; i < keys.size(); i++) {
185 String customName = keys.get(i);
186
187 String value = transportProperties.getProperty(customName);
188
189 JChannel jChannel = createJChannel(
190 value, new ClusterForwardReceiver(_localTransportAddresses),
191 _LIFERAY_TRANSPORT_CHANNEL + i);
192
193 _localTransportAddresses.add(jChannel.getAddress());
194 _transportJChannels.add(jChannel);
195 }
196 }
197
198 private static final String _LIFERAY_TRANSPORT_CHANNEL =
199 "LIFERAY-TRANSPORT-CHANNEL-";
200
201 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
202
203 private int _channelCount;
204 private List<org.jgroups.Address> _localTransportAddresses;
205 private List<JChannel> _transportJChannels;
206
207 }