001
014
015 package com.liferay.portal.fabric.netty.rpc;
016
017 import com.liferay.portal.kernel.concurrent.FutureListener;
018 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019 import com.liferay.portal.kernel.log.Log;
020 import com.liferay.portal.kernel.log.LogFactoryUtil;
021 import com.liferay.portal.kernel.util.StringBundler;
022
023 import io.netty.channel.Channel;
024 import io.netty.channel.ChannelFuture;
025 import io.netty.channel.ChannelFutureListener;
026
027 import java.io.Serializable;
028
029 import java.util.concurrent.ExecutionException;
030 import java.util.concurrent.Future;
031
032
035 public class RPCRequest<T extends Serializable> extends RPCSerializable {
036
037 public RPCRequest(long id, RPCCallable<T> rpcCallable) {
038 super(id);
039
040 _rpcCallable = rpcCallable;
041 }
042
043 @Override
044 public void execute(final Channel channel) {
045 ChannelThreadLocal.setChannel(channel);
046
047 try {
048 NoticeableFuture<T> noticeableFuture = _rpcCallable.call();
049
050 noticeableFuture.addFutureListener(
051 new FutureListener<T>() {
052
053 @Override
054 public void complete(Future<T> future) {
055 if (future.isCancelled()) {
056 sendRPCResponse(
057 channel,
058 new RPCResponse<T>(id, true, null, null));
059
060 return;
061 }
062
063 try {
064 sendRPCResponse(
065 channel,
066 new RPCResponse<T>(
067 id, false, future.get(), null));
068 }
069 catch (Throwable throwable) {
070 if (throwable instanceof ExecutionException) {
071 throwable = throwable.getCause();
072 }
073
074 sendRPCResponse(
075 channel,
076 new RPCResponse<T>(id, false, null, throwable));
077 }
078 }
079
080 });
081 }
082 catch (Throwable t) {
083 sendRPCResponse(channel, new RPCResponse<T>(id, false, null, t));
084 }
085 finally {
086 ChannelThreadLocal.removeChannel();
087 }
088 }
089
090 @Override
091 public String toString() {
092 StringBundler sb = new StringBundler(5);
093
094 sb.append("{id=");
095 sb.append(id);
096 sb.append(", rpcCallable=");
097 sb.append(_rpcCallable);
098 sb.append("}");
099
100 return sb.toString();
101 }
102
103 protected void sendRPCResponse(
104 Channel channel, RPCResponse<T> rpcResponse) {
105
106 ChannelFuture channelFuture = channel.writeAndFlush(rpcResponse);
107
108 channelFuture.addListener(new LogErrorFutureListener(rpcResponse));
109 }
110
111 protected static class LogErrorFutureListener
112 implements ChannelFutureListener {
113
114 @Override
115 public void operationComplete(ChannelFuture channelFuture) {
116 if (channelFuture.isSuccess()) {
117 return;
118 }
119
120 if (channelFuture.isCancelled()) {
121 _log.error(
122 "Cancelled on sending RPC response: " + _rpcResponse);
123
124 return;
125 }
126
127 _log.error(
128 "Unable to send RPC response: " + _rpcResponse,
129 channelFuture.cause());
130 }
131
132 protected LogErrorFutureListener(RPCResponse<?> rpcResponse) {
133 _rpcResponse = rpcResponse;
134 }
135
136 private final RPCResponse<?> _rpcResponse;
137
138 }
139
140 private static final Log _log = LogFactoryUtil.getLog(RPCRequest.class);
141
142 private static final long serialVersionUID = 1L;
143
144 private final RPCCallable<T> _rpcCallable;
145
146 }