001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.concurrent.BatchablePipe;
020 import com.liferay.portal.kernel.increment.Increment;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
024 import com.liferay.portal.security.auth.CompanyThreadLocal;
025
026 import java.io.Serializable;
027
028 import java.util.concurrent.atomic.AtomicInteger;
029
030
033 public class BufferedIncrementRunnable implements Runnable {
034
035 public BufferedIncrementRunnable(
036 BufferedIncrementConfiguration bufferedIncrementConfiguration,
037 BatchablePipe<Serializable, Increment<?>> batchablePipe,
038 AtomicInteger queueLengthTracker) {
039
040 _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
041 _batchablePipe = batchablePipe;
042 _queueLengthTracker = queueLengthTracker;
043
044 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
045 _queueLengthTracker.incrementAndGet();
046 }
047
048 _companyId = CompanyThreadLocal.getCompanyId();
049 }
050
051 @Override
052 @SuppressWarnings("rawtypes")
053 public void run() {
054 CompanyThreadLocal.setCompanyId(_companyId);
055
056 while (true) {
057 BufferedIncreasableEntry bufferedIncreasableEntry =
058 (BufferedIncreasableEntry)_batchablePipe.take();
059
060 if (bufferedIncreasableEntry == null) {
061 break;
062 }
063
064 try {
065 bufferedIncreasableEntry.proceed();
066 }
067 catch (Throwable t) {
068 _log.error(
069 "Unable to persist buffered increment value: " +
070 bufferedIncreasableEntry,
071 t);
072 }
073
074 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
075 int queueLength = _queueLengthTracker.decrementAndGet();
076
077 long standbyTime =
078 _bufferedIncrementConfiguration.calculateStandbyTime(
079 queueLength);
080
081 try {
082 Thread.sleep(standbyTime);
083 }
084 catch (InterruptedException ie) {
085 break;
086 }
087 }
088 }
089
090 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
091
092 CentralizedThreadLocal.clearShortLivedThreadLocals();
093 }
094
095 private static Log _log = LogFactoryUtil.getLog(
096 BufferedIncrementRunnable.class);
097
098 private final BatchablePipe<Serializable, Increment<?>> _batchablePipe;
099 private final BufferedIncrementConfiguration
100 _bufferedIncrementConfiguration;
101 private final long _companyId;
102 private final AtomicInteger _queueLengthTracker;
103
104 }