001
014
015 package com.liferay.portal.fabric.netty.handlers;
016
017 import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
019 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
020 import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
021 import com.liferay.portal.fabric.netty.repository.NettyRepository;
022 import com.liferay.portal.fabric.repository.Repository;
023 import com.liferay.portal.kernel.log.Log;
024 import com.liferay.portal.kernel.log.LogFactoryUtil;
025 import com.liferay.portal.kernel.util.CharPool;
026 import com.liferay.portal.kernel.util.StringUtil;
027
028 import io.netty.channel.Channel;
029 import io.netty.channel.ChannelFuture;
030 import io.netty.channel.ChannelFutureListener;
031 import io.netty.channel.ChannelHandlerContext;
032 import io.netty.channel.ChannelPipeline;
033 import io.netty.channel.SimpleChannelInboundHandler;
034 import io.netty.util.concurrent.EventExecutorGroup;
035
036 import java.io.IOException;
037
038 import java.net.SocketAddress;
039
040 import java.nio.file.Files;
041 import java.nio.file.Path;
042 import java.nio.file.Paths;
043
044
047 public class NettyFabricAgentRegistrationChannelHandler
048 extends SimpleChannelInboundHandler<NettyFabricAgentConfig> {
049
050 public NettyFabricAgentRegistrationChannelHandler(
051 FabricAgentRegistry fabricAgentRegistry, Path repositoryParentPath,
052 EventExecutorGroup eventExecutorGroup, long getFileTimeout,
053 long rpcRelayTimeout, long startupTimeout) {
054
055 if (fabricAgentRegistry == null) {
056 throw new NullPointerException("Fabric agent registry is null");
057 }
058
059 if (repositoryParentPath == null) {
060 throw new NullPointerException("Repository parent path is null");
061 }
062
063 if (eventExecutorGroup == null) {
064 throw new NullPointerException("Event executor group is null");
065 }
066
067 _fabricAgentRegistry = fabricAgentRegistry;
068 _repositoryParentPath = repositoryParentPath;
069 _eventExecutorGroup = eventExecutorGroup;
070 _getFileTimeout = getFileTimeout;
071 _rpcRelayTimeout = rpcRelayTimeout;
072 _startupTimeout = startupTimeout;
073 }
074
075 @Override
076 public void exceptionCaught(
077 ChannelHandlerContext channelHandlerContext, Throwable throwable) {
078
079 final Channel channel = channelHandlerContext.channel();
080
081 _log.error("Closing " + channel + " due to:", throwable);
082
083 ChannelFuture channelFuture = channel.close();
084
085 channelFuture.addListener(
086 new ChannelFutureListener() {
087
088 @Override
089 public void operationComplete(ChannelFuture channelFuture) {
090 if (_log.isInfoEnabled()) {
091 _log.info(channel + " is closed");
092 }
093 }
094
095 });
096 }
097
098 @Override
099 protected void channelRead0(
100 ChannelHandlerContext channelHandlerContext,
101 NettyFabricAgentConfig nettyFabricAgentConfig)
102 throws IOException {
103
104 Channel channel = channelHandlerContext.channel();
105
106 SocketAddress socketAddress = channel.localAddress();
107
108 Path repositoryPath = Paths.get(
109 _repositoryParentPath.toString(),
110 StringUtil.replace(
111 socketAddress.toString(), CharPool.COLON, CharPool.DASH));
112
113 Files.createDirectories(repositoryPath);
114
115 Repository<Channel> repository = new NettyRepository(
116 repositoryPath, _getFileTimeout);
117
118 ChannelPipeline channelPipeline = channel.pipeline();
119
120 channelPipeline.addLast(
121 new FileResponseChannelHandler(
122 repository.getAsyncBroker(), _eventExecutorGroup));
123
124 NettyFabricAgentStub nettyFabricAgentStub =
125 new NettyFabricAgentStub(
126 channel, repository, nettyFabricAgentConfig.getRepositoryPath(),
127 _rpcRelayTimeout, _startupTimeout);
128
129 if (!_fabricAgentRegistry.registerFabricAgent(
130 nettyFabricAgentStub,
131 new OnRegistration(
132 channel, nettyFabricAgentStub, repository))) {
133
134 if (_log.isWarnEnabled()) {
135 _log.warn("Rejected duplicated fabric agent on " + channel);
136 }
137
138 return;
139 }
140
141 if (_log.isInfoEnabled()) {
142 _log.info("Registered fabric agent on " + channel);
143 }
144 }
145
146 protected class OnRegistration implements Runnable {
147
148 public OnRegistration(
149 Channel channel, NettyFabricAgentStub nettyFabricAgentStub,
150 Repository<Channel> repository) {
151
152 _channel = channel;
153 _nettyFabricAgentStub = nettyFabricAgentStub;
154 _repository = repository;
155 }
156
157 @Override
158 public void run() {
159 NettyChannelAttributes.setNettyFabricAgentStub(
160 _channel, _nettyFabricAgentStub);
161
162 ChannelFuture channelFuture = _channel.closeFuture();
163
164 channelFuture.addListener(
165 new PostDisconnectChannelFutureListener(
166 _channel, _nettyFabricAgentStub, _repository));
167 }
168
169 private final Channel _channel;
170 private final NettyFabricAgentStub _nettyFabricAgentStub;
171 private final Repository<Channel> _repository;
172
173 }
174
175 protected class PostDisconnectChannelFutureListener
176 implements ChannelFutureListener {
177
178 public PostDisconnectChannelFutureListener(
179 Channel channel, NettyFabricAgentStub nettyFabricAgentStub,
180 Repository<Channel> repository) {
181
182 _channel = channel;
183 _nettyFabricAgentStub = nettyFabricAgentStub;
184 _repository = repository;
185 }
186
187 @Override
188 public void operationComplete(ChannelFuture channelFuture) {
189 if (_fabricAgentRegistry.unregisterFabricAgent(
190 _nettyFabricAgentStub, null)) {
191
192 if (_log.isInfoEnabled()) {
193 _log.info("Unregistered fabric agent on " + _channel);
194 }
195 }
196 else if (_log.isWarnEnabled()) {
197 _log.warn("Unable to unregister fabric agent on " + _channel);
198 }
199
200 _repository.dispose(true);
201 }
202
203 private final Channel _channel;
204 private final NettyFabricAgentStub _nettyFabricAgentStub;
205 private final Repository<Channel> _repository;
206
207 }
208
209 private static final Log _log = LogFactoryUtil.getLog(
210 NettyFabricAgentRegistrationChannelHandler.class);
211
212 private final EventExecutorGroup _eventExecutorGroup;
213 private final FabricAgentRegistry _fabricAgentRegistry;
214 private final long _getFileTimeout;
215 private final Path _repositoryParentPath;
216 private final long _rpcRelayTimeout;
217 private final long _startupTimeout;
218
219 }