001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.util;
016    
017    import com.liferay.portal.kernel.concurrent.FutureListener;
018    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019    import com.liferay.portal.kernel.log.Log;
020    import com.liferay.portal.kernel.log.LogFactoryUtil;
021    
022    import io.netty.channel.Channel;
023    import io.netty.channel.ChannelInitializer;
024    import io.netty.channel.ChannelPipeline;
025    import io.netty.channel.EventLoop;
026    import io.netty.channel.embedded.EmbeddedChannel;
027    import io.netty.util.concurrent.EventExecutorGroup;
028    
029    import java.util.concurrent.Future;
030    import java.util.concurrent.TimeUnit;
031    
032    /**
033     * @author Shuyang Zhou
034     */
035    public class NettyUtil {
036    
037            public static void bindShutdown(
038                    EventExecutorGroup master, final EventExecutorGroup slave,
039                    final long quietPeriod, final long timeout) {
040    
041                    io.netty.util.concurrent.Future<?> future = master.terminationFuture();
042    
043                    future.addListener(
044                            new io.netty.util.concurrent.FutureListener<Object>() {
045    
046                                    @Override
047                                    public void operationComplete(
048                                                    io.netty.util.concurrent.Future<Object> future)
049                                            throws InterruptedException {
050    
051                                            slave.shutdownGracefully(
052                                                    quietPeriod, timeout, TimeUnit.MILLISECONDS);
053    
054                                            if (!slave.awaitTermination(
055                                                            timeout, TimeUnit.MILLISECONDS) &&
056                                                    _log.isWarnEnabled()) {
057    
058                                                    _log.warn("Bind shutdown timeout " + slave);
059                                            }
060                                    }
061    
062                            });
063            }
064    
065            public static ChannelPipeline createEmptyChannelPipeline() {
066                    Channel channel = new EmbeddedChannel(
067                            new ChannelInitializer<Channel>() {
068    
069                                    @Override
070                                    protected void initChannel(Channel channel) {
071                                    }
072    
073                            });
074    
075                    ChannelPipeline channelPipeline = channel.pipeline();
076    
077                    channelPipeline.removeLast();
078    
079                    return channelPipeline;
080            }
081    
082            public static <T> void scheduleCancellation(
083                    Channel channel, final NoticeableFuture<T> noticeableFuture,
084                    long timeout) {
085    
086                    EventLoop eventLoop = channel.eventLoop();
087    
088                    final Future<?> cancellationFuture = eventLoop.schedule(
089                            new Runnable() {
090    
091                                    @Override
092                                    public void run() {
093                                            if (noticeableFuture.cancel(true) && _log.isWarnEnabled()) {
094                                                    _log.warn("Cancelled timeout " + noticeableFuture);
095                                            }
096                                    }
097    
098                            },
099                            timeout, TimeUnit.MILLISECONDS);
100    
101                    noticeableFuture.addFutureListener(
102                            new FutureListener<T>() {
103    
104                                    @Override
105                                    public void complete(Future<T> future) {
106                                            if (cancellationFuture.cancel(true) &&
107                                                    _log.isDebugEnabled()) {
108    
109                                                    _log.debug(
110                                                            "Cancelled scheduled cancellation for " +
111                                                                    noticeableFuture);
112                                            }
113                                    }
114    
115                            });
116            }
117    
118            private static final Log _log = LogFactoryUtil.getLog(NettyUtil.class);
119    
120    }