001
014
015 package com.liferay.portal.fabric.netty.handlers;
016
017 import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
019 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
020 import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
021 import com.liferay.portal.fabric.netty.repository.NettyRepository;
022 import com.liferay.portal.fabric.repository.Repository;
023 import com.liferay.portal.kernel.log.Log;
024 import com.liferay.portal.kernel.log.LogFactoryUtil;
025 import com.liferay.portal.kernel.util.CharPool;
026 import com.liferay.portal.kernel.util.StringUtil;
027
028 import io.netty.channel.Channel;
029 import io.netty.channel.ChannelFuture;
030 import io.netty.channel.ChannelFutureListener;
031 import io.netty.channel.ChannelHandlerContext;
032 import io.netty.channel.ChannelPipeline;
033 import io.netty.channel.SimpleChannelInboundHandler;
034 import io.netty.util.concurrent.EventExecutorGroup;
035
036 import java.io.IOException;
037
038 import java.net.SocketAddress;
039
040 import java.nio.file.Files;
041 import java.nio.file.Path;
042 import java.nio.file.Paths;
043
044
047 public class NettyFabricAgentRegistrationChannelHandler
048 extends SimpleChannelInboundHandler<NettyFabricAgentConfig> {
049
050 public NettyFabricAgentRegistrationChannelHandler(
051 FabricAgentRegistry fabricAgentRegistry, Path repositoryParentPath,
052 EventExecutorGroup eventExecutorGroup, long getFileTimeout,
053 long rpcRelayTimeout, long startupTimeout) {
054
055 if (fabricAgentRegistry == null) {
056 throw new NullPointerException("Fabric agent registry is null");
057 }
058
059 if (repositoryParentPath == null) {
060 throw new NullPointerException("Repository parent path is null");
061 }
062
063 if (eventExecutorGroup == null) {
064 throw new NullPointerException("Event executor group is null");
065 }
066
067 _fabricAgentRegistry = fabricAgentRegistry;
068 _repositoryParentPath = repositoryParentPath;
069 _eventExecutorGroup = eventExecutorGroup;
070 _getFileTimeout = getFileTimeout;
071 _rpcRelayTimeout = rpcRelayTimeout;
072 _startupTimeout = startupTimeout;
073 }
074
075 @Override
076 public void exceptionCaught(
077 ChannelHandlerContext channelHandlerContext, Throwable throwable) {
078
079 final Channel channel = channelHandlerContext.channel();
080
081 _log.error("Closing " + channel + " due to:", throwable);
082
083 ChannelFuture channelFuture = channel.close();
084
085 channelFuture.addListener(
086 new ChannelFutureListener() {
087
088 @Override
089 public void operationComplete(ChannelFuture channelFuture) {
090 if (_log.isInfoEnabled()) {
091 _log.info(channel + " is closed");
092 }
093 }
094
095 });
096 }
097
098 @Override
099 protected void channelRead0(
100 ChannelHandlerContext channelHandlerContext,
101 NettyFabricAgentConfig nettyFabricAgentConfig)
102 throws IOException {
103
104 Channel channel = channelHandlerContext.channel();
105
106 SocketAddress socketAddress = channel.localAddress();
107
108 Path repositoryPath = Paths.get(
109 _repositoryParentPath.toString(),
110 StringUtil.replace(
111 socketAddress.toString(), CharPool.COLON, CharPool.DASH));
112
113 Files.createDirectories(repositoryPath);
114
115 Repository<Channel> repository = new NettyRepository(
116 repositoryPath, _getFileTimeout);
117
118 ChannelPipeline channelPipeline = channel.pipeline();
119
120 channelPipeline.addLast(
121 new FileResponseChannelHandler(
122 repository.getAsyncBroker(), _eventExecutorGroup));
123
124 NettyFabricAgentStub nettyFabricAgentStub = new NettyFabricAgentStub(
125 channel, repository, nettyFabricAgentConfig.getRepositoryPath(),
126 _rpcRelayTimeout, _startupTimeout);
127
128 if (!_fabricAgentRegistry.registerFabricAgent(
129 nettyFabricAgentStub,
130 new OnRegistration(
131 channel, nettyFabricAgentStub, repository))) {
132
133 if (_log.isWarnEnabled()) {
134 _log.warn("Rejected duplicated fabric agent on " + channel);
135 }
136
137 return;
138 }
139
140 if (_log.isInfoEnabled()) {
141 _log.info("Registered fabric agent on " + channel);
142 }
143 }
144
145 protected class OnRegistration implements Runnable {
146
147 public OnRegistration(
148 Channel channel, NettyFabricAgentStub nettyFabricAgentStub,
149 Repository<Channel> repository) {
150
151 _channel = channel;
152 _nettyFabricAgentStub = nettyFabricAgentStub;
153 _repository = repository;
154 }
155
156 @Override
157 public void run() {
158 NettyChannelAttributes.setNettyFabricAgentStub(
159 _channel, _nettyFabricAgentStub);
160
161 ChannelFuture channelFuture = _channel.closeFuture();
162
163 channelFuture.addListener(
164 new PostDisconnectChannelFutureListener(
165 _channel, _nettyFabricAgentStub, _repository));
166 }
167
168 private final Channel _channel;
169 private final NettyFabricAgentStub _nettyFabricAgentStub;
170 private final Repository<Channel> _repository;
171
172 }
173
174 protected class PostDisconnectChannelFutureListener
175 implements ChannelFutureListener {
176
177 public PostDisconnectChannelFutureListener(
178 Channel channel, NettyFabricAgentStub nettyFabricAgentStub,
179 Repository<Channel> repository) {
180
181 _channel = channel;
182 _nettyFabricAgentStub = nettyFabricAgentStub;
183 _repository = repository;
184 }
185
186 @Override
187 public void operationComplete(ChannelFuture channelFuture) {
188 if (_fabricAgentRegistry.unregisterFabricAgent(
189 _nettyFabricAgentStub, null)) {
190
191 if (_log.isInfoEnabled()) {
192 _log.info("Unregistered fabric agent on " + _channel);
193 }
194 }
195 else if (_log.isWarnEnabled()) {
196 _log.warn("Unable to unregister fabric agent on " + _channel);
197 }
198
199 _repository.dispose(true);
200 }
201
202 private final Channel _channel;
203 private final NettyFabricAgentStub _nettyFabricAgentStub;
204 private final Repository<Channel> _repository;
205
206 }
207
208 private static final Log _log = LogFactoryUtil.getLog(
209 NettyFabricAgentRegistrationChannelHandler.class);
210
211 private final EventExecutorGroup _eventExecutorGroup;
212 private final FabricAgentRegistry _fabricAgentRegistry;
213 private final long _getFileTimeout;
214 private final Path _repositoryParentPath;
215 private final long _rpcRelayTimeout;
216 private final long _startupTimeout;
217
218 }