001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.server;
016    
017    import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
019    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
020    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
021    import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
022    import com.liferay.portal.fabric.netty.handlers.NettyFabricAgentRegistrationChannelHandler;
023    import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
024    import com.liferay.portal.fabric.netty.util.NettyUtil;
025    import com.liferay.portal.fabric.server.FabricServer;
026    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
027    import com.liferay.portal.kernel.log.Log;
028    import com.liferay.portal.kernel.log.LogFactoryUtil;
029    import com.liferay.portal.kernel.util.NamedThreadFactory;
030    
031    import io.netty.bootstrap.ServerBootstrap;
032    import io.netty.channel.Channel;
033    import io.netty.channel.ChannelFuture;
034    import io.netty.channel.ChannelFutureListener;
035    import io.netty.channel.ChannelInitializer;
036    import io.netty.channel.ChannelPipeline;
037    import io.netty.channel.EventLoop;
038    import io.netty.channel.EventLoopGroup;
039    import io.netty.channel.nio.NioEventLoopGroup;
040    import io.netty.channel.socket.SocketChannel;
041    import io.netty.channel.socket.nio.NioServerSocketChannel;
042    import io.netty.handler.logging.LogLevel;
043    import io.netty.handler.logging.LoggingHandler;
044    import io.netty.util.ThreadDeathWatcher;
045    import io.netty.util.concurrent.DefaultEventExecutorGroup;
046    import io.netty.util.concurrent.EventExecutorGroup;
047    import io.netty.util.concurrent.Future;
048    import io.netty.util.concurrent.FutureListener;
049    
050    import java.nio.file.Files;
051    
052    import java.util.concurrent.TimeUnit;
053    
054    /**
055     * @author Shuyang Zhou
056     */
057    public class NettyFabricServer implements FabricServer {
058    
059            public NettyFabricServer(
060                    FabricAgentRegistry fabricAgentRegistry,
061                    NettyFabricServerConfig nettyFabricServerConfig) {
062    
063                    _fabricAgentRegistry = fabricAgentRegistry;
064                    _nettyFabricServerConfig = nettyFabricServerConfig;
065            }
066    
067            @Override
068            public synchronized void start() throws Exception {
069                    if (_serverChannel != null) {
070                            throw new IllegalStateException(
071                                    "Netty fabric server was already started");
072                    }
073    
074                    if (_log.isInfoEnabled()) {
075                            _log.info(
076                                    "Starting Netty fabric server using " +
077                                            _nettyFabricServerConfig);
078                    }
079    
080                    Files.createDirectories(
081                            _nettyFabricServerConfig.getRepositoryParentPath());
082    
083                    ServerBootstrap serverBootstrap = new ServerBootstrap();
084    
085                    serverBootstrap.channel(NioServerSocketChannel.class);
086                    serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
087                    serverBootstrap.childHandler(new ChildChannelInitializer());
088    
089                    EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(
090                            _nettyFabricServerConfig.getBossGroupThreadCount(),
091                            new NamedThreadFactory(
092                                    "Netty Fabric Server/Boss Event Loop Group",
093                                    Thread.NORM_PRIORITY, null));
094    
095                    EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(
096                            _nettyFabricServerConfig.getWorkerGroupThreadCount(),
097                            new NamedThreadFactory(
098                                    "Netty Fabric Server/Worker Event Loop Group",
099                                    Thread.NORM_PRIORITY, null));
100    
101                    NettyUtil.bindShutdown(
102                            bossEventLoopGroup, workerEventLoopGroup,
103                            _nettyFabricServerConfig.getShutdownQuietPeriod(),
104                            _nettyFabricServerConfig.getShutdownTimeout());
105    
106                    serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup);
107    
108                    ChannelFuture channelFuture = serverBootstrap.bind(
109                            _nettyFabricServerConfig.getNettyFabricServerHost(),
110                            _nettyFabricServerConfig.getNettyFabricServerPort());
111    
112                    _serverChannel = channelFuture.channel();
113    
114                    channelFuture.addListener(new PostBindChannelFutureListener());
115    
116                    channelFuture.sync();
117            }
118    
119            @Override
120            public synchronized java.util.concurrent.Future<?> stop()
121                    throws InterruptedException {
122    
123                    if (_serverChannel == null) {
124                            throw new IllegalStateException(
125                                    "Netty fabric server is not started");
126                    }
127    
128                    EventLoop eventLoop = _serverChannel.eventLoop();
129    
130                    EventLoopGroup bossEventLoopGroup = eventLoop.parent();
131    
132                    DefaultNoticeableFuture<?> defaultNoticeableFuture =
133                            new DefaultNoticeableFuture<>();
134    
135                    try {
136                            ChannelFuture channelFuture = _serverChannel.close();
137    
138                            channelFuture.sync();
139                    }
140                    finally {
141                            Future<?> future = bossEventLoopGroup.shutdownGracefully(
142                                    _nettyFabricServerConfig.getShutdownQuietPeriod(),
143                                    _nettyFabricServerConfig.getShutdownTimeout(),
144                                    TimeUnit.MILLISECONDS);
145    
146                            future.addListener(
147                                    new PostShutdownChannelListener(defaultNoticeableFuture));
148                    }
149    
150                    return defaultNoticeableFuture;
151            }
152    
153            protected EventExecutorGroup createEventExecutorGroup(
154                    int threadCount, String threadPoolName) {
155    
156                    EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(
157                            threadCount,
158                            new NamedThreadFactory(threadPoolName, Thread.NORM_PRIORITY, null));
159    
160                    EventLoop eventLoop = _serverChannel.eventLoop();
161    
162                    NettyUtil.bindShutdown(
163                            eventLoop.parent(), eventExecutorGroup,
164                            _nettyFabricServerConfig.getShutdownQuietPeriod(),
165                            _nettyFabricServerConfig.getShutdownTimeout());
166    
167                    return eventExecutorGroup;
168            }
169    
170            protected class ChildChannelInitializer
171                    extends ChannelInitializer<SocketChannel> {
172    
173                    @Override
174                    protected void initChannel(SocketChannel socketChannel) {
175                            ChannelPipeline channelPipeline = socketChannel.pipeline();
176    
177                            channelPipeline.addLast(
178                                    AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
179                            channelPipeline.addLast(
180                                    AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
181                            channelPipeline.addLast(
182                                    createEventExecutorGroup(
183                                            _nettyFabricServerConfig.getRPCGroupThreadCount(),
184                                            "Netty Fabric Server/RPC Event Executor Group"),
185                                    NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
186    
187                            EventExecutorGroup fileServerEventExecutorGroup =
188                                    createEventExecutorGroup(
189                                            _nettyFabricServerConfig.getFileServerGroupThreadCount(),
190                                            "Netty Fabric Server/File Server Event Executor Group");
191    
192                            channelPipeline.addLast(
193                                    fileServerEventExecutorGroup, FileRequestChannelHandler.NAME,
194                                    new FileRequestChannelHandler(
195                                            _nettyFabricServerConfig.
196                                                    getFileServerFolderCompressionLevel()));
197                            channelPipeline.addLast(
198                                    createEventExecutorGroup(
199                                            _nettyFabricServerConfig.getRegistrationGroupThreadCount(),
200                                            "Netty Fabric Server/Registration Event Executor Group"),
201                                    new NettyFabricAgentRegistrationChannelHandler(
202                                            _fabricAgentRegistry,
203                                            _nettyFabricServerConfig.getRepositoryParentPath(),
204                                            fileServerEventExecutorGroup,
205                                            _nettyFabricServerConfig.getRepositoryGetFileTimeout(),
206                                            _nettyFabricServerConfig.getRPCRelayTimeout(),
207                                            _nettyFabricServerConfig.getWorkerStartupTimeout()));
208                    }
209    
210            }
211    
212            protected class PostBindChannelFutureListener
213                    implements ChannelFutureListener {
214    
215                    @Override
216                    public void operationComplete(ChannelFuture channelFuture)
217                            throws InterruptedException {
218    
219                            Channel channel = channelFuture.channel();
220    
221                            if (channelFuture.isSuccess()) {
222                                    if (_log.isInfoEnabled()) {
223                                            _log.info(
224                                                    "Started Netty fabric server on " +
225                                                            channel.localAddress());
226                                    }
227    
228                                    return;
229                            }
230    
231                            String serverAddress =
232                                    _nettyFabricServerConfig.getNettyFabricServerHost() + ":" +
233                                            _nettyFabricServerConfig.getNettyFabricServerPort();
234    
235                            if (channelFuture.isCancelled()) {
236                                    _log.error(
237                                            "Cancelled starting Netty fabric server on " +
238                                                    serverAddress);
239                            }
240                            else {
241                                    _log.error(
242                                            "Unable to start Netty fabric server on " + serverAddress,
243                                            channelFuture.cause());
244                            }
245    
246                            stop();
247                    }
248    
249            }
250    
251            protected class PostShutdownChannelListener
252                    implements FutureListener<Object> {
253    
254                    @Override
255                    public void operationComplete(Future<Object> future)
256                            throws InterruptedException {
257    
258                            FileHelperUtil.delete(
259                                    _nettyFabricServerConfig.getRepositoryParentPath());
260    
261                            _serverChannel = null;
262    
263                            if (!ThreadDeathWatcher.awaitInactivity(
264                                            _nettyFabricServerConfig.getShutdownTimeout(),
265                                            TimeUnit.MILLISECONDS)) {
266    
267                                    _log.error("Unable to stop thread death watcher");
268                            }
269    
270                            runnable.run();
271                    }
272    
273                    protected PostShutdownChannelListener(Runnable runnable) {
274                            this.runnable = runnable;
275                    }
276    
277                    protected final Runnable runnable;
278    
279            }
280    
281            private static final Log _log = LogFactoryUtil.getLog(
282                    NettyFabricServer.class);
283    
284            private final FabricAgentRegistry _fabricAgentRegistry;
285            private final NettyFabricServerConfig _nettyFabricServerConfig;
286            private volatile Channel _serverChannel;
287    
288    }