001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.concurrent.BatchablePipe;
018 import com.liferay.portal.kernel.increment.Increment;
019 import com.liferay.portal.kernel.util.NamedThreadFactory;
020 import com.liferay.portal.kernel.util.StringBundler;
021 import com.liferay.portal.kernel.util.StringPool;
022 import com.liferay.portal.util.ClassLoaderUtil;
023
024 import java.io.Serializable;
025
026 import java.lang.reflect.Method;
027
028 import java.util.concurrent.ExecutorService;
029 import java.util.concurrent.SynchronousQueue;
030 import java.util.concurrent.ThreadPoolExecutor;
031 import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicInteger;
034
035
038 public class BufferedIncrementProcessor {
039
040 public BufferedIncrementProcessor(
041 BufferedIncrementConfiguration bufferedIncrementConfiguration,
042 Method method) {
043
044 _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
045
046 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
047 0, _bufferedIncrementConfiguration.getThreadpoolMaxSize(),
048 _bufferedIncrementConfiguration.getThreadpoolKeepAliveTime(),
049 TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
050
051 threadPoolExecutor.setRejectedExecutionHandler(new DiscardPolicy());
052
053 Class<?>[] parameterTypes = method.getParameterTypes();
054
055 StringBundler sb = new StringBundler(parameterTypes.length * 2 + 5);
056
057 sb.append("BufferedIncreament-");
058
059 Class<?> clazz = method.getDeclaringClass();
060
061 sb.append(clazz.getSimpleName());
062
063 sb.append(StringPool.PERIOD);
064 sb.append(method.getName());
065 sb.append(StringPool.OPEN_PARENTHESIS);
066
067 for (Class<?> parameterType : parameterTypes) {
068 sb.append(parameterType.getSimpleName());
069 sb.append(StringPool.COMMA);
070 }
071
072 sb.setIndex(sb.index() - 1);
073 sb.append(StringPool.CLOSE_PARENTHESIS);
074
075 threadPoolExecutor.setThreadFactory(
076 new NamedThreadFactory(
077 sb.toString(), Thread.NORM_PRIORITY,
078 ClassLoaderUtil.getContextClassLoader()));
079
080 _executorService = threadPoolExecutor;
081 }
082
083 public void destroy() {
084 _executorService.shutdown();
085 }
086
087 @SuppressWarnings("rawtypes")
088 public void process(BufferedIncreasableEntry bufferedIncreasableEntry) {
089 if (_batchablePipe.put(bufferedIncreasableEntry)) {
090 _executorService.execute(
091 new BufferedIncrementRunnable(
092 _bufferedIncrementConfiguration, _batchablePipe,
093 _queueLengthTracker));
094 }
095 }
096
097 private final BatchablePipe<Serializable, Increment<?>> _batchablePipe =
098 new BatchablePipe<Serializable, Increment<?>>();
099 private final BufferedIncrementConfiguration
100 _bufferedIncrementConfiguration;
101 private final ExecutorService _executorService;
102 private final AtomicInteger _queueLengthTracker = new AtomicInteger();
103
104 }