001
014
015 package com.liferay.portal.kernel.messaging.sender;
016
017 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
018 import com.liferay.portal.kernel.dao.orm.EntityCacheUtil;
019 import com.liferay.portal.kernel.dao.orm.FinderCacheUtil;
020 import com.liferay.portal.kernel.messaging.Message;
021 import com.liferay.portal.kernel.messaging.MessageBus;
022 import com.liferay.portal.kernel.messaging.MessageBusException;
023 import com.liferay.portal.kernel.messaging.MessageListener;
024
025 import java.util.concurrent.CountDownLatch;
026 import java.util.concurrent.TimeUnit;
027
028
031 public class SynchronousMessageListener implements MessageListener {
032
033 public SynchronousMessageListener(
034 MessageBus messageBus, Message message, long timeout) {
035
036 _messageBus = messageBus;
037 _message = message;
038 _timeout = timeout;
039 _responseId = _message.getResponseId();
040 }
041
042 public Object getResults() {
043 return _results;
044 }
045
046 @Override
047 public void receive(Message message) {
048 if (!message.getResponseId().equals(_responseId)) {
049 return;
050 }
051
052 _results = message.getPayload();
053
054 _countDownLatch.countDown();
055 }
056
057 public Object send() throws MessageBusException {
058 String destinationName = _message.getDestinationName();
059 String responseDestinationName = _message.getResponseDestinationName();
060
061 _messageBus.registerMessageListener(responseDestinationName, this);
062
063 try {
064 _messageBus.sendMessage(destinationName, _message);
065
066 _countDownLatch.await(_timeout, TimeUnit.MILLISECONDS);
067
068 if (_results == null) {
069 throw new MessageBusException(
070 "No reply received for message: " + _message);
071 }
072
073 return _results;
074 }
075 catch (InterruptedException ie) {
076 throw new MessageBusException(
077 "Message sending interrupted for: " + _message, ie);
078 }
079 finally {
080 _messageBus.unregisterMessageListener(
081 responseDestinationName, this);
082
083 EntityCacheUtil.clearLocalCache();
084 FinderCacheUtil.clearLocalCache();
085 ThreadLocalCacheManager.destroy();
086 }
087 }
088
089 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
090 private final Message _message;
091 private final MessageBus _messageBus;
092 private final String _responseId;
093 private Object _results;
094 private final long _timeout;
095
096 }