001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.increment;
016    
017    import com.liferay.portal.kernel.concurrent.BatchablePipe;
018    import com.liferay.portal.kernel.increment.Increment;
019    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
020    import com.liferay.portal.kernel.util.ClassLoaderUtil;
021    import com.liferay.portal.kernel.util.NamedThreadFactory;
022    import com.liferay.portal.kernel.util.StringBundler;
023    import com.liferay.portal.kernel.util.StringPool;
024    
025    import java.io.Serializable;
026    
027    import java.lang.reflect.Method;
028    
029    import java.util.concurrent.ExecutorService;
030    import java.util.concurrent.SynchronousQueue;
031    import java.util.concurrent.ThreadPoolExecutor;
032    import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.atomic.AtomicInteger;
035    
036    /**
037     * @author Shuyang Zhou
038     */
039    public class BufferedIncrementProcessor {
040    
041            public BufferedIncrementProcessor(
042                    BufferedIncrementConfiguration bufferedIncrementConfiguration,
043                    Method method) {
044    
045                    _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
046    
047                    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
048                            0, _bufferedIncrementConfiguration.getThreadpoolMaxSize(),
049                            _bufferedIncrementConfiguration.getThreadpoolKeepAliveTime(),
050                            TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
051    
052                    threadPoolExecutor.setRejectedExecutionHandler(new DiscardPolicy());
053    
054                    Class<?>[] parameterTypes = method.getParameterTypes();
055    
056                    StringBundler sb = new StringBundler(parameterTypes.length * 2 + 5);
057    
058                    sb.append("BufferedIncrement-");
059    
060                    Class<?> clazz = method.getDeclaringClass();
061    
062                    sb.append(clazz.getSimpleName());
063    
064                    sb.append(StringPool.PERIOD);
065                    sb.append(method.getName());
066                    sb.append(StringPool.OPEN_PARENTHESIS);
067    
068                    for (Class<?> parameterType : parameterTypes) {
069                            sb.append(parameterType.getSimpleName());
070                            sb.append(StringPool.COMMA);
071                    }
072    
073                    sb.setIndex(sb.index() - 1);
074                    sb.append(StringPool.CLOSE_PARENTHESIS);
075    
076                    threadPoolExecutor.setThreadFactory(
077                            new NamedThreadFactory(
078                                    sb.toString(), Thread.NORM_PRIORITY,
079                                    ClassLoaderUtil.getContextClassLoader()));
080    
081                    _executorService = threadPoolExecutor;
082            }
083    
084            public void destroy() {
085                    _executorService.shutdown();
086            }
087    
088            @SuppressWarnings("rawtypes")
089            public void process(BufferedIncreasableEntry bufferedIncreasableEntry) {
090                    if (_batchablePipe.put(bufferedIncreasableEntry)) {
091                            Runnable runnable = new BufferedIncrementRunnable(
092                                    _bufferedIncrementConfiguration, _batchablePipe,
093                                    _queueLengthTracker, Thread.currentThread());
094    
095                            if (ProxyModeThreadLocal.isForceSync()) {
096                                    runnable.run();
097                            }
098                            else {
099                                    _executorService.execute(runnable);
100                            }
101                    }
102            }
103    
104            private final BatchablePipe<Serializable, Increment<?>> _batchablePipe =
105                    new BatchablePipe<>();
106            private final BufferedIncrementConfiguration
107                    _bufferedIncrementConfiguration;
108            private final ExecutorService _executorService;
109            private final AtomicInteger _queueLengthTracker = new AtomicInteger();
110    
111    }