001
014
015 package com.liferay.portal.fabric.netty.rpc;
016
017 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
018 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019 import com.liferay.portal.kernel.log.Log;
020 import com.liferay.portal.kernel.log.LogFactoryUtil;
021 import com.liferay.portal.kernel.util.StringBundler;
022
023 import io.netty.channel.Channel;
024 import io.netty.channel.ChannelFuture;
025 import io.netty.channel.ChannelFutureListener;
026
027 import java.io.Serializable;
028
029 import java.util.concurrent.Future;
030
031
034 public class RPCRequest<T extends Serializable> extends RPCSerializable {
035
036 public RPCRequest(long id, RPCCallable<T> rpcCallable) {
037 super(id);
038
039 _rpcCallable = rpcCallable;
040 }
041
042 @Override
043 public void execute(final Channel channel) {
044 ChannelThreadLocal.setChannel(channel);
045
046 try {
047 NoticeableFuture<T> noticeableFuture = _rpcCallable.call();
048
049 noticeableFuture.addFutureListener(
050 new BaseFutureListener<T>() {
051
052 @Override
053 public void completeWithCancel(Future<T> future) {
054 sendRPCResponse(
055 channel, new RPCResponse<T>(id, true, null, null));
056 }
057
058 @Override
059 public void completeWithException(
060 Future<T> future, Throwable throwable) {
061
062 sendRPCResponse(
063 channel,
064 new RPCResponse<T>(id, false, null, throwable));
065 }
066
067 @Override
068 public void completeWithResult(Future<T> future, T result) {
069 sendRPCResponse(
070 channel,
071 new RPCResponse<T>(id, false, result, null));
072 }
073
074 });
075 }
076 catch (Throwable t) {
077 sendRPCResponse(channel, new RPCResponse<T>(id, false, null, t));
078 }
079 finally {
080 ChannelThreadLocal.removeChannel();
081 }
082 }
083
084 @Override
085 public String toString() {
086 StringBundler sb = new StringBundler(5);
087
088 sb.append("{id=");
089 sb.append(id);
090 sb.append(", rpcCallable=");
091 sb.append(_rpcCallable);
092 sb.append("}");
093
094 return sb.toString();
095 }
096
097 protected void sendRPCResponse(
098 Channel channel, RPCResponse<T> rpcResponse) {
099
100 ChannelFuture channelFuture = channel.writeAndFlush(rpcResponse);
101
102 channelFuture.addListener(new LogErrorFutureListener(rpcResponse));
103 }
104
105 protected static class LogErrorFutureListener
106 implements ChannelFutureListener {
107
108 @Override
109 public void operationComplete(ChannelFuture channelFuture) {
110 if (channelFuture.isSuccess()) {
111 return;
112 }
113
114 if (channelFuture.isCancelled()) {
115 _log.error(
116 "Cancelled on sending RPC response: " + _rpcResponse);
117
118 return;
119 }
120
121 _log.error(
122 "Unable to send RPC response: " + _rpcResponse,
123 channelFuture.cause());
124 }
125
126 protected LogErrorFutureListener(RPCResponse<?> rpcResponse) {
127 _rpcResponse = rpcResponse;
128 }
129
130 private final RPCResponse<?> _rpcResponse;
131
132 }
133
134 private static final Log _log = LogFactoryUtil.getLog(RPCRequest.class);
135
136 private static final long serialVersionUID = 1L;
137
138 private final RPCCallable<T> _rpcCallable;
139
140 }