001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.server;
016    
017    import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
019    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
020    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
021    import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
022    import com.liferay.portal.fabric.netty.handlers.NettyFabricAgentRegistrationChannelHandler;
023    import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
024    import com.liferay.portal.fabric.server.FabricServer;
025    import com.liferay.portal.kernel.log.Log;
026    import com.liferay.portal.kernel.log.LogFactoryUtil;
027    import com.liferay.portal.kernel.util.NamedThreadFactory;
028    
029    import io.netty.bootstrap.ServerBootstrap;
030    import io.netty.channel.Channel;
031    import io.netty.channel.ChannelFuture;
032    import io.netty.channel.ChannelFutureListener;
033    import io.netty.channel.ChannelInitializer;
034    import io.netty.channel.ChannelPipeline;
035    import io.netty.channel.EventLoop;
036    import io.netty.channel.EventLoopGroup;
037    import io.netty.channel.nio.NioEventLoopGroup;
038    import io.netty.channel.socket.SocketChannel;
039    import io.netty.channel.socket.nio.NioServerSocketChannel;
040    import io.netty.handler.logging.LogLevel;
041    import io.netty.handler.logging.LoggingHandler;
042    import io.netty.util.Attribute;
043    import io.netty.util.AttributeKey;
044    import io.netty.util.concurrent.DefaultEventExecutorGroup;
045    import io.netty.util.concurrent.EventExecutorGroup;
046    
047    import java.nio.file.Files;
048    
049    /**
050     * @author Shuyang Zhou
051     */
052    public class NettyFabricServer implements FabricServer {
053    
054            public NettyFabricServer(
055                    FabricAgentRegistry fabricAgentRegistry,
056                    NettyFabricServerConfig nettyFabricServerConfig) {
057    
058                    _fabricAgentRegistry = fabricAgentRegistry;
059                    _nettyFabricServerConfig = nettyFabricServerConfig;
060            }
061    
062            @Override
063            public synchronized void start() throws Exception {
064                    if (_serverChannel != null) {
065                            throw new IllegalStateException(
066                                    "Netty fabric server was already started");
067                    }
068    
069                    if (_log.isInfoEnabled()) {
070                            _log.info(
071                                    "Starting Netty fabric server using " +
072                                            _nettyFabricServerConfig);
073                    }
074    
075                    Files.createDirectories(
076                            _nettyFabricServerConfig.getRepositoryParentPath());
077    
078                    ServerBootstrap serverBootstrap = new ServerBootstrap();
079    
080                    serverBootstrap.channel(NioServerSocketChannel.class);
081                    serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
082                    serverBootstrap.childHandler(new ChildChannelInitializer());
083    
084                    EventLoopGroup workerGroup = new NioEventLoopGroup(
085                            _nettyFabricServerConfig.getWorkerGroupThreadCount(),
086                            new NamedThreadFactory(
087                                    "Netty Fabric Server/Worker Event Loop Group",
088                                    Thread.NORM_PRIORITY, null));
089    
090                    serverBootstrap.group(
091                            new NioEventLoopGroup(
092                                    _nettyFabricServerConfig.getBossGroupThreadCount(),
093                                    new NamedThreadFactory(
094                                            "Netty Fabric Server/Boss Event Loop Group",
095                                            Thread.NORM_PRIORITY, null)),
096                            workerGroup);
097    
098                    serverBootstrap.attr(
099                            _fileServerEventExecutorGroupAttributeKey,
100                            new DefaultEventExecutorGroup(
101                                    _nettyFabricServerConfig.getFileServerGroupThreadCount(),
102                                    new NamedThreadFactory(
103                                            "Netty Fabric Server/File Server Event Executor Group",
104                                            Thread.NORM_PRIORITY, null)));
105                    serverBootstrap.attr(
106                            _registrationEventExecutorGroupAttributeKey,
107                            new DefaultEventExecutorGroup(
108                                    _nettyFabricServerConfig.getRegistrationGroupThreadCount(),
109                                    new NamedThreadFactory(
110                                            "Netty Fabric Server/Registration Event Executor Group",
111                                            Thread.NORM_PRIORITY, null)));
112                    serverBootstrap.attr(
113                            _rpcEventExecutorGroupAttributeKey,
114                            new DefaultEventExecutorGroup(
115                                    _nettyFabricServerConfig.getRPCGroupThreadCount(),
116                                    new NamedThreadFactory(
117                                            "Netty Fabric Server/RPC Event Executor Group",
118                                            Thread.NORM_PRIORITY, null)));
119                    serverBootstrap.attr(_workerEventLoopGroupAttributeKey, workerGroup);
120    
121                    ChannelFuture channelFuture = serverBootstrap.bind(
122                            _nettyFabricServerConfig.getNettyFabricServerHost(),
123                            _nettyFabricServerConfig.getNettyFabricServerPort());
124    
125                    _serverChannel = channelFuture.channel();
126    
127                    channelFuture.addListener(new PostBindChannelFutureListener());
128    
129                    channelFuture.sync();
130            }
131    
132            @Override
133            public synchronized void stop() throws InterruptedException {
134                    if (_serverChannel == null) {
135                            throw new IllegalStateException(
136                                    "Netty fabric server is not started");
137                    }
138    
139                    try {
140                            ChannelFuture channelFuture = _serverChannel.close();
141    
142                            channelFuture.sync();
143                    }
144                    finally {
145                            EventLoop eventLoop = _serverChannel.eventLoop();
146    
147                            EventLoopGroup bossGroup = eventLoop.parent();
148    
149                            bossGroup.shutdownGracefully();
150    
151                            shutdownEventExecutorGroup(
152                                    _serverChannel, _workerEventLoopGroupAttributeKey);
153                            shutdownEventExecutorGroup(
154                                    _serverChannel, _fileServerEventExecutorGroupAttributeKey);
155                            shutdownEventExecutorGroup(
156                                    _serverChannel, _registrationEventExecutorGroupAttributeKey);
157                            shutdownEventExecutorGroup(
158                                    _serverChannel, _rpcEventExecutorGroupAttributeKey);
159    
160                            FileHelperUtil.delete(
161                                    _nettyFabricServerConfig.getRepositoryParentPath());
162    
163                            _serverChannel = null;
164                    }
165            }
166    
167            protected EventExecutorGroup getEventExecutorGroup(
168                    Channel channel, AttributeKey<EventExecutorGroup> attributeKey) {
169    
170                    Attribute<EventExecutorGroup> attribute = channel.attr(attributeKey);
171    
172                    return attribute.get();
173            }
174    
175            protected void shutdownEventExecutorGroup(
176                    Channel channel,
177                    AttributeKey<? extends EventExecutorGroup> attributeKey) {
178    
179                    Attribute<? extends EventExecutorGroup> attribute = channel.attr(
180                            attributeKey);
181    
182                    EventExecutorGroup eventExecutorGroup = attribute.getAndRemove();
183    
184                    if (eventExecutorGroup != null) {
185                            eventExecutorGroup.shutdownGracefully();
186                    }
187            }
188    
189            protected class ChildChannelInitializer
190                    extends ChannelInitializer<SocketChannel> {
191    
192                    @Override
193                    protected void initChannel(SocketChannel socketChannel) {
194                            ChannelPipeline channelPipeline = socketChannel.pipeline();
195    
196                            channelPipeline.addLast(
197                                    AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
198                            channelPipeline.addLast(
199                                    AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
200                            channelPipeline.addLast(
201                                    getEventExecutorGroup(
202                                            _serverChannel, _rpcEventExecutorGroupAttributeKey),
203                                    NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
204                            channelPipeline.addLast(
205                                    getEventExecutorGroup(
206                                            _serverChannel, _fileServerEventExecutorGroupAttributeKey),
207                                    FileRequestChannelHandler.NAME,
208                                    new FileRequestChannelHandler(
209                                            _nettyFabricServerConfig.
210                                                    getFileServerFolderCompressionLevel()));
211                            channelPipeline.addLast(
212                                    getEventExecutorGroup(
213                                            _serverChannel,
214                                            _registrationEventExecutorGroupAttributeKey),
215                                    new NettyFabricAgentRegistrationChannelHandler(
216                                            _fabricAgentRegistry,
217                                            _nettyFabricServerConfig.getRepositoryParentPath(),
218                                            getEventExecutorGroup(
219                                                    _serverChannel,
220                                                    _fileServerEventExecutorGroupAttributeKey),
221                                            _nettyFabricServerConfig.getRepositoryGetFileTimeout(),
222                                            _nettyFabricServerConfig.getRPCRelayTimeout(),
223                                            _nettyFabricServerConfig.getWorkerStartupTimeout()));
224                    }
225    
226            }
227    
228            protected class PostBindChannelFutureListener
229                    implements ChannelFutureListener {
230    
231                    @Override
232                    public void operationComplete(ChannelFuture channelFuture)
233                            throws InterruptedException {
234    
235                            Channel channel = channelFuture.channel();
236    
237                            if (channelFuture.isSuccess()) {
238                                    if (_log.isInfoEnabled()) {
239                                            _log.info(
240                                                    "Started Netty fabric server on " +
241                                                            channel.localAddress());
242                                    }
243    
244                                    return;
245                            }
246    
247                            String serverAddress =
248                                    _nettyFabricServerConfig.getNettyFabricServerHost() + ":" +
249                                            _nettyFabricServerConfig.getNettyFabricServerPort();
250    
251                            if (channelFuture.isCancelled()) {
252                                    _log.error(
253                                            "Cancelled starting Netty fabric server on " +
254                                                    serverAddress);
255                            }
256                            else {
257                                    _log.error(
258                                            "Unable to start Netty fabric server on " + serverAddress,
259                                            channelFuture.cause());
260                            }
261    
262                            stop();
263                    }
264    
265            }
266    
267            private static final Log _log = LogFactoryUtil.getLog(
268                    NettyFabricServer.class);
269    
270            private static final AttributeKey<EventExecutorGroup>
271                    _fileServerEventExecutorGroupAttributeKey = AttributeKey.valueOf(
272                            "FileServerEventExecutorGroup");
273            private static final AttributeKey<EventExecutorGroup>
274                    _registrationEventExecutorGroupAttributeKey = AttributeKey.valueOf(
275                            "RegistrationEventExecutorGroup");
276            private static final AttributeKey<EventExecutorGroup>
277                    _rpcEventExecutorGroupAttributeKey = AttributeKey.valueOf(
278                            "RPCEventExecutorGroup");
279            private static final AttributeKey<EventLoopGroup>
280                    _workerEventLoopGroupAttributeKey = AttributeKey.valueOf(
281                            "WorkerEventLoopGroup");
282    
283            private final FabricAgentRegistry _fabricAgentRegistry;
284            private final NettyFabricServerConfig _nettyFabricServerConfig;
285            private Channel _serverChannel;
286    
287    }