001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.client;
016    
017    import com.liferay.portal.fabric.client.FabricClient;
018    import com.liferay.portal.fabric.local.agent.LocalFabricAgent;
019    import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
020    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
021    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
022    import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
023    import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
024    import com.liferay.portal.fabric.netty.handlers.NettyChannelAttributes;
025    import com.liferay.portal.fabric.netty.handlers.NettyFabricWorkerExecutionChannelHandler;
026    import com.liferay.portal.fabric.netty.repository.NettyRepository;
027    import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
028    import com.liferay.portal.fabric.netty.util.NettyUtil;
029    import com.liferay.portal.fabric.repository.Repository;
030    import com.liferay.portal.fabric.worker.FabricWorker;
031    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
032    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.process.ProcessCallable;
036    import com.liferay.portal.kernel.process.ProcessExecutor;
037    import com.liferay.portal.kernel.process.TerminationProcessException;
038    import com.liferay.portal.kernel.util.NamedThreadFactory;
039    
040    import io.netty.bootstrap.Bootstrap;
041    import io.netty.channel.Channel;
042    import io.netty.channel.ChannelFuture;
043    import io.netty.channel.ChannelFutureListener;
044    import io.netty.channel.ChannelInitializer;
045    import io.netty.channel.ChannelPipeline;
046    import io.netty.channel.EventLoopGroup;
047    import io.netty.channel.nio.NioEventLoopGroup;
048    import io.netty.channel.socket.SocketChannel;
049    import io.netty.channel.socket.nio.NioSocketChannel;
050    import io.netty.util.concurrent.DefaultEventExecutorGroup;
051    import io.netty.util.concurrent.EventExecutorGroup;
052    import io.netty.util.concurrent.Future;
053    import io.netty.util.concurrent.FutureListener;
054    
055    import java.io.IOException;
056    import java.io.Serializable;
057    
058    import java.lang.Thread.State;
059    
060    import java.nio.file.Files;
061    import java.nio.file.Path;
062    
063    import java.util.Map;
064    import java.util.concurrent.ExecutionException;
065    import java.util.concurrent.TimeUnit;
066    import java.util.concurrent.TimeoutException;
067    import java.util.concurrent.atomic.AtomicInteger;
068    
069    /**
070     * @author Shuyang Zhou
071     */
072    public class NettyFabricClient implements FabricClient {
073    
074            public NettyFabricClient(
075                    ProcessExecutor processExecutor,
076                    NettyFabricClientConfig nettyFabricClientConfig,
077                    NettyFabricClientShutdownCallback nettyFabricClientShutdownCallback) {
078    
079                    _processExecutor = processExecutor;
080                    _nettyFabricClientConfig = nettyFabricClientConfig;
081                    _nettyFabricClientShutdownCallback = nettyFabricClientShutdownCallback;
082            }
083    
084            @Override
085            public synchronized void connect() {
086                    if (_channel != null) {
087                            throw new IllegalStateException(
088                                    "Netty fabric client was already started");
089                    }
090    
091                    if (_log.isInfoEnabled()) {
092                            _log.info(
093                                    "Starting Netty fabric client using " +
094                                            _nettyFabricClientConfig);
095                    }
096    
097                    Runtime runtime = Runtime.getRuntime();
098    
099                    runtime.addShutdownHook(_shutdownThread);
100    
101                    _bootstrap = new Bootstrap();
102    
103                    _bootstrap.channel(NioSocketChannel.class);
104                    _bootstrap.group(
105                            new NioEventLoopGroup(
106                                    _nettyFabricClientConfig.getEventLoopGroupThreadCount(),
107                                    new NamedThreadFactory(
108                                            "Netty Fabric Client/NIO Event Loop Group",
109                                            Thread.NORM_PRIORITY, null)));
110                    _bootstrap.handler(new NettyFabricClientChannelInitializer());
111    
112                    int reconnectCount = _nettyFabricClientConfig.getReconnectCount();
113    
114                    if (reconnectCount < 0) {
115                            reconnectCount = Integer.MAX_VALUE;
116                    }
117    
118                    _reconnectCounter.set(reconnectCount);
119    
120                    doConnect();
121            }
122    
123            @Override
124            public synchronized java.util.concurrent.Future<?> disconnect() {
125                    if (_channel == null) {
126                            throw new IllegalStateException(
127                                    "Netty fabric client is not started");
128                    }
129    
130                    _reconnectCounter.set(0);
131    
132                    _channel.close();
133    
134                    EventExecutorGroup eventExecutorGroup = _bootstrap.group();
135    
136                    Future<?> future = eventExecutorGroup.terminationFuture();
137    
138                    final DefaultNoticeableFuture<?> defaultNoticeableFuture =
139                            new DefaultNoticeableFuture<>();
140    
141                    future.addListener(
142                            new FutureListener<Object>() {
143    
144                                    @Override
145                                    public void operationComplete(Future<Object> future) {
146                                            defaultNoticeableFuture.run();
147                                    }
148    
149                            });
150    
151                    return defaultNoticeableFuture;
152            }
153    
154            protected EventExecutorGroup createEventExecutorGroup(
155                    int threadCount, String threadPoolName) {
156    
157                    EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(
158                            threadCount,
159                            new NamedThreadFactory(threadPoolName, Thread.NORM_PRIORITY, null));
160    
161                    NettyUtil.bindShutdown(
162                            _bootstrap.group(), eventExecutorGroup,
163                            _nettyFabricClientConfig.getShutdownQuietPeriod(),
164                            _nettyFabricClientConfig.getShutdownTimeout());
165    
166                    return eventExecutorGroup;
167            }
168    
169            protected void doConnect() {
170                    ChannelFuture channelFuture = _bootstrap.connect(
171                            _nettyFabricClientConfig.getNettyFabricServerHost(),
172                            _nettyFabricClientConfig.getNettyFabricServerPort());
173    
174                    _channel = channelFuture.channel();
175    
176                    channelFuture.addListener(new PostConnectChannelFutureListener());
177            }
178    
179            protected void terminateFabricWorkers(Channel channel) {
180                    Map<Long, FabricWorker<?>> fabricWorkers =
181                            NettyChannelAttributes.getFabricWorkers(channel);
182    
183                    if (fabricWorkers == null) {
184                            return;
185                    }
186    
187                    for (Map.Entry<Long, FabricWorker<?>> entry :
188                                    fabricWorkers.entrySet()) {
189    
190                            FabricWorker<?> fabricWorker = entry.getValue();
191    
192                            fabricWorker.write(_runtimeExitProcessCallable);
193    
194                            NoticeableFuture<?> noticeableFuture =
195                                    fabricWorker.getProcessNoticeableFuture();
196    
197                            try {
198                                    try {
199                                            noticeableFuture.get(
200                                                    _nettyFabricClientConfig.getExecutionTimeout(),
201                                                    TimeUnit.MILLISECONDS);
202                                    }
203                                    catch (TimeoutException te) {
204                                            fabricWorker.write(_runtimeHaltProcessCallable);
205    
206                                            noticeableFuture.get(
207                                                    _nettyFabricClientConfig.getExecutionTimeout(),
208                                                    TimeUnit.MILLISECONDS);
209                                    }
210                            }
211                            catch (Throwable t) {
212                                    if (t instanceof ExecutionException) {
213                                            Throwable cause = t.getCause();
214    
215                                            if (cause instanceof TerminationProcessException) {
216                                                    TerminationProcessException tpe =
217                                                            (TerminationProcessException)cause;
218    
219                                                    if (_log.isWarnEnabled()) {
220                                                            _log.warn(
221                                                                    "Forcibly terminate fabric worker " +
222                                                                            entry.getKey() + " with exit code " +
223                                                                                    tpe.getExitCode());
224                                                    }
225    
226                                                    continue;
227                                            }
228                                    }
229    
230                                    _log.error(
231                                            "Unable to terminate fabric worker " + entry.getKey(), t);
232                            }
233                    }
234            }
235    
236            protected class NettyFabricClientChannelInitializer
237                    extends ChannelInitializer<SocketChannel> {
238    
239                    @Override
240                    protected void initChannel(SocketChannel socketChannel)
241                            throws IOException {
242    
243                            Path repositoryPath = _nettyFabricClientConfig.getRepositoryPath();
244    
245                            Files.createDirectories(repositoryPath);
246    
247                            Repository<Channel> repository = new NettyRepository(
248                                    repositoryPath,
249                                    _nettyFabricClientConfig.getRepositoryGetFileTimeout());
250    
251                            ChannelFuture channelFuture = socketChannel.closeFuture();
252    
253                            channelFuture.addListener(
254                                    new PostDisconnectChannelFutureListener(repository));
255    
256                            ChannelPipeline channelPipeline = socketChannel.pipeline();
257    
258                            channelPipeline.addLast(
259                                    AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
260                            channelPipeline.addLast(
261                                    AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
262    
263                            EventExecutorGroup fileServerEventExecutorGroup =
264                                    createEventExecutorGroup(
265                                            _nettyFabricClientConfig.getFileServerGroupThreadCount(),
266                                            "Netty Fabric Client/File Server Event Executor Group");
267    
268                            channelPipeline.addLast(
269                                    fileServerEventExecutorGroup, FileRequestChannelHandler.NAME,
270                                    new FileRequestChannelHandler(
271                                            _nettyFabricClientConfig.
272                                                    getFileServerFolderCompressionLevel()));
273                            channelPipeline.addLast(
274                                    new FileResponseChannelHandler(
275                                            repository.getAsyncBroker(), fileServerEventExecutorGroup));
276                            channelPipeline.addLast(
277                                    createEventExecutorGroup(
278                                            _nettyFabricClientConfig.getRPCGroupThreadCount(),
279                                            "Netty Fabric Client/RPC Event Executor Group"),
280                                    NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
281                            channelPipeline.addLast(
282                                    createEventExecutorGroup(
283                                            _nettyFabricClientConfig.getExecutionGroupThreadCount(),
284                                            "Netty Fabric Client/Execution Event Executor Group"),
285                                    new NettyFabricWorkerExecutionChannelHandler(
286                                            repository, new LocalFabricAgent(_processExecutor),
287                                            _nettyFabricClientConfig.getExecutionTimeout()));
288                    }
289    
290            }
291    
292            protected class PostConnectChannelFutureListener
293                    implements ChannelFutureListener {
294    
295                    @Override
296                    public void operationComplete(ChannelFuture channelFuture) {
297                            Channel channel = channelFuture.channel();
298    
299                            if (channelFuture.isSuccess()) {
300                                    if (_log.isInfoEnabled()) {
301                                            _log.info("Connected to " + channel.remoteAddress());
302                                    }
303    
304                                    Path repositoryPath =
305                                            _nettyFabricClientConfig.getRepositoryPath();
306    
307                                    ChannelFuture registerChannelFuture = _channel.writeAndFlush(
308                                            new NettyFabricAgentConfig(repositoryPath.toFile()));
309    
310                                    registerChannelFuture.addListener(
311                                            new PostRegisterChannelFutureListener());
312    
313                                    return;
314                            }
315    
316                            String serverAddress =
317                                    _nettyFabricClientConfig.getNettyFabricServerHost() + ":" +
318                                            _nettyFabricClientConfig.getNettyFabricServerPort();
319    
320                            if (channelFuture.isCancelled()) {
321                                    _log.error("Cancelled connecting to " + serverAddress);
322                            }
323                            else {
324                                    _log.error(
325                                            "Unable to connect to " + serverAddress,
326                                            channelFuture.cause());
327                            }
328                    }
329    
330            }
331    
332            protected class PostDisconnectChannelFutureListener
333                    implements ChannelFutureListener {
334    
335                    @Override
336                    public void operationComplete(ChannelFuture channelFuture) {
337                            terminateFabricWorkers(_channel);
338    
339                            repository.dispose(true);
340    
341                            EventLoopGroup eventLoopGroup = _bootstrap.group();
342    
343                            if (_reconnectCounter.getAndDecrement() > 0) {
344                                    eventLoopGroup.schedule(
345                                            new Runnable() {
346    
347                                                    @Override
348                                                    public void run() {
349                                                            doConnect();
350                                                    }
351    
352                                            },
353                                            _nettyFabricClientConfig.getReconnectInterval(),
354                                            TimeUnit.MILLISECONDS);
355    
356                                    if (_log.isInfoEnabled()) {
357                                            _log.info(
358                                                    "Try to reconnect " +
359                                                            _nettyFabricClientConfig.getReconnectInterval() +
360                                                                    " ms later");
361                                    }
362                            }
363                            else {
364                                    if (_log.isInfoEnabled()) {
365                                            _log.info(
366                                                    "Shutting down Netty fabric client on " + _channel);
367                                    }
368    
369                                    Future<?> future = eventLoopGroup.shutdownGracefully(
370                                            _nettyFabricClientConfig.getShutdownQuietPeriod(),
371                                            _nettyFabricClientConfig.getShutdownTimeout(),
372                                            TimeUnit.MILLISECONDS);
373    
374                                    future.addListener(new PostShutdownChannelFutureListener());
375                            }
376                    }
377    
378                    protected PostDisconnectChannelFutureListener(
379                            Repository<Channel> repository) {
380    
381                            this.repository = repository;
382                    }
383    
384                    protected final Repository<Channel> repository;
385    
386            }
387    
388            protected class PostRegisterChannelFutureListener
389                    implements ChannelFutureListener {
390    
391                    @Override
392                    public void operationComplete(ChannelFuture channelFuture) {
393                            if (channelFuture.isSuccess()) {
394                                    int reconnectCount =
395                                            _nettyFabricClientConfig.getReconnectCount();
396    
397                                    if (reconnectCount < 0) {
398                                            reconnectCount = Integer.MAX_VALUE;
399                                    }
400    
401                                    _reconnectCounter.set(reconnectCount);
402    
403                                    if (_log.isInfoEnabled()) {
404                                            _log.info("Registered Netty fabric agent on " + _channel);
405                                    }
406    
407                                    return;
408                            }
409    
410                            _log.error("Unable to register Netty fabric agent on " + _channel);
411    
412                            _channel.close();
413                    }
414    
415            }
416    
417            protected class PostShutdownChannelFutureListener
418                    implements FutureListener<Object> {
419    
420                    @Override
421                    public void operationComplete(Future<Object> future) {
422                            _channel = null;
423                            _bootstrap = null;
424    
425                            _nettyFabricClientShutdownCallback.shutdown();
426    
427                            if (_shutdownThread.getState() == State.NEW) {
428                                    Runtime runtime = Runtime.getRuntime();
429    
430                                    runtime.removeShutdownHook(_shutdownThread);
431                            }
432                    }
433    
434            }
435    
436            private static final int _FABRIC_AGENT_SHUTDOWN_CODE = 211;
437    
438            private static final Log _log = LogFactoryUtil.getLog(
439                    NettyFabricClient.class);
440    
441            private static final ProcessCallable<Serializable>
442                    _runtimeExitProcessCallable = new ProcessCallable<Serializable>() {
443    
444                            @Override
445                            public Serializable call() {
446                                    Runtime runtime = Runtime.getRuntime();
447    
448                                    runtime.exit(_FABRIC_AGENT_SHUTDOWN_CODE);
449    
450                                    return null;
451                            }
452    
453                            private static final long serialVersionUID = 1L;
454    
455                    };
456    
457            private static final ProcessCallable<Serializable>
458                    _runtimeHaltProcessCallable = new ProcessCallable<Serializable>() {
459    
460                            @Override
461                            public Serializable call() {
462                                    Runtime runtime = Runtime.getRuntime();
463    
464                                    runtime.halt(_FABRIC_AGENT_SHUTDOWN_CODE);
465    
466                                    return null;
467                            }
468    
469                            private static final long serialVersionUID = 1L;
470    
471                    };
472    
473            private volatile Bootstrap _bootstrap;
474            private volatile Channel _channel;
475            private final NettyFabricClientConfig _nettyFabricClientConfig;
476            private final NettyFabricClientShutdownCallback
477                    _nettyFabricClientShutdownCallback;
478            private final ProcessExecutor _processExecutor;
479            private final AtomicInteger _reconnectCounter = new AtomicInteger();
480    
481            private final Thread _shutdownThread = new Thread() {
482    
483                    @Override
484                    public void run() {
485                            Channel channel = _channel;
486    
487                            if (channel != null) {
488                                    _reconnectCounter.set(0);
489    
490                                    ChannelFuture channelFuture = channel.close();
491    
492                                    channelFuture.syncUninterruptibly();
493                            }
494                    }
495    
496            };
497    
498    }