| SynchronousMessageListener.java |
1 /**
2 * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3 *
4 *
5 *
6 *
7 * The contents of this file are subject to the terms of the Liferay Enterprise
8 * Subscription License ("License"). You may not use this file except in
9 * compliance with the License. You can obtain a copy of the License by
10 * contacting Liferay, Inc. See the License for the specific language governing
11 * permissions and limitations under the License, including but not limited to
12 * distribution rights of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 package com.liferay.portal.kernel.messaging.sender;
24
25 import com.liferay.portal.kernel.messaging.Message;
26 import com.liferay.portal.kernel.messaging.MessageBus;
27 import com.liferay.portal.kernel.messaging.MessageBusException;
28 import com.liferay.portal.kernel.messaging.MessageListener;
29
30 /**
31 * <a href="SynchronousMessageListener.java.html"><b><i>View Source</i></b></a>
32 *
33 * @author Michael C. Han
34 */
35 public class SynchronousMessageListener implements MessageListener {
36
37 public SynchronousMessageListener(
38 MessageBus messageBus, Message message, long timeout) {
39
40 _messageBus = messageBus;
41 _message = message;
42 _timeout = timeout;
43 _responseId = _message.getResponseId();
44 }
45
46 public Object getResults() {
47 return _results;
48 }
49
50 public void receive(Message message) {
51 if (!message.getResponseId().equals(_responseId)) {
52 return;
53 }
54
55 synchronized (this) {
56 _results = message.getPayload();
57
58 notify();
59 }
60 }
61
62 public Object send() throws MessageBusException {
63 String destinationName = _message.getDestinationName();
64 String responseDestinationName = _message.getResponseDestinationName();
65
66 _messageBus.registerMessageListener(responseDestinationName, this);
67
68 try {
69 synchronized (this) {
70 _messageBus.sendMessage(destinationName, _message);
71
72 wait(_timeout);
73
74 if (_results == null) {
75 throw new MessageBusException(
76 "No reply received for message: " + _message);
77 }
78 }
79
80 return _results;
81 }
82 catch (InterruptedException ie) {
83 throw new MessageBusException(
84 "Message sending interrupted for: " + _message, ie);
85 }
86 finally {
87 _messageBus.unregisterMessageListener(
88 responseDestinationName, this);
89 }
90 }
91
92 private MessageBus _messageBus;
93 private Message _message;
94 private long _timeout;
95 private String _responseId;
96 private Object _results;
97
98 }