001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.sender;
016    
017    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
018    import com.liferay.portal.kernel.dao.orm.EntityCacheUtil;
019    import com.liferay.portal.kernel.dao.orm.FinderCacheUtil;
020    import com.liferay.portal.kernel.messaging.Message;
021    import com.liferay.portal.kernel.messaging.MessageBus;
022    import com.liferay.portal.kernel.messaging.MessageBusException;
023    import com.liferay.portal.kernel.messaging.MessageListener;
024    
025    /**
026     * @author Michael C. Han
027     */
028    public class SynchronousMessageListener implements MessageListener {
029    
030            public SynchronousMessageListener(
031                    MessageBus messageBus, Message message, long timeout) {
032    
033                    _messageBus = messageBus;
034                    _message = message;
035                    _timeout = timeout;
036                    _responseId = _message.getResponseId();
037            }
038    
039            public Object getResults() {
040                    return _results;
041            }
042    
043            public void receive(Message message) {
044                    if (!message.getResponseId().equals(_responseId)) {
045                            return;
046                    }
047    
048                    synchronized (this) {
049                            _results = message.getPayload();
050    
051                            notify();
052                    }
053            }
054    
055            public Object send() throws MessageBusException {
056                    String destinationName = _message.getDestinationName();
057                    String responseDestinationName = _message.getResponseDestinationName();
058    
059                    _messageBus.registerMessageListener(responseDestinationName, this);
060    
061                    try {
062                            synchronized (this) {
063                                    _messageBus.sendMessage(destinationName, _message);
064    
065                                    wait(_timeout);
066    
067                                    if (_results == null) {
068                                            throw new MessageBusException(
069                                                    "No reply received for message: " + _message);
070                                    }
071                            }
072    
073                            return _results;
074                    }
075                    catch (InterruptedException ie) {
076                            throw new MessageBusException(
077                                    "Message sending interrupted for: " + _message, ie);
078                    }
079                    finally {
080                            _messageBus.unregisterMessageListener(
081                                    responseDestinationName, this);
082    
083                            EntityCacheUtil.clearLocalCache();
084                            FinderCacheUtil.clearLocalCache();
085                            ThreadLocalCacheManager.destroy();
086                    }
087            }
088    
089            private Message _message;
090            private MessageBus _messageBus;
091            private String _responseId;
092            private Object _results;
093            private long _timeout;
094    
095    }