001
014
015 package com.liferay.portal.fabric.netty.util;
016
017 import com.liferay.portal.kernel.concurrent.FutureListener;
018 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019 import com.liferay.portal.kernel.log.Log;
020 import com.liferay.portal.kernel.log.LogFactoryUtil;
021
022 import io.netty.channel.Channel;
023 import io.netty.channel.ChannelInitializer;
024 import io.netty.channel.ChannelPipeline;
025 import io.netty.channel.EventLoop;
026 import io.netty.channel.embedded.EmbeddedChannel;
027 import io.netty.util.concurrent.EventExecutorGroup;
028
029 import java.util.concurrent.Future;
030 import java.util.concurrent.TimeUnit;
031
032
035 public class NettyUtil {
036
037 public static void bindShutdown(
038 EventExecutorGroup master, final EventExecutorGroup slave,
039 final long quietPeriod, final long timeout) {
040
041 io.netty.util.concurrent.Future<?> future = master.terminationFuture();
042
043 future.addListener(
044 new io.netty.util.concurrent.FutureListener<Object>() {
045
046 @Override
047 public void operationComplete(
048 io.netty.util.concurrent.Future<Object> future)
049 throws InterruptedException {
050
051 slave.shutdownGracefully(
052 quietPeriod, timeout, TimeUnit.MILLISECONDS);
053
054 if (!slave.awaitTermination(
055 timeout, TimeUnit.MILLISECONDS) &&
056 _log.isWarnEnabled()) {
057
058 _log.warn("Bind shutdown timeout " + slave);
059 }
060 }
061
062 });
063 }
064
065 public static ChannelPipeline createEmptyChannelPipeline() {
066 Channel channel = new EmbeddedChannel(
067 new ChannelInitializer<Channel>() {
068
069 @Override
070 protected void initChannel(Channel channel) {
071 }
072
073 });
074
075 ChannelPipeline channelPipeline = channel.pipeline();
076
077 channelPipeline.removeLast();
078
079 return channelPipeline;
080 }
081
082 public static <T> void scheduleCancellation(
083 Channel channel, final NoticeableFuture<T> noticeableFuture,
084 long timeout) {
085
086 EventLoop eventLoop = channel.eventLoop();
087
088 final Future<?> cancellationFuture = eventLoop.schedule(
089 new Runnable() {
090
091 @Override
092 public void run() {
093 if (noticeableFuture.cancel(true) && _log.isWarnEnabled()) {
094 _log.warn("Cancelled timeout " + noticeableFuture);
095 }
096 }
097
098 },
099 timeout, TimeUnit.MILLISECONDS);
100
101 noticeableFuture.addFutureListener(
102 new FutureListener<T>() {
103
104 @Override
105 public void complete(Future<T> future) {
106 if (cancellationFuture.cancel(true) &&
107 _log.isDebugEnabled()) {
108
109 _log.debug(
110 "Cancelled scheduled cancellation for " +
111 noticeableFuture);
112 }
113 }
114
115 });
116 }
117
118 private static final Log _log = LogFactoryUtil.getLog(NettyUtil.class);
119
120 }