001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.messaging;
016    
017    import com.liferay.portal.kernel.messaging.Destination;
018    import com.liferay.portal.kernel.messaging.DestinationWrapper;
019    import com.liferay.portal.kernel.messaging.Message;
020    import com.liferay.portal.kernel.messaging.MessageBusUtil;
021    import com.liferay.portal.kernel.messaging.MessageListener;
022    import com.liferay.portal.kernel.messaging.proxy.MessagingProxy;
023    import com.liferay.portal.kernel.nio.intraband.Datagram;
024    import com.liferay.portal.kernel.nio.intraband.Intraband;
025    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
026    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
027    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
028    import com.liferay.portal.kernel.resiliency.spi.SPI;
029    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
030    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031    import com.liferay.portal.kernel.util.StringPool;
032    
033    import java.nio.ByteBuffer;
034    
035    import java.rmi.RemoteException;
036    
037    import java.util.List;
038    import java.util.Set;
039    
040    /**
041     * @author Shuyang Zhou
042     */
043    public class IntrabandBridgeDestination extends DestinationWrapper {
044    
045            public IntrabandBridgeDestination(Destination destination) {
046                    super(destination);
047            }
048    
049            @Override
050            public void send(Message message) {
051                    if (message.getBoolean(MessagingProxy.LOCAL_MESSAGE)) {
052                            destination.send(message);
053    
054                            return;
055                    }
056    
057                    message.setDestinationName(getName());
058    
059                    MessageRoutingBag messageRoutingBag = (MessageRoutingBag)message.get(
060                            MessageRoutingBag.MESSAGE_ROUTING_BAG);
061    
062                    if (messageRoutingBag == null) {
063                            messageRoutingBag = new MessageRoutingBag(message, true);
064    
065                            message.put(
066                                    MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
067                    }
068    
069                    sendMessageRoutingBag(messageRoutingBag);
070    
071                    try {
072                            Message responseMessage = messageRoutingBag.getMessage();
073    
074                            responseMessage.copyTo(message);
075    
076                            messageRoutingBag.setMessage(message);
077                    }
078                    catch (ClassNotFoundException cnfe) {
079                            throw new RuntimeException(cnfe);
080                    }
081    
082                    Set<MessageListener> messageListeners =
083                            destination.getMessageListeners();
084    
085                    for (MessageListener messageListener : messageListeners) {
086                            try {
087                                    messageListener.receive(message);
088                            }
089                            catch (Exception e) {
090                                    throw new RuntimeException(e);
091                            }
092                    }
093            }
094    
095            public void sendMessageRoutingBag(MessageRoutingBag messageRoutingBag) {
096                    if (SPIUtil.isSPI()) {
097                            SPI spi = SPIUtil.getSPI();
098    
099                            try {
100                                    String routingId = toRoutingId(spi);
101    
102                                    messageRoutingBag.appendRoutingId(routingId);
103    
104                                    if (!messageRoutingBag.isRoutingDowncast()) {
105                                            RegistrationReference registrationReference =
106                                                    spi.getRegistrationReference();
107    
108                                            sendMessageRoutingBag(
109                                                    registrationReference, messageRoutingBag);
110                                    }
111                            }
112                            catch (Exception e) {
113                                    throw new RuntimeException(e);
114                            }
115                    }
116    
117                    List<SPI> spis = MPIHelperUtil.getSPIs();
118    
119                    if (spis.isEmpty() && !SPIUtil.isSPI()) {
120                            MessageBusUtil.addDestination(destination);
121                    }
122                    else {
123                            messageRoutingBag.setRoutingDowncast(true);
124    
125                            try {
126                                    for (SPI spi : spis) {
127                                            String routingId = toRoutingId(spi);
128    
129                                            if (!messageRoutingBag.isVisited(routingId)) {
130                                                    RegistrationReference registrationReference =
131                                                            spi.getRegistrationReference();
132    
133                                                    sendMessageRoutingBag(
134                                                            registrationReference, messageRoutingBag);
135                                            }
136                                    }
137                            }
138                            catch (Exception e) {
139                                    throw new RuntimeException(e);
140                            }
141                    }
142            }
143    
144            protected void sendMessageRoutingBag(
145                    RegistrationReference registrationReference,
146                    MessageRoutingBag messageRoutingBag) {
147    
148                    try {
149                            Intraband intraband = registrationReference.getIntraband();
150    
151                            Datagram datagram = intraband.sendSyncDatagram(
152                                    registrationReference,
153                                    Datagram.createRequestDatagram(
154                                            SystemDataType.MESSAGE.getValue(),
155                                            messageRoutingBag.toByteArray()));
156    
157                            ByteBuffer byteBuffer = datagram.getDataByteBuffer();
158    
159                            MessageRoutingBag receivedMessageRoutingBag =
160                                    MessageRoutingBag.fromByteArray(byteBuffer.array());
161    
162                            Message receivedMessage = receivedMessageRoutingBag.getMessage();
163    
164                            Message message = messageRoutingBag.getMessage();
165    
166                            receivedMessage.copyTo(message);
167    
168                            message.put(
169                                    MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
170                    }
171                    catch (Exception e) {
172                            throw new RuntimeException(e);
173                    }
174            }
175    
176            protected String toRoutingId(SPI spi) throws RemoteException {
177                    String spiProviderName = spi.getSPIProviderName();
178    
179                    SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
180    
181                    String spiId = spiConfiguration.getSPIId();
182    
183                    return spiProviderName.concat(StringPool.POUND).concat(spiId);
184            }
185    
186    }