001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.messaging;
016    
017    import com.liferay.portal.kernel.messaging.Destination;
018    import com.liferay.portal.kernel.messaging.DestinationWrapper;
019    import com.liferay.portal.kernel.messaging.Message;
020    import com.liferay.portal.kernel.messaging.MessageBusUtil;
021    import com.liferay.portal.kernel.messaging.MessageListener;
022    import com.liferay.portal.kernel.nio.intraband.Datagram;
023    import com.liferay.portal.kernel.nio.intraband.Intraband;
024    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
025    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
026    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
027    import com.liferay.portal.kernel.resiliency.spi.SPI;
028    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
029    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
030    import com.liferay.portal.kernel.util.StringPool;
031    
032    import java.nio.ByteBuffer;
033    
034    import java.rmi.RemoteException;
035    
036    import java.util.List;
037    import java.util.Set;
038    
039    /**
040     * @author Shuyang Zhou
041     */
042    public class IntrabandBridgeDestination extends DestinationWrapper {
043    
044            public IntrabandBridgeDestination(Destination destination) {
045                    super(destination);
046            }
047    
048            @Override
049            public void send(Message message) {
050                    message.setDestinationName(getName());
051    
052                    MessageRoutingBag messageRoutingBag =
053                            (MessageRoutingBag)message.get(
054                                    MessageRoutingBag.MESSAGE_ROUTING_BAG);
055    
056                    if (messageRoutingBag == null) {
057                            messageRoutingBag = new MessageRoutingBag(message, true);
058    
059                            message.put(
060                                    MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
061                    }
062    
063                    sendMessageRoutingBag(messageRoutingBag);
064    
065                    try {
066                            Message responseMessage = messageRoutingBag.getMessage();
067    
068                            responseMessage.copyTo(message);
069    
070                            messageRoutingBag.setMessage(message);
071                    }
072                    catch (ClassNotFoundException cnfe) {
073                            throw new RuntimeException(cnfe);
074                    }
075    
076                    Set<MessageListener> messageListeners =
077                            destination.getMessageListeners();
078    
079                    for (MessageListener messageListener : messageListeners) {
080                            try {
081                                    messageListener.receive(message);
082                            }
083                            catch (Exception e) {
084                                    throw new RuntimeException(e);
085                            }
086                    }
087            }
088    
089            public void sendMessageRoutingBag(MessageRoutingBag messageRoutingBag) {
090                    if (SPIUtil.isSPI()) {
091                            SPI spi = SPIUtil.getSPI();
092    
093                            try {
094                                    String routingId = toRoutingId(spi);
095    
096                                    messageRoutingBag.appendRoutingId(routingId);
097    
098                                    if (!messageRoutingBag.isRoutingDowncast()) {
099                                            RegistrationReference registrationReference =
100                                                    spi.getRegistrationReference();
101    
102                                            sendMessageRoutingBag(
103                                                    registrationReference, messageRoutingBag);
104                                    }
105                            }
106                            catch (Exception e) {
107                                    throw new RuntimeException(e);
108                            }
109                    }
110    
111                    List<SPI> spis = MPIHelperUtil.getSPIs();
112    
113                    if (spis.isEmpty() && !SPIUtil.isSPI()) {
114                            MessageBusUtil.addDestination(destination);
115                    }
116                    else {
117                            messageRoutingBag.setRoutingDowncast(true);
118    
119                            try {
120                                    for (SPI spi : spis) {
121                                            String routingId = toRoutingId(spi);
122    
123                                            if (!messageRoutingBag.isVisited(routingId)) {
124                                                    RegistrationReference registrationReference =
125                                                            spi.getRegistrationReference();
126    
127                                                    sendMessageRoutingBag(
128                                                            registrationReference, messageRoutingBag);
129                                            }
130                                    }
131                            }
132                            catch (Exception e) {
133                                    throw new RuntimeException(e);
134                            }
135                    }
136            }
137    
138            protected void sendMessageRoutingBag(
139                    RegistrationReference registrationReference,
140                    MessageRoutingBag messageRoutingBag) {
141    
142                    try {
143                            Intraband intraband = registrationReference.getIntraband();
144    
145                            Datagram datagram = intraband.sendSyncDatagram(
146                                    registrationReference,
147                                    Datagram.createRequestDatagram(
148                                            SystemDataType.MESSAGE.getValue(),
149                                            messageRoutingBag.toByteArray()));
150    
151                            ByteBuffer byteBuffer = datagram.getDataByteBuffer();
152    
153                            MessageRoutingBag receivedMessageRoutingBag =
154                                    MessageRoutingBag.fromByteArray(byteBuffer.array());
155    
156                            Message receivedMessage = receivedMessageRoutingBag.getMessage();
157    
158                            Message message = messageRoutingBag.getMessage();
159    
160                            receivedMessage.copyTo(message);
161    
162                            message.put(
163                                    MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
164                    }
165                    catch (Exception e) {
166                            throw new RuntimeException(e);
167                    }
168            }
169    
170            protected String toRoutingId(SPI spi) throws RemoteException {
171                    String spiProviderName = spi.getSPIProviderName();
172    
173                    SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
174    
175                    String spiId = spiConfiguration.getSPIId();
176    
177                    return spiProviderName.concat(StringPool.POUND).concat(spiId);
178            }
179    
180    }