001
014
015 package com.liferay.portal.fabric.netty.rpc;
016
017 import com.liferay.portal.fabric.netty.handlers.NettyChannelAttributes;
018 import com.liferay.portal.kernel.concurrent.AsyncBroker;
019 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022
023 import io.netty.channel.Channel;
024 import io.netty.channel.ChannelFuture;
025 import io.netty.channel.ChannelFutureListener;
026
027 import java.io.Serializable;
028
029
032 public class RPCUtil {
033
034 public static <T extends Serializable> NoticeableFuture<T> execute(
035 Channel channel, RPCCallable<T> rpcCallable) {
036
037 final AsyncBroker<Long, T> asyncBroker =
038 NettyChannelAttributes.getAsyncBroker(channel);
039
040 final long id = NettyChannelAttributes.nextId(channel);
041
042 final NoticeableFuture<T> noticeableFuture = asyncBroker.post(id);
043
044 ChannelFuture channelFuture = channel.writeAndFlush(
045 new RPCRequest<T>(id, rpcCallable));
046
047 channelFuture.addListener(
048 new ChannelFutureListener() {
049
050 @Override
051 public void operationComplete(ChannelFuture channelFuture) {
052 if (channelFuture.isSuccess()) {
053 return;
054 }
055
056 if (channelFuture.isCancelled()) {
057 noticeableFuture.cancel(true);
058
059 return;
060 }
061
062 if (!asyncBroker.takeWithException(
063 id, channelFuture.cause())) {
064
065 _log.error(
066 "Unable to place exception because no future " +
067 "exists with ID " + id,
068 channelFuture.cause());
069 }
070 }
071
072 });
073
074 return noticeableFuture;
075 }
076
077 private static final Log _log = LogFactoryUtil.getLog(RPCUtil.class);
078
079 }