1
22
23 package com.liferay.portal.cluster;
24
25 import com.liferay.portal.kernel.cluster.Address;
26 import com.liferay.portal.kernel.cluster.ClusterLink;
27 import com.liferay.portal.kernel.cluster.Priority;
28 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
29 import com.liferay.portal.kernel.log.Log;
30 import com.liferay.portal.kernel.log.LogFactoryUtil;
31 import com.liferay.portal.kernel.messaging.Message;
32 import com.liferay.portal.kernel.util.GetterUtil;
33 import com.liferay.portal.kernel.util.PropsKeys;
34 import com.liferay.portal.kernel.util.SocketUtil;
35 import com.liferay.portal.kernel.util.StringPool;
36 import com.liferay.portal.kernel.util.Validator;
37 import com.liferay.portal.util.PropsUtil;
38 import com.liferay.portal.util.PropsValues;
39
40 import java.io.IOException;
41
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.Properties;
46 import java.util.Vector;
47
48 import org.jgroups.ChannelException;
49 import org.jgroups.JChannel;
50 import org.jgroups.ReceiverAdapter;
51 import org.jgroups.View;
52
53
58 public class ClusterLinkImpl implements ClusterLink {
59
60 public void afterPropertiesSet() {
61 if (!PropsValues.CLUSTER_LINK_ENABLED) {
62 return;
63 }
64
65 initSystemProperties();
66
67 try {
68 initBindAddress();
69 }
70 catch (IOException ioe) {
71 if (_log.isWarnEnabled()) {
72 _log.warn("Failed to initialize outgoing IP address", ioe);
73 }
74 }
75
76 try {
77 initChannels();
78 }
79 catch (Exception e) {
80 _log.error(e, e);
81 }
82 }
83
84 public void destory() {
85 if (!PropsValues.CLUSTER_LINK_ENABLED) {
86 return;
87 }
88
89 for (JChannel channel : _channels) {
90 channel.close();
91 }
92 }
93
94 public List<Address> getAddresses() {
95 if (!PropsValues.CLUSTER_LINK_ENABLED) {
96 return Collections.EMPTY_LIST;
97 }
98
99 Vector<org.jgroups.Address> jGroupsAddresses =
100 _channels.get(0).getView().getMembers();
101
102 if (jGroupsAddresses == null) {
103 return new ArrayList<Address>();
104 }
105
106 List<Address> addresses = new ArrayList<Address>(
107 jGroupsAddresses.size());
108
109 for (org.jgroups.Address address : jGroupsAddresses) {
110 addresses.add(new AddressImpl(address));
111 }
112
113 return addresses;
114 }
115
116 public boolean isEnabled() {
117 return PropsValues.CLUSTER_LINK_ENABLED;
118 }
119
120 public void sendMulticastMessage(Message message, Priority priority) {
121 if (!PropsValues.CLUSTER_LINK_ENABLED) {
122 return;
123 }
124
125 JChannel channel = getChannel(priority);
126
127 try {
128 channel.send(null, null, message);
129 }
130 catch (ChannelException ce) {
131 _log.error("Unable to send multicast message " + message, ce);
132 }
133 }
134
135 public void sendUnicastMessage(
136 Address address, Message message, Priority priority) {
137
138 if (!PropsValues.CLUSTER_LINK_ENABLED) {
139 return;
140 }
141
142 org.jgroups.Address jGroupsAddress =
143 (org.jgroups.Address)address.getRealAddress();
144
145 JChannel channel = getChannel(priority);
146
147 try {
148 channel.send(jGroupsAddress, null, message);
149 }
150 catch (ChannelException ex) {
151 _log.error("Unable to send multicast message:" + message, ex);
152 }
153 }
154
155 public void setClusterForwardMessageListener(
156 ClusterForwardMessageListener clusterForwardMessageListener) {
157
158 _clusterForwardMessageListener = clusterForwardMessageListener;
159 }
160
161 protected JChannel createChannel(int index, String properties)
162 throws ChannelException {
163
164 JChannel channel = new JChannel(properties);
165
166 channel.setReceiver(
167 new ReceiverAdapter() {
168
169 public void receive(org.jgroups.Message message) {
170 if ((!_addresses.contains(message.getSrc())) ||
171 (message.getDest() != null)) {
172
173 _clusterForwardMessageListener.receive(
174 (Message)message.getObject());
175 }
176 else {
177 if (_log.isDebugEnabled()) {
178 _log.debug("Block received message " + message);
179 }
180 }
181 }
182
183 public void viewAccepted(View view) {
184 if (_log.isDebugEnabled()) {
185 _log.debug("Cluster link accepted view " + view);
186 }
187 }
188
189 }
190 );
191
192 channel.connect(_LIFERAY_CHANNEL + index);
193
194 if (_log.isInfoEnabled()) {
195 _log.info(
196 "Create a new channel with properties " +
197 channel.getProperties());
198 }
199
200 return channel;
201 }
202
203 protected JChannel getChannel(Priority priority) {
204 int channelIndex =
205 priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
206
207 if (_log.isDebugEnabled()) {
208 _log.debug(
209 "Select channel number " + channelIndex + " for priority " +
210 priority);
211 }
212
213 return _channels.get(channelIndex);
214 }
215
216 protected void initBindAddress() throws IOException {
217 String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
218
219 if (Validator.isNull(autodetectAddress)) {
220 return;
221 }
222
223 String host = autodetectAddress;
224 int port = 80;
225
226 int index = autodetectAddress.indexOf(StringPool.COLON);
227
228 if (index != -1) {
229 host = autodetectAddress.substring(0, index);
230 port = GetterUtil.getInteger(
231 autodetectAddress.substring(index + 1), port);
232 }
233
234 String bindAddress = SocketUtil.getHostAddress(host, port);
235
236 System.setProperty("jgroups.bind_addr", bindAddress);
237
238 if (_log.isInfoEnabled()) {
239 _log.info(
240 "Set JGroups outgoing IP address to " + bindAddress + "}");
241 }
242 }
243
244 protected void initChannels() throws ChannelException {
245 Properties properties = PropsUtil.getProperties(
246 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES, true);
247
248 _channelCount = properties.size();
249
250 if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
251 throw new IllegalArgumentException(
252 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
253 }
254
255 _addresses = new ArrayList<org.jgroups.Address>(_channelCount);
256 _channels = new ArrayList<JChannel>(_channelCount);
257
258 List<String> keys = new ArrayList<String>(_channelCount);
259
260 for (Object key : properties.keySet()) {
261 keys.add((String)key);
262 }
263
264 Collections.sort(keys);
265
266 for (int i = 0; i < keys.size(); i++) {
267 String customName = keys.get(i);
268
269 String value = properties.getProperty(customName);
270
271 JChannel channel = createChannel(i, value);
272
273 _addresses.add(channel.getLocalAddress());
274 _channels.add(channel);
275 }
276 }
277
278 protected void initSystemProperties() {
279 for (String systemProperty :
280 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
281
282 int index = systemProperty.indexOf(StringPool.COLON);
283
284 if (index == -1) {
285 continue;
286 }
287
288 String key = systemProperty.substring(0, index);
289 String value = systemProperty.substring(index + 1);
290
291 System.setProperty(key, value);
292
293 if (_log.isDebugEnabled()) {
294 _log.debug(
295 "Setting system property {key=" + key + ", value=" +
296 value + "}");
297 }
298 }
299 }
300
301 private static final String _LIFERAY_CHANNEL = "LIFERAY-CHANNEL-";
302
303 private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
304
305 private static final Log _log =
306 LogFactoryUtil.getLog(ClusterLinkImpl.class);
307
308 private List<org.jgroups.Address> _addresses;
309 private int _channelCount;
310 private List<JChannel> _channels;
311 private ClusterForwardMessageListener _clusterForwardMessageListener;
312
313 }