001
014
015 package com.liferay.portal.fabric.netty.server;
016
017 import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
019 import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
020 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
021 import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
022 import com.liferay.portal.fabric.netty.handlers.NettyFabricAgentRegistrationChannelHandler;
023 import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
024 import com.liferay.portal.fabric.netty.util.NettyUtil;
025 import com.liferay.portal.fabric.server.FabricServer;
026 import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
027 import com.liferay.portal.kernel.log.Log;
028 import com.liferay.portal.kernel.log.LogFactoryUtil;
029 import com.liferay.portal.kernel.util.NamedThreadFactory;
030
031 import io.netty.bootstrap.ServerBootstrap;
032 import io.netty.channel.Channel;
033 import io.netty.channel.ChannelFuture;
034 import io.netty.channel.ChannelFutureListener;
035 import io.netty.channel.ChannelInitializer;
036 import io.netty.channel.ChannelPipeline;
037 import io.netty.channel.EventLoop;
038 import io.netty.channel.EventLoopGroup;
039 import io.netty.channel.nio.NioEventLoopGroup;
040 import io.netty.channel.socket.SocketChannel;
041 import io.netty.channel.socket.nio.NioServerSocketChannel;
042 import io.netty.handler.logging.LogLevel;
043 import io.netty.handler.logging.LoggingHandler;
044 import io.netty.util.ThreadDeathWatcher;
045 import io.netty.util.concurrent.DefaultEventExecutorGroup;
046 import io.netty.util.concurrent.EventExecutorGroup;
047 import io.netty.util.concurrent.Future;
048 import io.netty.util.concurrent.FutureListener;
049
050 import java.nio.file.Files;
051
052 import java.util.concurrent.TimeUnit;
053
054
057 public class NettyFabricServer implements FabricServer {
058
059 public NettyFabricServer(
060 FabricAgentRegistry fabricAgentRegistry,
061 NettyFabricServerConfig nettyFabricServerConfig) {
062
063 _fabricAgentRegistry = fabricAgentRegistry;
064 _nettyFabricServerConfig = nettyFabricServerConfig;
065 }
066
067 @Override
068 public synchronized void start() throws Exception {
069 if (_serverChannel != null) {
070 throw new IllegalStateException(
071 "Netty fabric server was already started");
072 }
073
074 if (_log.isInfoEnabled()) {
075 _log.info(
076 "Starting Netty fabric server using " +
077 _nettyFabricServerConfig);
078 }
079
080 Files.createDirectories(
081 _nettyFabricServerConfig.getRepositoryParentPath());
082
083 ServerBootstrap serverBootstrap = new ServerBootstrap();
084
085 serverBootstrap.channel(NioServerSocketChannel.class);
086 serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
087 serverBootstrap.childHandler(new ChildChannelInitializer());
088
089 EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(
090 _nettyFabricServerConfig.getBossGroupThreadCount(),
091 new NamedThreadFactory(
092 "Netty Fabric Server/Boss Event Loop Group",
093 Thread.NORM_PRIORITY, null));
094
095 EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(
096 _nettyFabricServerConfig.getWorkerGroupThreadCount(),
097 new NamedThreadFactory(
098 "Netty Fabric Server/Worker Event Loop Group",
099 Thread.NORM_PRIORITY, null));
100
101 NettyUtil.bindShutdown(
102 bossEventLoopGroup, workerEventLoopGroup,
103 _nettyFabricServerConfig.getShutdownQuietPeriod(),
104 _nettyFabricServerConfig.getShutdownTimeout());
105
106 serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup);
107
108 ChannelFuture channelFuture = serverBootstrap.bind(
109 _nettyFabricServerConfig.getNettyFabricServerHost(),
110 _nettyFabricServerConfig.getNettyFabricServerPort());
111
112 _serverChannel = channelFuture.channel();
113
114 channelFuture.addListener(new PostBindChannelFutureListener());
115
116 channelFuture.sync();
117 }
118
119 @Override
120 public synchronized java.util.concurrent.Future<?> stop()
121 throws InterruptedException {
122
123 if (_serverChannel == null) {
124 throw new IllegalStateException(
125 "Netty fabric server is not started");
126 }
127
128 EventLoop eventLoop = _serverChannel.eventLoop();
129
130 EventLoopGroup bossEventLoopGroup = eventLoop.parent();
131
132 DefaultNoticeableFuture<?> defaultNoticeableFuture =
133 new DefaultNoticeableFuture<>();
134
135 try {
136 ChannelFuture channelFuture = _serverChannel.close();
137
138 channelFuture.sync();
139 }
140 finally {
141 Future<?> future = bossEventLoopGroup.shutdownGracefully(
142 _nettyFabricServerConfig.getShutdownQuietPeriod(),
143 _nettyFabricServerConfig.getShutdownTimeout(),
144 TimeUnit.MILLISECONDS);
145
146 future.addListener(
147 new PostShutdownChannelListener(defaultNoticeableFuture));
148 }
149
150 return defaultNoticeableFuture;
151 }
152
153 protected EventExecutorGroup createEventExecutorGroup(
154 int threadCount, String threadPoolName) {
155
156 EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(
157 threadCount,
158 new NamedThreadFactory(threadPoolName, Thread.NORM_PRIORITY, null));
159
160 EventLoop eventLoop = _serverChannel.eventLoop();
161
162 NettyUtil.bindShutdown(
163 eventLoop.parent(), eventExecutorGroup,
164 _nettyFabricServerConfig.getShutdownQuietPeriod(),
165 _nettyFabricServerConfig.getShutdownTimeout());
166
167 return eventExecutorGroup;
168 }
169
170 protected class ChildChannelInitializer
171 extends ChannelInitializer<SocketChannel> {
172
173 @Override
174 protected void initChannel(SocketChannel socketChannel) {
175 ChannelPipeline channelPipeline = socketChannel.pipeline();
176
177 channelPipeline.addLast(
178 AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
179 channelPipeline.addLast(
180 AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
181 channelPipeline.addLast(
182 createEventExecutorGroup(
183 _nettyFabricServerConfig.getRPCGroupThreadCount(),
184 "Netty Fabric Server/RPC Event Executor Group"),
185 NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
186
187 EventExecutorGroup fileServerEventExecutorGroup =
188 createEventExecutorGroup(
189 _nettyFabricServerConfig.getFileServerGroupThreadCount(),
190 "Netty Fabric Server/File Server Event Executor Group");
191
192 channelPipeline.addLast(
193 fileServerEventExecutorGroup, FileRequestChannelHandler.NAME,
194 new FileRequestChannelHandler(
195 _nettyFabricServerConfig.
196 getFileServerFolderCompressionLevel()));
197 channelPipeline.addLast(
198 createEventExecutorGroup(
199 _nettyFabricServerConfig.getRegistrationGroupThreadCount(),
200 "Netty Fabric Server/Registration Event Executor Group"),
201 new NettyFabricAgentRegistrationChannelHandler(
202 _fabricAgentRegistry,
203 _nettyFabricServerConfig.getRepositoryParentPath(),
204 fileServerEventExecutorGroup,
205 _nettyFabricServerConfig.getRepositoryGetFileTimeout(),
206 _nettyFabricServerConfig.getRPCRelayTimeout(),
207 _nettyFabricServerConfig.getWorkerStartupTimeout()));
208 }
209
210 }
211
212 protected class PostBindChannelFutureListener
213 implements ChannelFutureListener {
214
215 @Override
216 public void operationComplete(ChannelFuture channelFuture)
217 throws InterruptedException {
218
219 Channel channel = channelFuture.channel();
220
221 if (channelFuture.isSuccess()) {
222 if (_log.isInfoEnabled()) {
223 _log.info(
224 "Started Netty fabric server on " +
225 channel.localAddress());
226 }
227
228 return;
229 }
230
231 String serverAddress =
232 _nettyFabricServerConfig.getNettyFabricServerHost() + ":" +
233 _nettyFabricServerConfig.getNettyFabricServerPort();
234
235 if (channelFuture.isCancelled()) {
236 _log.error(
237 "Cancelled starting Netty fabric server on " +
238 serverAddress);
239 }
240 else {
241 _log.error(
242 "Unable to start Netty fabric server on " + serverAddress,
243 channelFuture.cause());
244 }
245
246 stop();
247 }
248
249 }
250
251 protected class PostShutdownChannelListener
252 implements FutureListener<Object> {
253
254 @Override
255 public void operationComplete(Future<Object> future)
256 throws InterruptedException {
257
258 FileHelperUtil.delete(
259 _nettyFabricServerConfig.getRepositoryParentPath());
260
261 _serverChannel = null;
262
263 if (!ThreadDeathWatcher.awaitInactivity(
264 _nettyFabricServerConfig.getShutdownTimeout(),
265 TimeUnit.MILLISECONDS)) {
266
267 _log.error("Unable to stop thread death watcher");
268 }
269
270 runnable.run();
271 }
272
273 protected PostShutdownChannelListener(Runnable runnable) {
274 this.runnable = runnable;
275 }
276
277 protected final Runnable runnable;
278
279 }
280
281 private static final Log _log = LogFactoryUtil.getLog(
282 NettyFabricServer.class);
283
284 private final FabricAgentRegistry _fabricAgentRegistry;
285 private final NettyFabricServerConfig _nettyFabricServerConfig;
286 private volatile Channel _serverChannel;
287
288 }