1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    *
5    *
6    *
7    * The contents of this file are subject to the terms of the Liferay Enterprise
8    * Subscription License ("License"). You may not use this file except in
9    * compliance with the License. You can obtain a copy of the License by
10   * contacting Liferay, Inc. See the License for the specific language governing
11   * permissions and limitations under the License, including but not limited to
12   * distribution rights of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.cluster;
24  
25  import com.liferay.portal.kernel.cluster.Address;
26  import com.liferay.portal.kernel.cluster.ClusterLink;
27  import com.liferay.portal.kernel.cluster.Priority;
28  import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
29  import com.liferay.portal.kernel.log.Log;
30  import com.liferay.portal.kernel.log.LogFactoryUtil;
31  import com.liferay.portal.kernel.messaging.Message;
32  import com.liferay.portal.kernel.util.GetterUtil;
33  import com.liferay.portal.kernel.util.PropsKeys;
34  import com.liferay.portal.kernel.util.SocketUtil;
35  import com.liferay.portal.kernel.util.StringPool;
36  import com.liferay.portal.kernel.util.Validator;
37  import com.liferay.portal.util.PropsUtil;
38  import com.liferay.portal.util.PropsValues;
39  
40  import java.io.IOException;
41  
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.Properties;
46  import java.util.Vector;
47  
48  import org.jgroups.ChannelException;
49  import org.jgroups.JChannel;
50  import org.jgroups.ReceiverAdapter;
51  import org.jgroups.View;
52  
53  /**
54   * <a href="ClusterLinkImpl.java.html"><b><i>View Source</i></b></a>
55   *
56   * @author Shuyang Zhou
57   */
58  public class ClusterLinkImpl implements ClusterLink {
59  
60      public void afterPropertiesSet() {
61          if (!PropsValues.CLUSTER_LINK_ENABLED) {
62              return;
63          }
64  
65          initSystemProperties();
66  
67          try {
68              initBindAddress();
69          }
70          catch (IOException ioe) {
71              if (_log.isWarnEnabled()) {
72                  _log.warn("Failed to initialize outgoing IP address", ioe);
73              }
74          }
75  
76          try {
77              initChannels();
78          }
79          catch (Exception e) {
80              _log.error(e, e);
81          }
82      }
83  
84      public void destory() {
85          if (!PropsValues.CLUSTER_LINK_ENABLED) {
86              return;
87          }
88  
89          for (JChannel channel : _channels) {
90              channel.close();
91          }
92      }
93  
94      public List<Address> getAddresses() {
95          if (!PropsValues.CLUSTER_LINK_ENABLED) {
96              return Collections.EMPTY_LIST;
97          }
98  
99          Vector<org.jgroups.Address> jGroupsAddresses =
100             _channels.get(0).getView().getMembers();
101 
102         if (jGroupsAddresses == null) {
103             return new ArrayList<Address>();
104         }
105 
106         List<Address> addresses = new ArrayList<Address>(
107             jGroupsAddresses.size());
108 
109         for (org.jgroups.Address address : jGroupsAddresses) {
110             addresses.add(new AddressImpl(address));
111         }
112 
113         return addresses;
114     }
115 
116     public boolean isEnabled() {
117         return PropsValues.CLUSTER_LINK_ENABLED;
118     }
119 
120     public void sendMulticastMessage(Message message, Priority priority) {
121         if (!PropsValues.CLUSTER_LINK_ENABLED) {
122             return;
123         }
124 
125         JChannel channel = getChannel(priority);
126 
127         try {
128             channel.send(null, null, message);
129         }
130         catch (ChannelException ce) {
131             _log.error("Unable to send multicast message " + message, ce);
132         }
133     }
134 
135     public void sendUnicastMessage(
136         Address address, Message message, Priority priority) {
137 
138         if (!PropsValues.CLUSTER_LINK_ENABLED) {
139             return;
140         }
141 
142         org.jgroups.Address jGroupsAddress =
143             (org.jgroups.Address)address.getRealAddress();
144 
145         JChannel channel = getChannel(priority);
146 
147         try {
148             channel.send(jGroupsAddress, null, message);
149         }
150         catch (ChannelException ex) {
151             _log.error("Unable to send multicast message:" + message, ex);
152         }
153     }
154 
155     public void setClusterForwardMessageListener(
156         ClusterForwardMessageListener clusterForwardMessageListener) {
157 
158         _clusterForwardMessageListener = clusterForwardMessageListener;
159     }
160 
161     protected JChannel createChannel(int index, String properties)
162         throws ChannelException {
163 
164         JChannel channel = new JChannel(properties);
165 
166         channel.setReceiver(
167             new ReceiverAdapter() {
168 
169                 public void receive(org.jgroups.Message message) {
170                     if ((!_addresses.contains(message.getSrc())) ||
171                         (message.getDest() != null)) {
172 
173                         _clusterForwardMessageListener.receive(
174                             (Message)message.getObject());
175                     }
176                     else {
177                         if (_log.isDebugEnabled()) {
178                             _log.debug("Block received message " + message);
179                         }
180                     }
181                 }
182 
183                 public void viewAccepted(View view) {
184                     if (_log.isDebugEnabled()) {
185                         _log.debug("Cluster link accepted view " + view);
186                     }
187                 }
188 
189             }
190         );
191 
192         channel.connect(_LIFERAY_CHANNEL + index);
193 
194         if (_log.isInfoEnabled()) {
195             _log.info(
196                 "Create a new channel with properties " +
197                     channel.getProperties());
198         }
199 
200         return channel;
201     }
202 
203     protected JChannel getChannel(Priority priority) {
204         int channelIndex =
205             priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
206 
207         if (_log.isDebugEnabled()) {
208             _log.debug(
209                 "Select channel number " + channelIndex + " for priority " +
210                     priority);
211         }
212 
213         return _channels.get(channelIndex);
214     }
215 
216     protected void initBindAddress() throws IOException {
217         String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
218 
219         if (Validator.isNull(autodetectAddress)) {
220             return;
221         }
222 
223         String host = autodetectAddress;
224         int port = 80;
225 
226         int index = autodetectAddress.indexOf(StringPool.COLON);
227 
228         if (index != -1) {
229             host = autodetectAddress.substring(0, index);
230             port = GetterUtil.getInteger(
231                 autodetectAddress.substring(index + 1), port);
232         }
233 
234         String bindAddress = SocketUtil.getHostAddress(host, port);
235 
236         System.setProperty("jgroups.bind_addr", bindAddress);
237 
238         if (_log.isInfoEnabled()) {
239             _log.info(
240                 "Set JGroups outgoing IP address to " + bindAddress + "}");
241         }
242     }
243 
244     protected void initChannels() throws ChannelException {
245         Properties properties = PropsUtil.getProperties(
246             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES, true);
247 
248         _channelCount = properties.size();
249 
250         if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
251             throw new IllegalArgumentException(
252                 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
253         }
254 
255         _addresses = new ArrayList<org.jgroups.Address>(_channelCount);
256         _channels = new ArrayList<JChannel>(_channelCount);
257 
258         List<String> keys = new ArrayList<String>(_channelCount);
259 
260         for (Object key : properties.keySet()) {
261             keys.add((String)key);
262         }
263 
264         Collections.sort(keys);
265 
266         for (int i = 0; i < keys.size(); i++) {
267             String customName = keys.get(i);
268 
269             String value = properties.getProperty(customName);
270 
271             JChannel channel = createChannel(i, value);
272 
273             _addresses.add(channel.getLocalAddress());
274             _channels.add(channel);
275         }
276     }
277 
278     protected void initSystemProperties() {
279         for (String systemProperty :
280                 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
281 
282             int index = systemProperty.indexOf(StringPool.COLON);
283 
284             if (index == -1) {
285                 continue;
286             }
287 
288             String key = systemProperty.substring(0, index);
289             String value = systemProperty.substring(index + 1);
290 
291             System.setProperty(key, value);
292 
293             if (_log.isDebugEnabled()) {
294                 _log.debug(
295                     "Setting system property {key=" + key + ", value=" +
296                         value + "}");
297             }
298         }
299     }
300 
301     private static final String _LIFERAY_CHANNEL = "LIFERAY-CHANNEL-";
302 
303     private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
304 
305     private static final Log _log =
306         LogFactoryUtil.getLog(ClusterLinkImpl.class);
307 
308     private List<org.jgroups.Address> _addresses;
309     private int _channelCount;
310     private List<JChannel> _channels;
311     private ClusterForwardMessageListener _clusterForwardMessageListener;
312 
313 }