001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.concurrent.atomic.AtomicMarkableReference;
018 import java.util.concurrent.atomic.AtomicReference;
019
020
023 public class BatchablePipe<K, V> {
024
025 public BatchablePipe() {
026 _headEntry = new Entry<K, V>(null);
027 _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028 }
029
030 public boolean put(IncreasableEntry<K, V> increasableEntry) {
031 Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032
033 while (true) {
034 if (_doIncrease(increasableEntry)) {
035 return false;
036 }
037
038 Entry<K, V> lastEntryLink = _lastEntryReference.get();
039 Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040
041 if (nextEntryLink == null) {
042 if (lastEntryLink._nextEntry.compareAndSet(
043 null, newEntry, false, false)) {
044
045 _lastEntryReference.set(newEntry);
046
047 return true;
048 }
049 }
050 else {
051 _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052 }
053 }
054 }
055
056 public IncreasableEntry<K, V> take() {
057 boolean[] marked = {false};
058
059 take:
060 while (true) {
061 Entry<K, V> predecessorEntry = _headEntry;
062 Entry<K, V> currentEntry =
063 predecessorEntry._nextEntry.getReference();
064
065 while (currentEntry != null) {
066 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
067 marked);
068
069 if (marked[0]) {
070 if (!predecessorEntry._nextEntry.compareAndSet(
071 currentEntry, successorEntry, false, false)) {
072
073 continue take;
074 }
075
076 currentEntry = predecessorEntry._nextEntry.getReference();
077
078 continue;
079 }
080
081 if (currentEntry._nextEntry.compareAndSet(
082 successorEntry, successorEntry, false, true)) {
083
084 return currentEntry._increasableEntry;
085 }
086
087 continue take;
088 }
089
090 return null;
091 }
092 }
093
094 private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
095 boolean[] marked = {false};
096
097 Retry:
098
099 while (true) {
100 Entry<K, V> predecessorEntry = _headEntry;
101 Entry<K, V> currentEntry =
102 predecessorEntry._nextEntry.getReference();
103
104 while (currentEntry != null) {
105 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
106 marked);
107
108 if (marked[0]) {
109 if (!predecessorEntry._nextEntry.compareAndSet(
110 currentEntry, successorEntry, false, false)) {
111
112 continue Retry;
113 }
114
115 currentEntry = predecessorEntry._nextEntry.getReference();
116
117 continue;
118 }
119
120 if (currentEntry._increasableEntry.getKey().equals(
121 increasableEntry.getKey())) {
122
123 return currentEntry._increasableEntry.increase(
124 increasableEntry.getValue());
125 }
126
127 predecessorEntry = currentEntry;
128 currentEntry = successorEntry;
129 }
130
131 _lastEntryReference.set(predecessorEntry);
132
133 return false;
134 }
135 }
136
137 private final Entry<K, V> _headEntry;
138 private final AtomicReference<Entry<K, V>> _lastEntryReference;
139
140 private static class Entry<K, V> {
141
142 private Entry(IncreasableEntry<K, V> increasableEntry) {
143 _increasableEntry = increasableEntry;
144 _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
145 }
146
147 private IncreasableEntry<K, V> _increasableEntry;
148 private AtomicMarkableReference<Entry<K, V>> _nextEntry;
149
150 }
151
152 }