001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.client;
016    
017    import com.liferay.portal.fabric.client.FabricClient;
018    import com.liferay.portal.fabric.local.agent.LocalFabricAgent;
019    import com.liferay.portal.fabric.netty.agent.NettyFabricAgentConfig;
020    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectDecoder;
021    import com.liferay.portal.fabric.netty.codec.serialization.AnnotatedObjectEncoder;
022    import com.liferay.portal.fabric.netty.fileserver.handlers.FileRequestChannelHandler;
023    import com.liferay.portal.fabric.netty.fileserver.handlers.FileResponseChannelHandler;
024    import com.liferay.portal.fabric.netty.handlers.NettyChannelAttributes;
025    import com.liferay.portal.fabric.netty.handlers.NettyFabricWorkerExecutionChannelHandler;
026    import com.liferay.portal.fabric.netty.repository.NettyRepository;
027    import com.liferay.portal.fabric.netty.rpc.handlers.NettyRPCChannelHandler;
028    import com.liferay.portal.fabric.repository.Repository;
029    import com.liferay.portal.fabric.worker.FabricWorker;
030    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.process.ProcessCallable;
034    import com.liferay.portal.kernel.process.ProcessExecutor;
035    import com.liferay.portal.kernel.process.TerminationProcessException;
036    import com.liferay.portal.kernel.util.NamedThreadFactory;
037    
038    import io.netty.bootstrap.Bootstrap;
039    import io.netty.channel.Channel;
040    import io.netty.channel.ChannelFuture;
041    import io.netty.channel.ChannelFutureListener;
042    import io.netty.channel.ChannelInitializer;
043    import io.netty.channel.ChannelPipeline;
044    import io.netty.channel.EventLoop;
045    import io.netty.channel.EventLoopGroup;
046    import io.netty.channel.nio.NioEventLoopGroup;
047    import io.netty.channel.socket.SocketChannel;
048    import io.netty.channel.socket.nio.NioSocketChannel;
049    import io.netty.util.Attribute;
050    import io.netty.util.AttributeKey;
051    import io.netty.util.concurrent.DefaultEventExecutorGroup;
052    import io.netty.util.concurrent.EventExecutorGroup;
053    
054    import java.io.IOException;
055    import java.io.Serializable;
056    
057    import java.nio.file.Files;
058    import java.nio.file.Path;
059    
060    import java.util.Map;
061    import java.util.concurrent.Callable;
062    import java.util.concurrent.ExecutionException;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.TimeoutException;
065    import java.util.concurrent.atomic.AtomicInteger;
066    
067    /**
068     * @author Shuyang Zhou
069     */
070    public class NettyFabricClient implements FabricClient {
071    
072            public NettyFabricClient(
073                    ProcessExecutor processExecutor,
074                    NettyFabricClientConfig nettyFabricClientConfig,
075                    NettyFabricClientShutdownCallback nettyFabricClientShutdownCallback) {
076    
077                    _processExecutor = processExecutor;
078                    _nettyFabricClientConfig = nettyFabricClientConfig;
079                    _nettyFabricClientShutdownCallback = nettyFabricClientShutdownCallback;
080            }
081    
082            @Override
083            public synchronized void connect() {
084                    if (_channel != null) {
085                            throw new IllegalStateException(
086                                    "Netty fabric client was already started");
087                    }
088    
089                    if (_log.isInfoEnabled()) {
090                            _log.info(
091                                    "Starting Netty fabric client using " +
092                                            _nettyFabricClientConfig);
093                    }
094    
095                    Runtime runtime = Runtime.getRuntime();
096    
097                    runtime.addShutdownHook(_shutdownThread);
098    
099                    _bootstrap = new Bootstrap();
100    
101                    _bootstrap.attr(
102                            _executionEventExecutorGroupAttributeKey,
103                            new DefaultEventExecutorGroup(
104                                    _nettyFabricClientConfig.getExecutionGroupThreadCount(),
105                                    new NamedThreadFactory(
106                                            "Netty Fabric Client/Execution Event Executor Group",
107                                            Thread.NORM_PRIORITY, null)));
108                    _bootstrap.attr(
109                            _fileServerEventExecutorGroupAttributeKey,
110                            new DefaultEventExecutorGroup(
111                                    _nettyFabricClientConfig.getFileServerGroupThreadCount(),
112                                    new NamedThreadFactory(
113                                            "Netty Fabric Client/File Server Event Executor Group",
114                                            Thread.NORM_PRIORITY, null)));
115                    _bootstrap.attr(
116                            _rpcEventExecutorGroupAttributeKey,
117                            new DefaultEventExecutorGroup(
118                                    _nettyFabricClientConfig.getRPCGroupThreadCount(),
119                                    new NamedThreadFactory(
120                                            "Netty Fabric Client/RPC Event Executor Group",
121                                            Thread.NORM_PRIORITY, null)));
122                    _bootstrap.channel(NioSocketChannel.class);
123                    _bootstrap.group(
124                            new NioEventLoopGroup(
125                                    _nettyFabricClientConfig.getEventLoopGroupThreadCount(),
126                                    new NamedThreadFactory(
127                                            "Netty Fabric Client/NIO Event Loop Group",
128                                            Thread.NORM_PRIORITY, null)));
129                    _bootstrap.handler(new NettyFabricClientChannelInitializer());
130    
131                    int reconnectCount = _nettyFabricClientConfig.getReconnectCount();
132    
133                    if (reconnectCount < 0) {
134                            reconnectCount = Integer.MAX_VALUE;
135                    }
136    
137                    _reconnectCounter.set(reconnectCount);
138    
139                    doConnect();
140            }
141    
142            @Override
143            public synchronized void disconnect() throws InterruptedException {
144                    _reconnectCounter.set(0);
145    
146                    doDisconnect();
147            }
148    
149            protected void disposeRepository(Channel channel) {
150                    Attribute<Repository<Channel>> attribute = channel.attr(
151                            _repositoryAttributeKey);
152    
153                    Repository<Channel> repository = attribute.getAndRemove();
154    
155                    if (repository != null) {
156                            repository.dispose(true);
157                    }
158            }
159    
160            protected void doConnect() {
161                    ChannelFuture channelFuture = _bootstrap.connect(
162                            _nettyFabricClientConfig.getNettyFabricServerHost(),
163                            _nettyFabricClientConfig.getNettyFabricServerPort());
164    
165                    _channel = channelFuture.channel();
166    
167                    channelFuture.addListener(new PostConnectChannelFutureListener());
168            }
169    
170            protected void doDisconnect() throws InterruptedException {
171                    if (_channel == null) {
172                            throw new IllegalStateException(
173                                    "Netty fabric client is not started");
174                    }
175    
176                    try {
177                            ChannelFuture channelFuture = _channel.close();
178    
179                            channelFuture.sync();
180                    }
181                    finally {
182                            terminateFabricWorkers(_channel);
183    
184                            disposeRepository(_channel);
185    
186                            EventLoop eventLoop = _channel.eventLoop();
187    
188                            EventLoopGroup eventLoopGroup = eventLoop.parent();
189    
190                            if (_reconnectCounter.getAndDecrement() > 0) {
191                                    eventLoopGroup.schedule(
192                                            new Callable<Void>() {
193    
194                                                    @Override
195                                                    public Void call() {
196                                                            doConnect();
197    
198                                                            return null;
199                                                    }
200    
201                                            },
202                                            _nettyFabricClientConfig.getReconnectInterval(),
203                                            TimeUnit.MILLISECONDS);
204    
205                                    if (_log.isInfoEnabled()) {
206                                            _log.info(
207                                                    "Try to reconnect " +
208                                                            _nettyFabricClientConfig.getReconnectInterval() +
209                                                                    "ms later");
210                                    }
211                            }
212                            else {
213                                    if (_log.isInfoEnabled()) {
214                                            _log.info(
215                                                    "Shutting down Netty fabric client on " + _channel);
216                                    }
217    
218                                    try {
219                                            eventLoopGroup.shutdownGracefully();
220    
221                                            shutdownEventExecutorGroup(
222                                                    _channel, _executionEventExecutorGroupAttributeKey);
223                                            shutdownEventExecutorGroup(
224                                                    _channel, _fileServerEventExecutorGroupAttributeKey);
225                                            shutdownEventExecutorGroup(
226                                                    _channel, _rpcEventExecutorGroupAttributeKey);
227    
228                                            _channel = null;
229                                            _bootstrap = null;
230                                    }
231                                    finally {
232                                            _nettyFabricClientShutdownCallback.shutdown();
233    
234                                            Runtime runtime = Runtime.getRuntime();
235    
236                                            runtime.removeShutdownHook(_shutdownThread);
237                                    }
238                            }
239                    }
240            }
241    
242            protected EventExecutorGroup getEventExecutorGroup(
243                    Channel channel, AttributeKey<EventExecutorGroup> attributeKey) {
244    
245                    Attribute<EventExecutorGroup> attribute = channel.attr(attributeKey);
246    
247                    return attribute.get();
248            }
249    
250            protected Repository<Channel> getRepository(Channel channel)
251                    throws IOException {
252    
253                    Attribute<Repository<Channel>> attribute = channel.attr(
254                            _repositoryAttributeKey);
255    
256                    Repository<Channel> repository = attribute.get();
257    
258                    if (repository == null) {
259                            Path repositoryPath = _nettyFabricClientConfig.getRepositoryPath();
260    
261                            Files.createDirectories(repositoryPath);
262    
263                            repository = new NettyRepository(
264                                    repositoryPath,
265                                    _nettyFabricClientConfig.getRepositoryGetFileTimeout());
266    
267                            Repository<Channel> previousRepository = attribute.setIfAbsent(
268                                    repository);
269    
270                            if (previousRepository != null) {
271                                    repository.dispose(true);
272    
273                                    repository = previousRepository;
274                            }
275    
276                            ChannelPipeline channelPipeline = channel.pipeline();
277    
278                            channelPipeline.addLast(
279                                    new FileResponseChannelHandler(
280                                            repository.getAsyncBroker(),
281                                            getEventExecutorGroup(
282                                                    _channel, _fileServerEventExecutorGroupAttributeKey)));
283                    }
284    
285                    return repository;
286            }
287    
288            protected void registerNettyFabricAgent() throws IOException {
289                    ChannelPipeline channelPipeline = _channel.pipeline();
290    
291                    Repository<Channel> repository = getRepository(_channel);
292    
293                    channelPipeline.addLast(
294                            getEventExecutorGroup(
295                                    _channel, _executionEventExecutorGroupAttributeKey),
296                            new NettyFabricWorkerExecutionChannelHandler(
297                                    repository, new LocalFabricAgent(_processExecutor),
298                                    _nettyFabricClientConfig.getExecutionTimeout()));
299    
300                    Path repositoryPath = repository.getRepositoryPath();
301    
302                    ChannelFuture channelFuture = _channel.writeAndFlush(
303                            new NettyFabricAgentConfig(repositoryPath.toFile()));
304    
305                    channelFuture.addListener(new PostRegisterChannelFutureListener());
306            }
307    
308            protected void shutdownEventExecutorGroup(
309                    Channel channel, AttributeKey<EventExecutorGroup> attributeKey) {
310    
311                    Attribute<EventExecutorGroup> attribute = channel.attr(attributeKey);
312    
313                    EventExecutorGroup eventExecutorGroup = attribute.getAndRemove();
314    
315                    if (eventExecutorGroup != null) {
316                            eventExecutorGroup.shutdownGracefully();
317                    }
318            }
319    
320            protected void terminateFabricWorkers(Channel channel) {
321                    Map<Long, FabricWorker<?>> fabricWorkers =
322                            NettyChannelAttributes.getFabricWorkers(channel);
323    
324                    if (fabricWorkers == null) {
325                            return;
326                    }
327    
328                    for (Map.Entry<Long, FabricWorker<?>> entry :
329                                    fabricWorkers.entrySet()) {
330    
331                            FabricWorker<?> fabricWorker = entry.getValue();
332    
333                            fabricWorker.write(_runtimeExitProcessCallable);
334    
335                            NoticeableFuture<?> noticeableFuture =
336                                    fabricWorker.getProcessNoticeableFuture();
337    
338                            try {
339                                    try {
340                                            noticeableFuture.get(
341                                                    _nettyFabricClientConfig.getExecutionTimeout(),
342                                                    TimeUnit.MILLISECONDS);
343                                    }
344                                    catch (TimeoutException te) {
345                                            fabricWorker.write(_runtimeHaltProcessCallable);
346    
347                                            noticeableFuture.get(
348                                                    _nettyFabricClientConfig.getExecutionTimeout(),
349                                                    TimeUnit.MILLISECONDS);
350                                    }
351                            }
352                            catch (Throwable t) {
353                                    if (t instanceof ExecutionException) {
354                                            Throwable cause = t.getCause();
355    
356                                            if (cause instanceof TerminationProcessException) {
357                                                    TerminationProcessException tpe =
358                                                            (TerminationProcessException)cause;
359    
360                                                    if (_log.isWarnEnabled()) {
361                                                            _log.warn(
362                                                                    "Forcibly terminate fabric worker " +
363                                                                            entry.getKey() + " with exit code " +
364                                                                                    tpe.getExitCode());
365                                                    }
366    
367                                                    continue;
368                                            }
369                                    }
370    
371                                    _log.error(
372                                            "Unable to terminate fabric worker " + entry.getKey(), t);
373                            }
374                    }
375            }
376    
377            protected class NettyFabricClientChannelInitializer
378                    extends ChannelInitializer<SocketChannel> {
379    
380                    @Override
381                    protected void initChannel(SocketChannel socketChannel) {
382                            ChannelPipeline channelPipeline = socketChannel.pipeline();
383    
384                            channelPipeline.addLast(
385                                    AnnotatedObjectEncoder.NAME, AnnotatedObjectEncoder.INSTANCE);
386                            channelPipeline.addLast(
387                                    AnnotatedObjectDecoder.NAME, new AnnotatedObjectDecoder());
388                            channelPipeline.addLast(
389                                    getEventExecutorGroup(
390                                            socketChannel, _fileServerEventExecutorGroupAttributeKey),
391                                    FileRequestChannelHandler.NAME,
392                                    new FileRequestChannelHandler(
393                                            _nettyFabricClientConfig.
394                                                    getFileServerFolderCompressionLevel()));
395                            channelPipeline.addLast(
396                                    getEventExecutorGroup(
397                                            socketChannel, _rpcEventExecutorGroupAttributeKey),
398                                    NettyRPCChannelHandler.NAME, NettyRPCChannelHandler.INSTANCE);
399                    }
400    
401            }
402    
403            protected class PostConnectChannelFutureListener
404                    implements ChannelFutureListener {
405    
406                    @Override
407                    public void operationComplete(ChannelFuture channelFuture)
408                            throws Exception {
409    
410                            Channel channel = channelFuture.channel();
411    
412                            if (channelFuture.isSuccess()) {
413                                    if (_log.isInfoEnabled()) {
414                                            _log.info("Connected to " + channel.remoteAddress());
415                                    }
416    
417                                    registerNettyFabricAgent();
418    
419                                    return;
420                            }
421    
422                            String serverAddress =
423                                    _nettyFabricClientConfig.getNettyFabricServerHost() + ":" +
424                                            _nettyFabricClientConfig.getNettyFabricServerPort();
425    
426                            if (channelFuture.isCancelled()) {
427                                    _log.error("Cancelled connecting to " + serverAddress);
428                            }
429                            else {
430                                    _log.error(
431                                            "Unable to connect to " + serverAddress,
432                                            channelFuture.cause());
433                            }
434    
435                            doDisconnect();
436                    }
437    
438            }
439    
440            protected class PostDisconnectChannelFutureListener
441                    implements ChannelFutureListener {
442    
443                    @Override
444                    public void operationComplete(ChannelFuture channelFuture)
445                            throws InterruptedException {
446    
447                            Channel channel = channelFuture.channel();
448    
449                            if (_log.isInfoEnabled()) {
450                                    _log.info("Disconnected from " + channel.remoteAddress());
451                            }
452    
453                            doDisconnect();
454                    }
455    
456            }
457    
458            protected class PostRegisterChannelFutureListener
459                    implements ChannelFutureListener {
460    
461                    @Override
462                    public void operationComplete(ChannelFuture channelFuture)
463                            throws InterruptedException {
464    
465                            if (channelFuture.isSuccess()) {
466                                    _reconnectCounter.set(
467                                            _nettyFabricClientConfig.getReconnectCount());
468    
469                                    if (_log.isInfoEnabled()) {
470                                            _log.info("Registered Netty fabric agent on " + _channel);
471                                    }
472    
473                                    channelFuture = _channel.closeFuture();
474    
475                                    channelFuture.addListener(
476                                            new PostDisconnectChannelFutureListener());
477    
478                                    return;
479                            }
480    
481                            _log.error("Unable to register Netty fabric agent on " + _channel);
482    
483                            doDisconnect();
484                    }
485    
486            }
487    
488            private static final int _FABRIC_AGENT_SHUTDOWN_CODE = 211;
489    
490            private static final Log _log = LogFactoryUtil.getLog(
491                    NettyFabricClient.class);
492    
493            private static final AttributeKey<EventExecutorGroup>
494                    _executionEventExecutorGroupAttributeKey = AttributeKey.valueOf(
495                            "ExecutionEventExecutorGroup");
496            private static final AttributeKey<EventExecutorGroup>
497                    _fileServerEventExecutorGroupAttributeKey = AttributeKey.valueOf(
498                            "FileServerEventExecutorGroup");
499            private static final AttributeKey<Repository<Channel>>
500                    _repositoryAttributeKey = AttributeKey.valueOf("Repository");
501            private static final AttributeKey<EventExecutorGroup>
502                    _rpcEventExecutorGroupAttributeKey = AttributeKey.valueOf(
503                            "RPCEventExecutorGroup");
504    
505            private static final ProcessCallable<Serializable>
506                    _runtimeExitProcessCallable = new ProcessCallable<Serializable>() {
507    
508                            @Override
509                            public Serializable call() {
510                                    Runtime runtime = Runtime.getRuntime();
511    
512                                    runtime.exit(_FABRIC_AGENT_SHUTDOWN_CODE);
513    
514                                    return null;
515                            }
516    
517                            private static final long serialVersionUID = 1L;
518    
519                    };
520    
521            private static final ProcessCallable<Serializable>
522                    _runtimeHaltProcessCallable = new ProcessCallable<Serializable>() {
523    
524                            @Override
525                            public Serializable call() {
526                                    Runtime runtime = Runtime.getRuntime();
527    
528                                    runtime.halt(_FABRIC_AGENT_SHUTDOWN_CODE);
529    
530                                    return null;
531                            }
532    
533                            private static final long serialVersionUID = 1L;
534    
535                    };
536    
537            private Bootstrap _bootstrap;
538            private volatile Channel _channel;
539            private final NettyFabricClientConfig _nettyFabricClientConfig;
540            private final NettyFabricClientShutdownCallback
541                    _nettyFabricClientShutdownCallback;
542            private final ProcessExecutor _processExecutor;
543            private final AtomicInteger _reconnectCounter = new AtomicInteger();
544    
545            private final Thread _shutdownThread = new Thread() {
546    
547                    @Override
548                    public void run() {
549                            Channel channel = _channel;
550    
551                            if (channel != null) {
552                                    _reconnectCounter.set(0);
553    
554                                    ChannelFuture channelFuture = channel.close();
555    
556                                    channelFuture.syncUninterruptibly();
557                            }
558                    }
559    
560            };
561    
562    }