001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.handlers;
016    
017    import com.liferay.portal.fabric.agent.FabricAgentRegistry;
018    import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
019    import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
020    import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
021    import com.liferay.portal.fabric.netty.repository.NettyRepository;
022    import com.liferay.portal.fabric.repository.Repository;
023    import com.liferay.portal.kernel.log.Log;
024    import com.liferay.portal.kernel.log.LogFactoryUtil;
025    import com.liferay.portal.kernel.util.CharPool;
026    import com.liferay.portal.kernel.util.StringUtil;
027    
028    import io.netty.channel.Channel;
029    import io.netty.channel.ChannelFuture;
030    import io.netty.channel.ChannelFutureListener;
031    import io.netty.channel.ChannelHandlerContext;
032    import io.netty.channel.ChannelPipeline;
033    import io.netty.channel.SimpleChannelInboundHandler;
034    import io.netty.util.concurrent.EventExecutorGroup;
035    
036    import java.io.IOException;
037    
038    import java.net.SocketAddress;
039    
040    import java.nio.file.Files;
041    import java.nio.file.Path;
042    import java.nio.file.Paths;
043    
044    /**
045     * @author Shuyang Zhou
046     */
047    public class NettyFabricAgentRegistrationChannelHandler
048            extends SimpleChannelInboundHandler<NettyFabricAgentConfig> {
049    
050            public NettyFabricAgentRegistrationChannelHandler(
051                    FabricAgentRegistry fabricAgentRegistry, Path repositoryParentPath,
052                    EventExecutorGroup eventExecutorGroup, long getFileTimeout,
053                    long rpcRelayTimeout, long startupTimeout) {
054    
055                    if (fabricAgentRegistry == null) {
056                            throw new NullPointerException("Fabric agent registry is null");
057                    }
058    
059                    if (repositoryParentPath == null) {
060                            throw new NullPointerException("Repository parent path is null");
061                    }
062    
063                    if (eventExecutorGroup == null) {
064                            throw new NullPointerException("Event executor group is null");
065                    }
066    
067                    _fabricAgentRegistry = fabricAgentRegistry;
068                    _repositoryParentPath = repositoryParentPath;
069                    _eventExecutorGroup = eventExecutorGroup;
070                    _getFileTimeout = getFileTimeout;
071                    _rpcRelayTimeout = rpcRelayTimeout;
072                    _startupTimeout = startupTimeout;
073            }
074    
075            @Override
076            public void exceptionCaught(
077                    ChannelHandlerContext channelHandlerContext, Throwable throwable) {
078    
079                    final Channel channel = channelHandlerContext.channel();
080    
081                    _log.error("Closing " + channel + " due to:", throwable);
082    
083                    ChannelFuture channelFuture = channel.close();
084    
085                    channelFuture.addListener(
086                            new ChannelFutureListener() {
087    
088                                    @Override
089                                    public void operationComplete(ChannelFuture channelFuture) {
090                                            if (_log.isInfoEnabled()) {
091                                                    _log.info(channel + " is closed");
092                                            }
093                                    }
094    
095                            });
096            }
097    
098            @Override
099            protected void channelRead0(
100                            ChannelHandlerContext channelHandlerContext,
101                            NettyFabricAgentConfig nettyFabricAgentConfig)
102                    throws IOException {
103    
104                    Channel channel = channelHandlerContext.channel();
105    
106                    SocketAddress socketAddress = channel.localAddress();
107    
108                    Path repositoryPath = Paths.get(
109                            _repositoryParentPath.toString(),
110                            StringUtil.replace(
111                                    socketAddress.toString(), CharPool.COLON, CharPool.DASH));
112    
113                    Files.createDirectories(repositoryPath);
114    
115                    Repository<Channel> repository = new NettyRepository(
116                            repositoryPath, _getFileTimeout);
117    
118                    ChannelPipeline channelPipeline = channel.pipeline();
119    
120                    channelPipeline.addLast(
121                            new FileResponseChannelHandler(
122                                    repository.getAsyncBroker(), _eventExecutorGroup));
123    
124                    NettyFabricAgentStub nettyFabricAgentStub = new NettyFabricAgentStub(
125                            channel, repository, nettyFabricAgentConfig.getRepositoryPath(),
126                            _rpcRelayTimeout, _startupTimeout);
127    
128                    if (!_fabricAgentRegistry.registerFabricAgent(
129                                    nettyFabricAgentStub,
130                                    new OnRegistration(
131                                            channel, nettyFabricAgentStub, repository))) {
132    
133                            if (_log.isWarnEnabled()) {
134                                    _log.warn("Rejected duplicated fabric agent on " + channel);
135                            }
136    
137                            return;
138                    }
139    
140                    if (_log.isInfoEnabled()) {
141                            _log.info("Registered fabric agent on " + channel);
142                    }
143            }
144    
145            protected class OnRegistration implements Runnable {
146    
147                    public OnRegistration(
148                            Channel channel, NettyFabricAgentStub nettyFabricAgentStub,
149                            Repository<Channel> repository) {
150    
151                            _channel = channel;
152                            _nettyFabricAgentStub = nettyFabricAgentStub;
153                            _repository = repository;
154                    }
155    
156                    @Override
157                    public void run() {
158                            NettyChannelAttributes.setNettyFabricAgentStub(
159                                    _channel, _nettyFabricAgentStub);
160    
161                            ChannelFuture channelFuture = _channel.closeFuture();
162    
163                            channelFuture.addListener(
164                                    new PostDisconnectChannelFutureListener(
165                                            _channel, _nettyFabricAgentStub, _repository));
166                    }
167    
168                    private final Channel _channel;
169                    private final NettyFabricAgentStub _nettyFabricAgentStub;
170                    private final Repository<Channel> _repository;
171    
172            }
173    
174            protected class PostDisconnectChannelFutureListener
175                    implements ChannelFutureListener {
176    
177                    public PostDisconnectChannelFutureListener(
178                            Channel channel, NettyFabricAgentStub nettyFabricAgentStub,
179                            Repository<Channel> repository) {
180    
181                            _channel = channel;
182                            _nettyFabricAgentStub = nettyFabricAgentStub;
183                            _repository = repository;
184                    }
185    
186                    @Override
187                    public void operationComplete(ChannelFuture channelFuture) {
188                            if (_fabricAgentRegistry.unregisterFabricAgent(
189                                            _nettyFabricAgentStub, null)) {
190    
191                                    if (_log.isInfoEnabled()) {
192                                            _log.info("Unregistered fabric agent on " + _channel);
193                                    }
194                            }
195                            else if (_log.isWarnEnabled()) {
196                                    _log.warn("Unable to unregister fabric agent on " + _channel);
197                            }
198    
199                            _repository.dispose(true);
200                    }
201    
202                    private final Channel _channel;
203                    private final NettyFabricAgentStub _nettyFabricAgentStub;
204                    private final Repository<Channel> _repository;
205    
206            }
207    
208            private static final Log _log = LogFactoryUtil.getLog(
209                    NettyFabricAgentRegistrationChannelHandler.class);
210    
211            private final EventExecutorGroup _eventExecutorGroup;
212            private final FabricAgentRegistry _fabricAgentRegistry;
213            private final long _getFileTimeout;
214            private final Path _repositoryParentPath;
215            private final long _rpcRelayTimeout;
216            private final long _startupTimeout;
217    
218    }