001
014
015 package com.liferay.portal.kernel.messaging.sender;
016
017 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
018 import com.liferay.portal.kernel.dao.orm.EntityCacheUtil;
019 import com.liferay.portal.kernel.dao.orm.FinderCacheUtil;
020 import com.liferay.portal.kernel.messaging.Message;
021 import com.liferay.portal.kernel.messaging.MessageBus;
022 import com.liferay.portal.kernel.messaging.MessageBusException;
023 import com.liferay.portal.kernel.messaging.MessageListener;
024
025
028 public class SynchronousMessageListener implements MessageListener {
029
030 public SynchronousMessageListener(
031 MessageBus messageBus, Message message, long timeout) {
032
033 _messageBus = messageBus;
034 _message = message;
035 _timeout = timeout;
036 _responseId = _message.getResponseId();
037 }
038
039 public Object getResults() {
040 return _results;
041 }
042
043 public void receive(Message message) {
044 if (!message.getResponseId().equals(_responseId)) {
045 return;
046 }
047
048 synchronized (this) {
049 _results = message.getPayload();
050
051 notify();
052 }
053 }
054
055 public Object send() throws MessageBusException {
056 String destinationName = _message.getDestinationName();
057 String responseDestinationName = _message.getResponseDestinationName();
058
059 _messageBus.registerMessageListener(responseDestinationName, this);
060
061 try {
062 synchronized (this) {
063 _messageBus.sendMessage(destinationName, _message);
064
065 wait(_timeout);
066
067 if (_results == null) {
068 throw new MessageBusException(
069 "No reply received for message: " + _message);
070 }
071 }
072
073 return _results;
074 }
075 catch (InterruptedException ie) {
076 throw new MessageBusException(
077 "Message sending interrupted for: " + _message, ie);
078 }
079 finally {
080 _messageBus.unregisterMessageListener(
081 responseDestinationName, this);
082
083 EntityCacheUtil.clearLocalCache();
084 FinderCacheUtil.clearLocalCache();
085 ThreadLocalCacheManager.destroy();
086 }
087 }
088
089 private Message _message;
090 private MessageBus _messageBus;
091 private String _responseId;
092 private Object _results;
093 private long _timeout;
094
095 }