001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.memory.FinalizeAction;
020    import com.liferay.portal.kernel.memory.FinalizeManager;
021    import com.liferay.portal.kernel.util.ReflectionUtil;
022    
023    import java.lang.ref.Reference;
024    import java.lang.reflect.Field;
025    
026    import java.util.Collections;
027    import java.util.Map;
028    import java.util.concurrent.ConcurrentMap;
029    import java.util.concurrent.Future;
030    
031    /**
032     * @author Shuyang Zhou
033     */
034    public class AsyncBroker<K, V> {
035    
036            public Map<K, NoticeableFuture<V>> getOpenBids() {
037                    return Collections.<K, NoticeableFuture<V>>unmodifiableMap(
038                            _defaultNoticeableFutures);
039            }
040    
041            public NoticeableFuture<V> post(K key) {
042                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
043                            new DefaultNoticeableFuture<>();
044    
045                    NoticeableFuture<V> previousNoticeableFuture = post(
046                            key, defaultNoticeableFuture);
047    
048                    if (previousNoticeableFuture == null) {
049                            return defaultNoticeableFuture;
050                    }
051    
052                    return previousNoticeableFuture;
053            }
054    
055            public NoticeableFuture<V> post(
056                    final K key, final DefaultNoticeableFuture<V> defaultNoticeableFuture) {
057    
058                    DefaultNoticeableFuture<V> previousDefaultNoticeableFuture =
059                            _defaultNoticeableFutures.putIfAbsent(key, defaultNoticeableFuture);
060    
061                    if (previousDefaultNoticeableFuture != null) {
062                            return previousDefaultNoticeableFuture;
063                    }
064    
065                    defaultNoticeableFuture.addFutureListener(
066                            new FutureListener<V>() {
067    
068                                    @Override
069                                    public void complete(Future<V> future) {
070                                            _defaultNoticeableFutures.remove(
071                                                    key, defaultNoticeableFuture);
072                                    }
073    
074                            });
075    
076                    if (_REFERENT_FIELD != null) {
077                            FinalizeManager.register(
078                                    defaultNoticeableFuture, new CancellationFinalizeAction(key),
079                                    FinalizeManager.PHANTOM_REFERENCE_FACTORY);
080                    }
081    
082                    return null;
083            }
084    
085            public NoticeableFuture<V> take(K key) {
086                    return _defaultNoticeableFutures.remove(key);
087            }
088    
089            public boolean takeWithException(K key, Throwable throwable) {
090                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
091                            _defaultNoticeableFutures.remove(key);
092    
093                    if (defaultNoticeableFuture == null) {
094                            return false;
095                    }
096    
097                    defaultNoticeableFuture.setException(throwable);
098    
099                    return true;
100            }
101    
102            public boolean takeWithResult(K key, V result) {
103                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
104                            _defaultNoticeableFutures.remove(key);
105    
106                    if (defaultNoticeableFuture == null) {
107                            return false;
108                    }
109    
110                    defaultNoticeableFuture.set(result);
111    
112                    return true;
113            }
114    
115            private static final Field _REFERENT_FIELD;
116    
117            private static final Log _log = LogFactoryUtil.getLog(AsyncBroker.class);
118    
119            static {
120                    Field referentField = null;
121    
122                    try {
123                            referentField = ReflectionUtil.getDeclaredField(
124                                    Reference.class, "referent");
125                    }
126                    catch (Throwable t) {
127                            if (_log.isWarnEnabled()) {
128                                    _log.warn(
129                                            "Cancellation of orphaned noticeable futures is disabled " +
130                                                    "because the JVM does not support phantom reference " +
131                                                            "resurrection",
132                                            t);
133                            }
134                    }
135    
136                    _REFERENT_FIELD = referentField;
137            }
138    
139            private final ConcurrentMap<K, DefaultNoticeableFuture<V>>
140                    _defaultNoticeableFutures = new ConcurrentReferenceValueHashMap<>(
141                            FinalizeManager.WEAK_REFERENCE_FACTORY);
142    
143            private static class CancellationFinalizeAction implements FinalizeAction {
144    
145                    public CancellationFinalizeAction(Object key) {
146                            _key = key;
147                    }
148    
149                    @Override
150                    public void doFinalize(final Reference<?> reference) {
151                            try {
152                                    NoticeableFuture<?> noticeableFuture =
153                                            (NoticeableFuture<?>)_REFERENT_FIELD.get(reference);
154    
155                                    if (noticeableFuture.cancel(true) && _log.isWarnEnabled()) {
156                                            _log.warn(
157                                                    "Cancelled orphan noticeable future " +
158                                                            noticeableFuture + " with key " + _key);
159                                    }
160                            }
161                            catch (Exception e) {
162                                    _log.error("Unable to access referent of " + reference, e);
163                            }
164                    }
165    
166                    private final Object _key;
167    
168            }
169    
170    }