001
014
015 package com.liferay.portal.fabric.netty.server;
016
017 import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
019 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
020 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
021 import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
022 import com.liferay.portal.fabric.netty.handlers.NettyFabricAgentRegistrationChannelHandler;
023 import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
024 import com.liferay.portal.fabric.server.FabricServer;
025 import com.liferay.portal.kernel.log.Log;
026 import com.liferay.portal.kernel.log.LogFactoryUtil;
027 import com.liferay.portal.kernel.util.NamedThreadFactory;
028
029 import io.netty.bootstrap.ServerBootstrap;
030 import io.netty.channel.Channel;
031 import io.netty.channel.ChannelFuture;
032 import io.netty.channel.ChannelFutureListener;
033 import io.netty.channel.ChannelInitializer;
034 import io.netty.channel.ChannelPipeline;
035 import io.netty.channel.EventLoop;
036 import io.netty.channel.EventLoopGroup;
037 import io.netty.channel.nio.NioEventLoopGroup;
038 import io.netty.channel.socket.SocketChannel;
039 import io.netty.channel.socket.nio.NioServerSocketChannel;
040 import io.netty.handler.logging.LogLevel;
041 import io.netty.handler.logging.LoggingHandler;
042 import io.netty.util.Attribute;
043 import io.netty.util.AttributeKey;
044 import io.netty.util.concurrent.DefaultEventExecutorGroup;
045 import io.netty.util.concurrent.EventExecutorGroup;
046
047 import java.nio.file.Files;
048
049
052 public class NettyFabricServer implements FabricServer {
053
054 public NettyFabricServer(
055 FabricAgentRegistry fabricAgentRegistry,
056 NettyFabricServerConfig nettyFabricServerConfig) {
057
058 _fabricAgentRegistry = fabricAgentRegistry;
059 _nettyFabricServerConfig = nettyFabricServerConfig;
060 }
061
062 @Override
063 public synchronized void start() throws Exception {
064 if (_serverChannel != null) {
065 throw new IllegalStateException(
066 "Netty fabric server was already started");
067 }
068
069 if (_log.isInfoEnabled()) {
070 _log.info(
071 "Starting Netty fabric server using " +
072 _nettyFabricServerConfig);
073 }
074
075 Files.createDirectories(
076 _nettyFabricServerConfig.getRepositoryParentPath());
077
078 ServerBootstrap serverBootstrap = new ServerBootstrap();
079
080 serverBootstrap.channel(NioServerSocketChannel.class);
081 serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
082 serverBootstrap.childHandler(new ChildChannelInitializer());
083
084 EventLoopGroup workerGroup = new NioEventLoopGroup(
085 _nettyFabricServerConfig.getWorkerGroupThreadCount(),
086 new NamedThreadFactory(
087 "Netty Fabric Server/Worker Event Loop Group",
088 Thread.NORM_PRIORITY, null));
089
090 serverBootstrap.group(
091 new NioEventLoopGroup(
092 _nettyFabricServerConfig.getBossGroupThreadCount(),
093 new NamedThreadFactory(
094 "Netty Fabric Server/Boss Event Loop Group",
095 Thread.NORM_PRIORITY, null)),
096 workerGroup);
097
098 serverBootstrap.attr(
099 _fileServerEventExecutorGroupAttributeKey,
100 new DefaultEventExecutorGroup(
101 _nettyFabricServerConfig.getFileServerGroupThreadCount(),
102 new NamedThreadFactory(
103 "Netty Fabric Server/File Server Event Executor Group",
104 Thread.NORM_PRIORITY, null)));
105 serverBootstrap.attr(
106 _registrationEventExecutorGroupAttributeKey,
107 new DefaultEventExecutorGroup(
108 _nettyFabricServerConfig.getRegistrationGroupThreadCount(),
109 new NamedThreadFactory(
110 "Netty Fabric Server/Registration Event Executor Group",
111 Thread.NORM_PRIORITY, null)));
112 serverBootstrap.attr(
113 _rpcEventExecutorGroupAttributeKey,
114 new DefaultEventExecutorGroup(
115 _nettyFabricServerConfig.getRPCGroupThreadCount(),
116 new NamedThreadFactory(
117 "Netty Fabric Server/RPC Event Executor Group",
118 Thread.NORM_PRIORITY, null)));
119 serverBootstrap.attr(_workerEventLoopGroupAttributeKey, workerGroup);
120
121 ChannelFuture channelFuture = serverBootstrap.bind(
122 _nettyFabricServerConfig.getNettyFabricServerHost(),
123 _nettyFabricServerConfig.getNettyFabricServerPort());
124
125 _serverChannel = channelFuture.channel();
126
127 channelFuture.addListener(new PostBindChannelFutureListener());
128
129 channelFuture.sync();
130 }
131
132 @Override
133 public synchronized void stop() throws InterruptedException {
134 if (_serverChannel == null) {
135 throw new IllegalStateException(
136 "Netty fabric server is not started");
137 }
138
139 try {
140 ChannelFuture channelFuture = _serverChannel.close();
141
142 channelFuture.sync();
143 }
144 finally {
145 EventLoop eventLoop = _serverChannel.eventLoop();
146
147 EventLoopGroup bossGroup = eventLoop.parent();
148
149 bossGroup.shutdownGracefully();
150
151 shutdownEventExecutorGroup(
152 _serverChannel, _workerEventLoopGroupAttributeKey);
153 shutdownEventExecutorGroup(
154 _serverChannel, _fileServerEventExecutorGroupAttributeKey);
155 shutdownEventExecutorGroup(
156 _serverChannel, _registrationEventExecutorGroupAttributeKey);
157 shutdownEventExecutorGroup(
158 _serverChannel, _rpcEventExecutorGroupAttributeKey);
159
160 FileHelperUtil.delete(
161 _nettyFabricServerConfig.getRepositoryParentPath());
162
163 _serverChannel = null;
164 }
165 }
166
167 protected EventExecutorGroup getEventExecutorGroup(
168 Channel channel, AttributeKey<EventExecutorGroup> attributeKey) {
169
170 Attribute<EventExecutorGroup> attribute = channel.attr(attributeKey);
171
172 return attribute.get();
173 }
174
175 protected void shutdownEventExecutorGroup(
176 Channel channel,
177 AttributeKey<? extends EventExecutorGroup> attributeKey) {
178
179 Attribute<? extends EventExecutorGroup> attribute = channel.attr(
180 attributeKey);
181
182 EventExecutorGroup eventExecutorGroup = attribute.getAndRemove();
183
184 if (eventExecutorGroup != null) {
185 eventExecutorGroup.shutdownGracefully();
186 }
187 }
188
189 protected class ChildChannelInitializer
190 extends ChannelInitializer<SocketChannel> {
191
192 @Override
193 protected void initChannel(SocketChannel socketChannel) {
194 ChannelPipeline channelPipeline = socketChannel.pipeline();
195
196 channelPipeline.addLast(
197 AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
198 channelPipeline.addLast(
199 AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
200 channelPipeline.addLast(
201 getEventExecutorGroup(
202 _serverChannel, _rpcEventExecutorGroupAttributeKey),
203 NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
204 channelPipeline.addLast(
205 getEventExecutorGroup(
206 _serverChannel, _fileServerEventExecutorGroupAttributeKey),
207 FileRequestChannelHandler.NAME,
208 new FileRequestChannelHandler(
209 _nettyFabricServerConfig.
210 getFileServerFolderCompressionLevel()));
211 channelPipeline.addLast(
212 getEventExecutorGroup(
213 _serverChannel,
214 _registrationEventExecutorGroupAttributeKey),
215 new NettyFabricAgentRegistrationChannelHandler(
216 _fabricAgentRegistry,
217 _nettyFabricServerConfig.getRepositoryParentPath(),
218 getEventExecutorGroup(
219 _serverChannel,
220 _fileServerEventExecutorGroupAttributeKey),
221 _nettyFabricServerConfig.getRepositoryGetFileTimeout(),
222 _nettyFabricServerConfig.getRPCRelayTimeout(),
223 _nettyFabricServerConfig.getWorkerStartupTimeout()));
224 }
225
226 }
227
228 protected class PostBindChannelFutureListener
229 implements ChannelFutureListener {
230
231 @Override
232 public void operationComplete(ChannelFuture channelFuture)
233 throws InterruptedException {
234
235 Channel channel = channelFuture.channel();
236
237 if (channelFuture.isSuccess()) {
238 if (_log.isInfoEnabled()) {
239 _log.info(
240 "Started Netty fabric server on " +
241 channel.localAddress());
242 }
243
244 return;
245 }
246
247 String serverAddress =
248 _nettyFabricServerConfig.getNettyFabricServerHost() + ":" +
249 _nettyFabricServerConfig.getNettyFabricServerPort();
250
251 if (channelFuture.isCancelled()) {
252 _log.error(
253 "Cancelled starting Netty fabric server on " +
254 serverAddress);
255 }
256 else {
257 _log.error(
258 "Unable to start Netty fabric server on " + serverAddress,
259 channelFuture.cause());
260 }
261
262 stop();
263 }
264
265 }
266
267 private static final Log _log = LogFactoryUtil.getLog(
268 NettyFabricServer.class);
269
270 private static final AttributeKey<EventExecutorGroup>
271 _fileServerEventExecutorGroupAttributeKey = AttributeKey.valueOf(
272 "FileServerEventExecutorGroup");
273 private static final AttributeKey<EventExecutorGroup>
274 _registrationEventExecutorGroupAttributeKey = AttributeKey.valueOf(
275 "RegistrationEventExecutorGroup");
276 private static final AttributeKey<EventExecutorGroup>
277 _rpcEventExecutorGroupAttributeKey = AttributeKey.valueOf(
278 "RPCEventExecutorGroup");
279 private static final AttributeKey<EventLoopGroup>
280 _workerEventLoopGroupAttributeKey = AttributeKey.valueOf(
281 "WorkerEventLoopGroup");
282
283 private final FabricAgentRegistry _fabricAgentRegistry;
284 private final NettyFabricServerConfig _nettyFabricServerConfig;
285 private Channel _serverChannel;
286
287 }