001
014
015 package com.liferay.portal.kernel.nio.intraband.messaging;
016
017 import com.liferay.portal.kernel.messaging.Destination;
018 import com.liferay.portal.kernel.messaging.DestinationWrapper;
019 import com.liferay.portal.kernel.messaging.Message;
020 import com.liferay.portal.kernel.messaging.MessageBusUtil;
021 import com.liferay.portal.kernel.messaging.MessageListener;
022 import com.liferay.portal.kernel.nio.intraband.Datagram;
023 import com.liferay.portal.kernel.nio.intraband.Intraband;
024 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
025 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
026 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
027 import com.liferay.portal.kernel.resiliency.spi.SPI;
028 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
029 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
030 import com.liferay.portal.kernel.util.StringPool;
031
032 import java.nio.ByteBuffer;
033
034 import java.rmi.RemoteException;
035
036 import java.util.List;
037 import java.util.Set;
038
039
042 public class IntrabandBridgeDestination extends DestinationWrapper {
043
044 public IntrabandBridgeDestination(Destination destination) {
045 super(destination);
046 }
047
048 @Override
049 public void send(Message message) {
050 message.setDestinationName(getName());
051
052 MessageRoutingBag messageRoutingBag =
053 (MessageRoutingBag)message.get(
054 MessageRoutingBag.MESSAGE_ROUTING_BAG);
055
056 if (messageRoutingBag == null) {
057 messageRoutingBag = new MessageRoutingBag(message, true);
058
059 message.put(
060 MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
061 }
062
063 sendMessageRoutingBag(messageRoutingBag);
064
065 try {
066 Message responseMessage = messageRoutingBag.getMessage();
067
068 responseMessage.copyTo(message);
069
070 messageRoutingBag.setMessage(message);
071 }
072 catch (ClassNotFoundException cnfe) {
073 throw new RuntimeException(cnfe);
074 }
075
076 Set<MessageListener> messageListeners =
077 destination.getMessageListeners();
078
079 for (MessageListener messageListener : messageListeners) {
080 try {
081 messageListener.receive(message);
082 }
083 catch (Exception e) {
084 throw new RuntimeException(e);
085 }
086 }
087 }
088
089 public void sendMessageRoutingBag(MessageRoutingBag messageRoutingBag) {
090 if (SPIUtil.isSPI()) {
091 SPI spi = SPIUtil.getSPI();
092
093 try {
094 String routingId = toRoutingId(spi);
095
096 messageRoutingBag.appendRoutingId(routingId);
097
098 if (!messageRoutingBag.isRoutingDowncast()) {
099 RegistrationReference registrationReference =
100 spi.getRegistrationReference();
101
102 sendMessageRoutingBag(
103 registrationReference, messageRoutingBag);
104 }
105 }
106 catch (Exception e) {
107 throw new RuntimeException(e);
108 }
109 }
110
111 List<SPI> spis = MPIHelperUtil.getSPIs();
112
113 if (spis.isEmpty() && !SPIUtil.isSPI()) {
114 MessageBusUtil.addDestination(destination);
115 }
116 else {
117 messageRoutingBag.setRoutingDowncast(true);
118
119 try {
120 for (SPI spi : spis) {
121 String routingId = toRoutingId(spi);
122
123 if (!messageRoutingBag.isVisited(routingId)) {
124 RegistrationReference registrationReference =
125 spi.getRegistrationReference();
126
127 sendMessageRoutingBag(
128 registrationReference, messageRoutingBag);
129 }
130 }
131 }
132 catch (Exception e) {
133 throw new RuntimeException(e);
134 }
135 }
136 }
137
138 protected void sendMessageRoutingBag(
139 RegistrationReference registrationReference,
140 MessageRoutingBag messageRoutingBag) {
141
142 try {
143 Intraband intraband = registrationReference.getIntraband();
144
145 Datagram datagram = intraband.sendSyncDatagram(
146 registrationReference,
147 Datagram.createRequestDatagram(
148 SystemDataType.MESSAGE.getValue(),
149 messageRoutingBag.toByteArray()));
150
151 ByteBuffer byteBuffer = datagram.getDataByteBuffer();
152
153 MessageRoutingBag receivedMessageRoutingBag =
154 MessageRoutingBag.fromByteArray(byteBuffer.array());
155
156 Message receivedMessage = receivedMessageRoutingBag.getMessage();
157
158 Message message = messageRoutingBag.getMessage();
159
160 receivedMessage.copyTo(message);
161
162 message.put(
163 MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
164 }
165 catch (Exception e) {
166 throw new RuntimeException(e);
167 }
168 }
169
170 protected String toRoutingId(SPI spi) throws RemoteException {
171 String spiProviderName = spi.getSPIProviderName();
172
173 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
174
175 String spiId = spiConfiguration.getSPIId();
176
177 return spiProviderName.concat(StringPool.POUND).concat(spiId);
178 }
179
180 }