001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.util.PropsKeys;
025 import com.liferay.portal.util.PropsUtil;
026 import com.liferay.portal.util.PropsValues;
027
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.List;
031 import java.util.Properties;
032
033 import org.jgroups.ChannelException;
034 import org.jgroups.JChannel;
035
036
039 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
040
041 public static final int MAX_CHANNEL_COUNT = Priority.values().length;
042
043 @Override
044 public void destroy() {
045 if (!PropsValues.CLUSTER_LINK_ENABLED) {
046 return;
047 }
048
049 for (JChannel jChannel : _transportChannels) {
050 jChannel.close();
051 }
052 }
053
054 public List<Address> getLocalTransportAddresses() {
055 if (!PropsValues.CLUSTER_LINK_ENABLED) {
056 return Collections.emptyList();
057 }
058
059 List<Address> addresses = new ArrayList<Address>(
060 _localTransportAddresses.size());
061
062 for (org.jgroups.Address address : _localTransportAddresses) {
063 addresses.add(new AddressImpl(address));
064 }
065
066 return addresses;
067 }
068
069 public List<Address> getTransportAddresses(Priority priority) {
070 if (!PropsValues.CLUSTER_LINK_ENABLED) {
071 return Collections.emptyList();
072 }
073
074 JChannel jChannel = getChannel(priority);
075
076 return getAddresses(jChannel);
077 }
078
079 public void sendMulticastMessage(Message message, Priority priority) {
080 if (!PropsValues.CLUSTER_LINK_ENABLED) {
081 return;
082 }
083
084 JChannel jChannel = getChannel(priority);
085
086 try {
087 jChannel.send(null, null, message);
088 }
089 catch (ChannelException ce) {
090 _log.error("Unable to send multicast message " + message, ce);
091 }
092 }
093
094 public void sendUnicastMessage(
095 Address address, Message message, Priority priority) {
096
097 if (!PropsValues.CLUSTER_LINK_ENABLED) {
098 return;
099 }
100
101 org.jgroups.Address jGroupsAddress =
102 (org.jgroups.Address)address.getRealAddress();
103
104 JChannel jChannel = getChannel(priority);
105
106 try {
107 jChannel.send(jGroupsAddress, null, message);
108 }
109 catch (ChannelException ce) {
110 _log.error("Unable to send unicast message " + message, ce);
111 }
112 }
113
114 public void setClusterForwardMessageListener(
115 ClusterForwardMessageListener clusterForwardMessageListener) {
116
117 _clusterForwardMessageListener = clusterForwardMessageListener;
118 }
119
120 protected JChannel getChannel(Priority priority) {
121 int channelIndex =
122 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
123
124 if (_log.isDebugEnabled()) {
125 _log.debug(
126 "Select channel number " + channelIndex + " for priority " +
127 priority);
128 }
129
130 return _transportChannels.get(channelIndex);
131 }
132
133 @Override
134 protected void initChannels() throws ChannelException {
135 Properties transportProperties = PropsUtil.getProperties(
136 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
137
138 _channelCount = transportProperties.size();
139
140 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
141 throw new IllegalArgumentException(
142 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
143 }
144
145 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
146 _channelCount);
147 _transportChannels = new ArrayList<JChannel>(_channelCount);
148
149 List<String> keys = new ArrayList<String>(_channelCount);
150
151 for (Object key : transportProperties.keySet()) {
152 keys.add((String)key);
153 }
154
155 Collections.sort(keys);
156
157 for (int i = 0; i < keys.size(); i++) {
158 String customName = keys.get(i);
159
160 String value = transportProperties.getProperty(customName);
161
162 JChannel jChannel = createJChannel(
163 value,
164 new ClusterForwardReceiver(
165 _localTransportAddresses, _clusterForwardMessageListener),
166 _LIFERAY_TRANSPORT_CHANNEL + i);
167
168 _localTransportAddresses.add(jChannel.getLocalAddress());
169 _transportChannels.add(jChannel);
170 }
171 }
172
173 private static final String _LIFERAY_TRANSPORT_CHANNEL =
174 "LIFERAY-TRANSPORT-CHANNEL-";
175
176 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
177
178 private int _channelCount;
179 private ClusterForwardMessageListener _clusterForwardMessageListener;
180 private List<org.jgroups.Address> _localTransportAddresses;
181 private List<JChannel> _transportChannels;
182
183 }