001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.concurrent.BatchablePipe;
020 import com.liferay.portal.kernel.increment.Increment;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
024 import com.liferay.portal.security.auth.CompanyThreadLocal;
025
026 import java.io.Serializable;
027
028 import java.util.concurrent.atomic.AtomicInteger;
029
030
033 public class BufferedIncrementRunnable implements Runnable {
034
035 public BufferedIncrementRunnable(
036 BufferedIncrementConfiguration bufferedIncrementConfiguration,
037 BatchablePipe<Serializable, Increment<?>> batchablePipe,
038 AtomicInteger queueLengthTracker) {
039
040 _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
041 _batchablePipe = batchablePipe;
042 _queueLengthTracker = queueLengthTracker;
043
044 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
045 _queueLengthTracker.incrementAndGet();
046 }
047
048 _companyId = CompanyThreadLocal.getCompanyId();
049 }
050
051 @SuppressWarnings("rawtypes")
052 public void run() {
053 CompanyThreadLocal.setCompanyId(_companyId);
054
055 while (true) {
056 BufferedIncreasableEntry bufferedIncreasableEntry =
057 (BufferedIncreasableEntry)_batchablePipe.take();
058
059 if (bufferedIncreasableEntry == null) {
060 break;
061 }
062
063 try {
064 bufferedIncreasableEntry.proceed();
065 }
066 catch (Throwable t) {
067 _log.error(
068 "Unable to write buffered increment value to the database",
069 t);
070 }
071
072 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
073 int queueLength = _queueLengthTracker.decrementAndGet();
074
075 long standbyTime =
076 _bufferedIncrementConfiguration.calculateStandbyTime(
077 queueLength);
078
079 try {
080 Thread.sleep(standbyTime);
081 }
082 catch (InterruptedException ie) {
083 break;
084 }
085 }
086 }
087
088 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
089
090 CentralizedThreadLocal.clearShortLivedThreadLocals();
091 }
092
093 private static Log _log = LogFactoryUtil.getLog(
094 BufferedIncrementRunnable.class);
095
096 private final BatchablePipe<Serializable, Increment<?>> _batchablePipe;
097 private final BufferedIncrementConfiguration
098 _bufferedIncrementConfiguration;
099 private final long _companyId;
100 private final AtomicInteger _queueLengthTracker;
101
102 }