001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.memory.FinalizeAction;
020    import com.liferay.portal.kernel.memory.FinalizeManager;
021    import com.liferay.portal.kernel.util.ReflectionUtil;
022    
023    import java.lang.ref.Reference;
024    import java.lang.reflect.Field;
025    
026    import java.util.Collections;
027    import java.util.Map;
028    import java.util.concurrent.ConcurrentMap;
029    import java.util.concurrent.Future;
030    
031    /**
032     * @author Shuyang Zhou
033     */
034    public class AsyncBroker<K, V> {
035    
036            public Map<K, NoticeableFuture<V>> getOpenBids() {
037                    return Collections.<K, NoticeableFuture<V>>unmodifiableMap(
038                            _defaultNoticeableFutures);
039            }
040    
041            public NoticeableFuture<V> post(final K key) {
042                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
043                            new DefaultNoticeableFuture<V>();
044    
045                    DefaultNoticeableFuture<V> previousDefaultNoticeableFuture =
046                            _defaultNoticeableFutures.putIfAbsent(key, defaultNoticeableFuture);
047    
048                    if (previousDefaultNoticeableFuture != null) {
049                            return previousDefaultNoticeableFuture;
050                    }
051    
052                    defaultNoticeableFuture.addFutureListener(
053                            new FutureListener<V>() {
054    
055                                    @Override
056                                    public void complete(Future<V> future) {
057                                            _defaultNoticeableFutures.remove(key);
058                                    }
059    
060                            });
061    
062                    if (_REFERENT_FIELD != null) {
063                            FinalizeManager.register(
064                                    defaultNoticeableFuture, new CancellationFinalizeAction(key),
065                                    FinalizeManager.PHANTOM_REFERENCE_FACTORY);
066                    }
067    
068                    return defaultNoticeableFuture;
069            }
070    
071            public NoticeableFuture<V> take(K key) {
072                    return _defaultNoticeableFutures.remove(key);
073            }
074    
075            public boolean takeWithException(K key, Throwable throwable) {
076                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
077                            _defaultNoticeableFutures.remove(key);
078    
079                    if (defaultNoticeableFuture == null) {
080                            return false;
081                    }
082    
083                    defaultNoticeableFuture.setException(throwable);
084    
085                    return true;
086            }
087    
088            public boolean takeWithResult(K key, V result) {
089                    DefaultNoticeableFuture<V> defaultNoticeableFuture =
090                            _defaultNoticeableFutures.remove(key);
091    
092                    if (defaultNoticeableFuture == null) {
093                            return false;
094                    }
095    
096                    defaultNoticeableFuture.set(result);
097    
098                    return true;
099            }
100    
101            private static final Field _REFERENT_FIELD;
102    
103            private static final Log _log = LogFactoryUtil.getLog(AsyncBroker.class);
104    
105            static {
106                    Field referentField = null;
107    
108                    try {
109                            referentField = ReflectionUtil.getDeclaredField(
110                                    Reference.class, "referent");
111                    }
112                    catch (Throwable t) {
113                            if (_log.isWarnEnabled()) {
114                                    _log.warn(
115                                            "Cancellation of orphaned noticeable futures is disabled " +
116                                                    "because the JVM does not support phantom reference " +
117                                                            "resurrection",
118                                            t);
119                            }
120                    }
121    
122                    _REFERENT_FIELD = referentField;
123            }
124    
125            private final ConcurrentMap<K, DefaultNoticeableFuture<V>>
126                    _defaultNoticeableFutures =
127                            new ConcurrentReferenceValueHashMap<K, DefaultNoticeableFuture<V>>(
128                                    FinalizeManager.WEAK_REFERENCE_FACTORY);
129    
130            private static class CancellationFinalizeAction implements FinalizeAction {
131    
132                    public CancellationFinalizeAction(Object key) {
133                            _key = key;
134                    }
135    
136                    @Override
137                    public void doFinalize(final Reference<?> reference) {
138                            try {
139                                    NoticeableFuture<?> noticeableFuture =
140                                            (NoticeableFuture<?>)_REFERENT_FIELD.get(reference);
141    
142                                    if (noticeableFuture.cancel(true) && _log.isWarnEnabled()) {
143                                            _log.warn(
144                                                    "Cancelled orphan noticeable future " +
145                                                            noticeableFuture + " with key " + _key);
146                                    }
147                            }
148                            catch (Exception e) {
149                                    _log.error("Unable to access referent of " + reference, e);
150                            }
151                    }
152    
153                    private final Object _key;
154    
155            }
156    
157    }