001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Queue;
018 import java.util.concurrent.ConcurrentHashMap;
019 import java.util.concurrent.ConcurrentLinkedQueue;
020 import java.util.concurrent.ConcurrentMap;
021
022
025 public class BatchablePipe<K, V> {
026
027 public boolean put(IncreasableEntry<K, V> increasableEntry) {
028 K key = increasableEntry.getKey();
029
030 while (true) {
031 IncreasableEntryWrapper<K, V> previousIncreasableEntryWrapper =
032 concurrentMap.putIfAbsent(
033 key, new IncreasableEntryWrapper<K, V>(increasableEntry));
034
035 if (previousIncreasableEntryWrapper == null) {
036 queue.offer(increasableEntry);
037
038 return true;
039 }
040
041 IncreasableEntry<K, V> previousIncreasableEntry =
042 previousIncreasableEntryWrapper.increasableEntry;
043
044 IncreasableEntry<K, V> newIncreasableEntry =
045 increasableEntry.increase(previousIncreasableEntry.getValue());
046
047 if (concurrentMap.replace(
048 key, previousIncreasableEntryWrapper,
049 new IncreasableEntryWrapper<K, V>(newIncreasableEntry))) {
050
051 queue.offer(newIncreasableEntry);
052
053 return false;
054 }
055 }
056 }
057
058 public IncreasableEntry<K, V> take() {
059 while (true) {
060 IncreasableEntry<K, V> increasableEntry = queue.poll();
061
062 if (increasableEntry == null) {
063 return null;
064 }
065
066 if (concurrentMap.remove(
067 increasableEntry.getKey(),
068 new IncreasableEntryWrapper<K, V>(increasableEntry))) {
069
070 return increasableEntry;
071 }
072 }
073 }
074
075 protected final ConcurrentMap<K, IncreasableEntryWrapper<K, V>>
076 concurrentMap =
077 new ConcurrentHashMap<K, IncreasableEntryWrapper<K, V>>();
078 protected final Queue<IncreasableEntry<K, V>> queue =
079 new ConcurrentLinkedQueue<IncreasableEntry<K, V>>();
080
081 protected static class IncreasableEntryWrapper<K, V> {
082
083 public IncreasableEntryWrapper(
084 IncreasableEntry<K, V> increasableEntry) {
085
086 this.increasableEntry = increasableEntry;
087 }
088
089 @Override
090 public boolean equals(Object obj) {
091 IncreasableEntryWrapper<K, V> increasableEntryWrapper =
092 (IncreasableEntryWrapper<K, V>)obj;
093
094 if (increasableEntry == increasableEntryWrapper.increasableEntry) {
095 return true;
096 }
097
098 return false;
099 }
100
101 @Override
102 public int hashCode() {
103 return increasableEntry.hashCode();
104 }
105
106 @Override
107 public String toString() {
108 return increasableEntry.toString();
109 }
110
111 protected final IncreasableEntry<K, V> increasableEntry;
112
113 }
114
115 }