001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.rpc;
016    
017    import com.liferay.portal.kernel.concurrent.DefaultNoticeableFuture;
018    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
019    import com.liferay.portal.kernel.io.Deserializer;
020    import com.liferay.portal.kernel.io.Serializer;
021    import com.liferay.portal.kernel.nio.intraband.CompletionHandler;
022    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
023    import com.liferay.portal.kernel.nio.intraband.Datagram;
024    import com.liferay.portal.kernel.nio.intraband.Intraband;
025    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
026    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
027    import com.liferay.portal.kernel.process.ProcessCallable;
028    
029    import java.io.IOException;
030    import java.io.Serializable;
031    
032    import java.util.EnumSet;
033    
034    /**
035     * @author Shuyang Zhou
036     */
037    public class IntrabandRPCUtil {
038    
039            public static <V extends Serializable> NoticeableFuture<V> execute(
040                    RegistrationReference registrationReference,
041                    ProcessCallable<V> processCallable) {
042    
043                    Intraband intraband = registrationReference.getIntraband();
044    
045                    SystemDataType systemDataType = SystemDataType.RPC;
046    
047                    Serializer serializer = new Serializer();
048    
049                    serializer.writeObject(processCallable);
050    
051                    Datagram datagram = Datagram.createRequestDatagram(
052                            systemDataType.getValue(), serializer.toByteBuffer());
053    
054                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
055                            new DefaultNoticeableFuture<>();
056    
057                    intraband.sendDatagram(
058                            registrationReference, datagram, null, repliedEnumSet,
059                            new FutureCompletionHandler<V>(defaultNoticeableFuture));
060    
061                    return defaultNoticeableFuture;
062            }
063    
064            protected static EnumSet<CompletionType> repliedEnumSet = EnumSet.of(
065                    CompletionType.REPLIED);
066    
067            protected static class FutureCompletionHandler<V extends Serializable>
068                    implements CompletionHandler<Object> {
069    
070                    @Override
071                    public void delivered(Object attachment) {
072                    }
073    
074                    @Override
075                    public void failed(Object attachment, IOException ioe) {
076                            _defaultNoticeableFuture.setException(ioe);
077                    }
078    
079                    @Override
080                    public void replied(Object attachment, Datagram datagram) {
081                            Deserializer deserializer = new Deserializer(
082                                    datagram.getDataByteBuffer());
083    
084                            try {
085                                    RPCResponse rpcResponse = deserializer.readObject();
086    
087                                    Exception exception = rpcResponse.getException();
088    
089                                    if (exception != null) {
090                                            _defaultNoticeableFuture.setException(exception);
091                                    }
092                                    else {
093                                            _defaultNoticeableFuture.set((V)rpcResponse.getResult());
094                                    }
095                            }
096                            catch (ClassNotFoundException cnfe) {
097                                    _defaultNoticeableFuture.setException(cnfe);
098                            }
099                    }
100    
101                    @Override
102                    public void submitted(Object attachment) {
103                    }
104    
105                    @Override
106                    public void timedOut(Object attachment) {
107                            _defaultNoticeableFuture.cancel(true);
108                    }
109    
110                    protected FutureCompletionHandler(
111                            DefaultNoticeableFuture<V> defaultNoticeableFuture) {
112    
113                            _defaultNoticeableFuture = defaultNoticeableFuture;
114                    }
115    
116                    private final DefaultNoticeableFuture<V> _defaultNoticeableFuture;
117    
118            }
119    
120    }