001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.rpc;
016    
017    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
018    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019    import com.liferay.portal.kernel.log.Log;
020    import com.liferay.portal.kernel.log.LogFactoryUtil;
021    import com.liferay.portal.kernel.util.StringBundler;
022    
023    import io.netty.channel.Channel;
024    import io.netty.channel.ChannelFuture;
025    import io.netty.channel.ChannelFutureListener;
026    
027    import java.io.Serializable;
028    
029    import java.util.concurrent.Future;
030    
031    /**
032     * @author Shuyang Zhou
033     */
034    public class RPCRequest<T extends Serializable> extends RPCSerializable {
035    
036            public RPCRequest(long id, RPCCallable<T> rpcCallable) {
037                    super(id);
038    
039                    _rpcCallable = rpcCallable;
040            }
041    
042            @Override
043            public void execute(final Channel channel) {
044                    ChannelThreadLocal.setChannel(channel);
045    
046                    try {
047                            NoticeableFuture<T> noticeableFuture = _rpcCallable.call();
048    
049                            noticeableFuture.addFutureListener(
050                                    new BaseFutureListener<T>() {
051    
052                                            @Override
053                                            public void completeWithCancel(Future<T> future) {
054                                                    sendRPCResponse(
055                                                            channel, new RPCResponse<T>(id, true, null, null));
056                                            }
057    
058                                            @Override
059                                            public void completeWithException(
060                                                    Future<T> future, Throwable throwable) {
061    
062                                                    sendRPCResponse(
063                                                            channel,
064                                                            new RPCResponse<T>(id, false, null, throwable));
065                                            }
066    
067                                            @Override
068                                            public void completeWithResult(Future<T> future, T result) {
069                                                    sendRPCResponse(
070                                                            channel,
071                                                            new RPCResponse<T>(id, false, result, null));
072                                            }
073    
074                                    });
075                    }
076                    catch (Throwable t) {
077                            sendRPCResponse(channel, new RPCResponse<T>(id, false, null, t));
078                    }
079                    finally {
080                            ChannelThreadLocal.removeChannel();
081                    }
082            }
083    
084            @Override
085            public String toString() {
086                    StringBundler sb = new StringBundler(5);
087    
088                    sb.append("{id=");
089                    sb.append(id);
090                    sb.append(", rpcCallable=");
091                    sb.append(_rpcCallable);
092                    sb.append("}");
093    
094                    return sb.toString();
095            }
096    
097            protected void sendRPCResponse(
098                    Channel channel, RPCResponse<T> rpcResponse) {
099    
100                    ChannelFuture channelFuture = channel.writeAndFlush(rpcResponse);
101    
102                    channelFuture.addListener(new LogErrorFutureListener(rpcResponse));
103            }
104    
105            protected static class LogErrorFutureListener
106                    implements ChannelFutureListener {
107    
108                    @Override
109                    public void operationComplete(ChannelFuture channelFuture) {
110                            if (channelFuture.isSuccess()) {
111                                    return;
112                            }
113    
114                            if (channelFuture.isCancelled()) {
115                                    _log.error(
116                                            "Cancelled on sending RPC response: " + _rpcResponse);
117    
118                                    return;
119                            }
120    
121                            _log.error(
122                                    "Unable to send RPC response: " + _rpcResponse,
123                                    channelFuture.cause());
124                    }
125    
126                    protected LogErrorFutureListener(RPCResponse<?> rpcResponse) {
127                            _rpcResponse = rpcResponse;
128                    }
129    
130                    private final RPCResponse<?> _rpcResponse;
131    
132            }
133    
134            private static final Log _log = LogFactoryUtil.getLog(RPCRequest.class);
135    
136            private static final long serialVersionUID = 1L;
137    
138            private final RPCCallable<T> _rpcCallable;
139    
140    }