001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.rpc;
016    
017    import com.liferay.portal.kernel.concurrent.FutureListener;
018    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019    import com.liferay.portal.kernel.log.Log;
020    import com.liferay.portal.kernel.log.LogFactoryUtil;
021    import com.liferay.portal.kernel.util.StringBundler;
022    
023    import io.netty.channel.Channel;
024    import io.netty.channel.ChannelFuture;
025    import io.netty.channel.ChannelFutureListener;
026    
027    import java.io.Serializable;
028    
029    import java.util.concurrent.ExecutionException;
030    import java.util.concurrent.Future;
031    
032    /**
033     * @author Shuyang Zhou
034     */
035    public class RPCRequest<T extends Serializable> extends RPCSerializable {
036    
037            public RPCRequest(long id, RPCCallable<T> rpcCallable) {
038                    super(id);
039    
040                    _rpcCallable = rpcCallable;
041            }
042    
043            @Override
044            public void execute(final Channel channel) {
045                    ChannelThreadLocal.setChannel(channel);
046    
047                    try {
048                            NoticeableFuture<T> noticeableFuture = _rpcCallable.call();
049    
050                            noticeableFuture.addFutureListener(
051                                    new FutureListener<T>() {
052    
053                                            @Override
054                                            public void complete(Future<T> future) {
055                                                    if (future.isCancelled()) {
056                                                            sendRPCResponse(
057                                                                    channel,
058                                                                    new RPCResponse<T>(id, true, null, null));
059    
060                                                            return;
061                                                    }
062    
063                                                    try {
064                                                            sendRPCResponse(
065                                                                    channel,
066                                                                    new RPCResponse<T>(
067                                                                            id, false, future.get(), null));
068                                                    }
069                                                    catch (Throwable throwable) {
070                                                            if (throwable instanceof ExecutionException) {
071                                                                    throwable = throwable.getCause();
072                                                            }
073    
074                                                            sendRPCResponse(
075                                                                    channel,
076                                                                    new RPCResponse<T>(id, false, null, throwable));
077                                                    }
078                                            }
079    
080                                    });
081                    }
082                    catch (Throwable t) {
083                            sendRPCResponse(channel, new RPCResponse<T>(id, false, null, t));
084                    }
085                    finally {
086                            ChannelThreadLocal.removeChannel();
087                    }
088            }
089    
090            @Override
091            public String toString() {
092                    StringBundler sb = new StringBundler(5);
093    
094                    sb.append("{id=");
095                    sb.append(id);
096                    sb.append(", rpcCallable=");
097                    sb.append(_rpcCallable);
098                    sb.append("}");
099    
100                    return sb.toString();
101            }
102    
103            protected void sendRPCResponse(
104                    Channel channel, RPCResponse<T> rpcResponse) {
105    
106                    ChannelFuture channelFuture = channel.writeAndFlush(rpcResponse);
107    
108                    channelFuture.addListener(new LogErrorFutureListener(rpcResponse));
109            }
110    
111            protected static class LogErrorFutureListener
112                    implements ChannelFutureListener {
113    
114                    @Override
115                    public void operationComplete(ChannelFuture channelFuture) {
116                            if (channelFuture.isSuccess()) {
117                                    return;
118                            }
119    
120                            if (channelFuture.isCancelled()) {
121                                    _log.error(
122                                            "Cancelled on sending RPC response: " + _rpcResponse);
123    
124                                    return;
125                            }
126    
127                            _log.error(
128                                    "Unable to send RPC response: " + _rpcResponse,
129                                    channelFuture.cause());
130                    }
131    
132                    protected LogErrorFutureListener(RPCResponse<?> rpcResponse) {
133                            _rpcResponse = rpcResponse;
134                    }
135    
136                    private final RPCResponse<?> _rpcResponse;
137    
138            }
139    
140            private static final Log _log = LogFactoryUtil.getLog(RPCRequest.class);
141    
142            private static final long serialVersionUID = 1L;
143    
144            private final RPCCallable<T> _rpcCallable;
145    
146    }