001
014
015 package com.liferay.portal.kernel.nio.intraband.messaging;
016
017 import com.liferay.portal.kernel.messaging.Destination;
018 import com.liferay.portal.kernel.messaging.DestinationWrapper;
019 import com.liferay.portal.kernel.messaging.Message;
020 import com.liferay.portal.kernel.messaging.MessageBusUtil;
021 import com.liferay.portal.kernel.messaging.MessageListener;
022 import com.liferay.portal.kernel.messaging.proxy.MessagingProxy;
023 import com.liferay.portal.kernel.nio.intraband.Datagram;
024 import com.liferay.portal.kernel.nio.intraband.Intraband;
025 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
026 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
027 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
028 import com.liferay.portal.kernel.resiliency.spi.SPI;
029 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
030 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031 import com.liferay.portal.kernel.util.StringPool;
032
033 import java.nio.ByteBuffer;
034
035 import java.rmi.RemoteException;
036
037 import java.util.List;
038 import java.util.Set;
039
040
043 public class IntrabandBridgeDestination extends DestinationWrapper {
044
045 public IntrabandBridgeDestination(Destination destination) {
046 super(destination);
047 }
048
049 @Override
050 public void send(Message message) {
051 if (message.getBoolean(MessagingProxy.LOCAL_MESSAGE)) {
052 destination.send(message);
053
054 return;
055 }
056
057 message.setDestinationName(getName());
058
059 MessageRoutingBag messageRoutingBag = (MessageRoutingBag)message.get(
060 MessageRoutingBag.MESSAGE_ROUTING_BAG);
061
062 if (messageRoutingBag == null) {
063 messageRoutingBag = new MessageRoutingBag(message, true);
064
065 message.put(
066 MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
067 }
068
069 sendMessageRoutingBag(messageRoutingBag);
070
071 try {
072 Message responseMessage = messageRoutingBag.getMessage();
073
074 responseMessage.copyTo(message);
075
076 messageRoutingBag.setMessage(message);
077 }
078 catch (ClassNotFoundException cnfe) {
079 throw new RuntimeException(cnfe);
080 }
081
082 Set<MessageListener> messageListeners =
083 destination.getMessageListeners();
084
085 for (MessageListener messageListener : messageListeners) {
086 try {
087 messageListener.receive(message);
088 }
089 catch (Exception e) {
090 throw new RuntimeException(e);
091 }
092 }
093 }
094
095 public void sendMessageRoutingBag(MessageRoutingBag messageRoutingBag) {
096 if (SPIUtil.isSPI()) {
097 SPI spi = SPIUtil.getSPI();
098
099 try {
100 String routingId = toRoutingId(spi);
101
102 messageRoutingBag.appendRoutingId(routingId);
103
104 if (!messageRoutingBag.isRoutingDowncast()) {
105 RegistrationReference registrationReference =
106 spi.getRegistrationReference();
107
108 sendMessageRoutingBag(
109 registrationReference, messageRoutingBag);
110 }
111 }
112 catch (Exception e) {
113 throw new RuntimeException(e);
114 }
115 }
116
117 List<SPI> spis = MPIHelperUtil.getSPIs();
118
119 if (spis.isEmpty() && !SPIUtil.isSPI()) {
120 MessageBusUtil.addDestination(destination);
121 }
122 else {
123 messageRoutingBag.setRoutingDowncast(true);
124
125 try {
126 for (SPI spi : spis) {
127 String routingId = toRoutingId(spi);
128
129 if (!messageRoutingBag.isVisited(routingId)) {
130 RegistrationReference registrationReference =
131 spi.getRegistrationReference();
132
133 sendMessageRoutingBag(
134 registrationReference, messageRoutingBag);
135 }
136 }
137 }
138 catch (Exception e) {
139 throw new RuntimeException(e);
140 }
141 }
142 }
143
144 protected void sendMessageRoutingBag(
145 RegistrationReference registrationReference,
146 MessageRoutingBag messageRoutingBag) {
147
148 try {
149 Intraband intraband = registrationReference.getIntraband();
150
151 Datagram datagram = intraband.sendSyncDatagram(
152 registrationReference,
153 Datagram.createRequestDatagram(
154 SystemDataType.MESSAGE.getValue(),
155 messageRoutingBag.toByteArray()));
156
157 ByteBuffer byteBuffer = datagram.getDataByteBuffer();
158
159 MessageRoutingBag receivedMessageRoutingBag =
160 MessageRoutingBag.fromByteArray(byteBuffer.array());
161
162 Message receivedMessage = receivedMessageRoutingBag.getMessage();
163
164 Message message = messageRoutingBag.getMessage();
165
166 receivedMessage.copyTo(message);
167
168 message.put(
169 MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
170 }
171 catch (Exception e) {
172 throw new RuntimeException(e);
173 }
174 }
175
176 protected String toRoutingId(SPI spi) throws RemoteException {
177 String spiProviderName = spi.getSPIProviderName();
178
179 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
180
181 String spiId = spiConfiguration.getSPIId();
182
183 return spiProviderName.concat(StringPool.POUND).concat(spiId);
184 }
185
186 }