001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.cache.thread.local.Lifecycle;
018 import com.liferay.portal.kernel.cache.thread.local.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.concurrent.BatchablePipe;
020 import com.liferay.portal.kernel.increment.Increment;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
024 import com.liferay.portal.security.auth.CompanyThreadLocal;
025
026 import java.io.Serializable;
027
028 import java.util.concurrent.atomic.AtomicInteger;
029
030
033 public class BufferedIncrementRunnable implements Runnable {
034
035 public BufferedIncrementRunnable(
036 BufferedIncrementConfiguration bufferedIncrementConfiguration,
037 BatchablePipe<Serializable, Increment<?>> batchablePipe,
038 AtomicInteger queueLengthTracker, Thread dispatchThread) {
039
040 _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
041 _batchablePipe = batchablePipe;
042 _queueLengthTracker = queueLengthTracker;
043 _dispatchThread = dispatchThread;
044
045 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
046 _queueLengthTracker.incrementAndGet();
047 }
048
049 _companyId = CompanyThreadLocal.getCompanyId();
050 }
051
052 @Override
053 @SuppressWarnings("rawtypes")
054 public void run() {
055 CompanyThreadLocal.setCompanyId(_companyId);
056
057 while (true) {
058 BufferedIncreasableEntry bufferedIncreasableEntry =
059 (BufferedIncreasableEntry)_batchablePipe.take();
060
061 if (bufferedIncreasableEntry == null) {
062 break;
063 }
064
065 try {
066 bufferedIncreasableEntry.proceed();
067 }
068 catch (Throwable t) {
069 _log.error(
070 "Unable to write buffered increment value to the database",
071 t);
072 }
073
074 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
075 int queueLength = _queueLengthTracker.decrementAndGet();
076
077 long standbyTime =
078 _bufferedIncrementConfiguration.calculateStandbyTime(
079 queueLength);
080
081 try {
082 Thread.sleep(standbyTime);
083 }
084 catch (InterruptedException ie) {
085 break;
086 }
087 }
088 }
089
090 if (_dispatchThread != Thread.currentThread()) {
091 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
092
093 CentralizedThreadLocal.clearShortLivedThreadLocals();
094 }
095 }
096
097 private static final Log _log = LogFactoryUtil.getLog(
098 BufferedIncrementRunnable.class);
099
100 private final BatchablePipe<Serializable, Increment<?>> _batchablePipe;
101 private final BufferedIncrementConfiguration
102 _bufferedIncrementConfiguration;
103 private final long _companyId;
104 private final Thread _dispatchThread;
105 private final AtomicInteger _queueLengthTracker;
106
107 }