001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Queue;
018 import java.util.concurrent.ConcurrentHashMap;
019 import java.util.concurrent.ConcurrentLinkedQueue;
020 import java.util.concurrent.ConcurrentMap;
021
022
025 public class BatchablePipe<K, V> {
026
027 public boolean put(IncreasableEntry<K, V> increasableEntry) {
028 K key = increasableEntry.getKey();
029
030 while (true) {
031 IncreasableEntryWrapper<K, V> previousIncreasableEntryWrapper =
032 concurrentMap.putIfAbsent(
033 key, new IncreasableEntryWrapper<K, V>(increasableEntry));
034
035 if (previousIncreasableEntryWrapper == null) {
036 queue.offer(increasableEntry);
037
038 return true;
039 }
040
041 IncreasableEntry<K, V> previousIncreasableEntry =
042 previousIncreasableEntryWrapper.increasableEntry;
043
044 IncreasableEntry<K, V> newIncreasableEntry =
045 increasableEntry.increase(previousIncreasableEntry.getValue());
046
047 if (concurrentMap.replace(
048 key, previousIncreasableEntryWrapper,
049 new IncreasableEntryWrapper<K, V>(newIncreasableEntry))) {
050
051 queue.offer(newIncreasableEntry);
052
053 return false;
054 }
055 }
056 }
057
058 public IncreasableEntry<K, V> take() {
059 while (true) {
060 IncreasableEntry<K, V> increasableEntry = queue.poll();
061
062 if (increasableEntry == null) {
063 return null;
064 }
065
066 if (concurrentMap.remove(
067 increasableEntry.getKey(),
068 new IncreasableEntryWrapper<K, V>(increasableEntry))) {
069
070 return increasableEntry;
071 }
072 }
073 }
074
075 protected final ConcurrentMap<K, IncreasableEntryWrapper<K, V>>
076 concurrentMap = new ConcurrentHashMap<>();
077 protected final Queue<IncreasableEntry<K, V>> queue =
078 new ConcurrentLinkedQueue<>();
079
080 protected static class IncreasableEntryWrapper<K, V> {
081
082 public IncreasableEntryWrapper(
083 IncreasableEntry<K, V> increasableEntry) {
084
085 this.increasableEntry = increasableEntry;
086 }
087
088 @Override
089 public boolean equals(Object obj) {
090 IncreasableEntryWrapper<K, V> increasableEntryWrapper =
091 (IncreasableEntryWrapper<K, V>)obj;
092
093 if (increasableEntry == increasableEntryWrapper.increasableEntry) {
094 return true;
095 }
096
097 return false;
098 }
099
100 @Override
101 public int hashCode() {
102 return increasableEntry.hashCode();
103 }
104
105 @Override
106 public String toString() {
107 return increasableEntry.toString();
108 }
109
110 protected final IncreasableEntry<K, V> increasableEntry;
111
112 }
113
114 }