001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.sender;
016    
017    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
018    import com.liferay.portal.kernel.dao.orm.EntityCacheUtil;
019    import com.liferay.portal.kernel.dao.orm.FinderCacheUtil;
020    import com.liferay.portal.kernel.messaging.Message;
021    import com.liferay.portal.kernel.messaging.MessageBus;
022    import com.liferay.portal.kernel.messaging.MessageBusException;
023    import com.liferay.portal.kernel.messaging.MessageListener;
024    
025    import java.util.concurrent.CountDownLatch;
026    import java.util.concurrent.TimeUnit;
027    
028    /**
029     * @author Michael C. Han
030     */
031    public class SynchronousMessageListener implements MessageListener {
032    
033            public SynchronousMessageListener(
034                    MessageBus messageBus, Message message, long timeout) {
035    
036                    _messageBus = messageBus;
037                    _message = message;
038                    _timeout = timeout;
039                    _responseId = _message.getResponseId();
040            }
041    
042            public Object getResults() {
043                    return _results;
044            }
045    
046            @Override
047            public void receive(Message message) {
048                    if (!message.getResponseId().equals(_responseId)) {
049                            return;
050                    }
051    
052                    _results = message.getPayload();
053    
054                    _countDownLatch.countDown();
055            }
056    
057            public Object send() throws MessageBusException {
058                    String destinationName = _message.getDestinationName();
059                    String responseDestinationName = _message.getResponseDestinationName();
060    
061                    _messageBus.registerMessageListener(responseDestinationName, this);
062    
063                    try {
064                            _messageBus.sendMessage(destinationName, _message);
065    
066                            _countDownLatch.await(_timeout, TimeUnit.MILLISECONDS);
067    
068                            if (_results == null) {
069                                    throw new MessageBusException(
070                                            "No reply received for message: " + _message);
071                            }
072    
073                            return _results;
074                    }
075                    catch (InterruptedException ie) {
076                            throw new MessageBusException(
077                                    "Message sending interrupted for: " + _message, ie);
078                    }
079                    finally {
080                            _messageBus.unregisterMessageListener(
081                                    responseDestinationName, this);
082    
083                            EntityCacheUtil.clearLocalCache();
084                            FinderCacheUtil.clearLocalCache();
085                            ThreadLocalCacheManager.destroy();
086                    }
087            }
088    
089            private final CountDownLatch _countDownLatch = new CountDownLatch(1);
090            private final Message _message;
091            private final MessageBus _messageBus;
092            private final String _responseId;
093            private Object _results;
094            private final long _timeout;
095    
096    }