001
014
015 package com.liferay.util.transport;
016
017 import java.io.IOException;
018
019 import java.net.DatagramPacket;
020 import java.net.InetAddress;
021 import java.net.MulticastSocket;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026
035 public class MulticastTransport extends Thread implements Transport {
036
037 public MulticastTransport(DatagramHandler handler, String host, int port) {
038 super("MulticastListener-" + host + port);
039
040 setDaemon(true);
041 _handler = handler;
042 _host = host;
043 _port = port;
044 }
045
046 public synchronized void connect() throws IOException {
047 if (_socket == null) {
048 _socket = new MulticastSocket(_port);
049 }
050 else if (_socket.isConnected() && _socket.isBound()) {
051 return;
052 }
053
054 _address = InetAddress.getByName(_host);
055
056 _socket.joinGroup(_address);
057
058 _connected = true;
059
060 start();
061 }
062
063 public synchronized void disconnect() {
064
065
066
067 if (_address != null) {
068 try {
069 _socket.leaveGroup(_address);
070 _address = null;
071 }
072 catch (IOException e) {
073 _log.error("Unable to leave group", e);
074 }
075 }
076
077 _connected = false;
078
079 interrupt();
080
081 _socket.close();
082 }
083
084 public synchronized void sendMessage(String msg) throws IOException {
085 _outboundPacket.setData(msg.getBytes());
086 _outboundPacket.setAddress(_address);
087 _outboundPacket.setPort(_port);
088
089 _socket.send(_outboundPacket);
090 }
091
092 public boolean isConnected() {
093 return _connected;
094 }
095
096 @Override
097 public void run() {
098 try {
099 while (_connected) {
100 _socket.receive(_inboundPacket);
101 _handler.process(_inboundPacket);
102 }
103 }
104 catch (IOException e) {
105 _log.error("Unable to process ", e);
106
107 _socket.disconnect();
108
109 _connected = false;
110
111 _handler.errorReceived(e);
112 }
113 }
114
115 private static Log _log = LogFactory.getLog(MulticastTransport.class);
116
117 private byte[] _inboundBuffer = new byte[4096];
118 private DatagramPacket _inboundPacket =
119 new DatagramPacket(_inboundBuffer, _inboundBuffer.length);
120 private byte[] _outboundBuffer = new byte[4096];
121 private DatagramPacket _outboundPacket =
122 new DatagramPacket(_outboundBuffer, _outboundBuffer.length);
123 private String _host;
124 private DatagramHandler _handler;
125 private int _port;
126 private boolean _connected;
127 private MulticastSocket _socket;
128 private InetAddress _address;
129
130 }