001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.concurrent.BatchablePipe;
018 import com.liferay.portal.kernel.increment.Increment;
019 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
020 import com.liferay.portal.kernel.util.ClassLoaderUtil;
021 import com.liferay.portal.kernel.util.NamedThreadFactory;
022 import com.liferay.portal.kernel.util.StringBundler;
023 import com.liferay.portal.kernel.util.StringPool;
024
025 import java.io.Serializable;
026
027 import java.lang.reflect.Method;
028
029 import java.util.concurrent.ExecutorService;
030 import java.util.concurrent.SynchronousQueue;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicInteger;
035
036
039 public class BufferedIncrementProcessor {
040
041 public BufferedIncrementProcessor(
042 BufferedIncrementConfiguration bufferedIncrementConfiguration,
043 Method method) {
044
045 _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
046
047 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
048 0, _bufferedIncrementConfiguration.getThreadpoolMaxSize(),
049 _bufferedIncrementConfiguration.getThreadpoolKeepAliveTime(),
050 TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
051
052 threadPoolExecutor.setRejectedExecutionHandler(new DiscardPolicy());
053
054 Class<?>[] parameterTypes = method.getParameterTypes();
055
056 StringBundler sb = new StringBundler(parameterTypes.length * 2 + 5);
057
058 sb.append("BufferedIncrement-");
059
060 Class<?> clazz = method.getDeclaringClass();
061
062 sb.append(clazz.getSimpleName());
063
064 sb.append(StringPool.PERIOD);
065 sb.append(method.getName());
066 sb.append(StringPool.OPEN_PARENTHESIS);
067
068 for (Class<?> parameterType : parameterTypes) {
069 sb.append(parameterType.getSimpleName());
070 sb.append(StringPool.COMMA);
071 }
072
073 sb.setIndex(sb.index() - 1);
074 sb.append(StringPool.CLOSE_PARENTHESIS);
075
076 threadPoolExecutor.setThreadFactory(
077 new NamedThreadFactory(
078 sb.toString(), Thread.NORM_PRIORITY,
079 ClassLoaderUtil.getContextClassLoader()));
080
081 _executorService = threadPoolExecutor;
082 }
083
084 public void destroy() {
085 _executorService.shutdown();
086 }
087
088 @SuppressWarnings("rawtypes")
089 public void process(BufferedIncreasableEntry bufferedIncreasableEntry) {
090 if (_batchablePipe.put(bufferedIncreasableEntry)) {
091 Runnable runnable = new BufferedIncrementRunnable(
092 _bufferedIncrementConfiguration, _batchablePipe,
093 _queueLengthTracker, Thread.currentThread());
094
095 if (ProxyModeThreadLocal.isForceSync()) {
096 runnable.run();
097 }
098 else {
099 _executorService.execute(runnable);
100 }
101 }
102 }
103
104 private final BatchablePipe<Serializable, Increment<?>> _batchablePipe =
105 new BatchablePipe<>();
106 private final BufferedIncrementConfiguration
107 _bufferedIncrementConfiguration;
108 private final ExecutorService _executorService;
109 private final AtomicInteger _queueLengthTracker = new AtomicInteger();
110
111 }