Ensure that computeIfAbsent loader is invoked at-most once
This commit is contained in:
parent
eb2ea01106
commit
11d75226a9
|
@ -187,17 +187,14 @@ public class Cache<K, V> {
|
||||||
* @param key the key of the entry to add to the cache
|
* @param key the key of the entry to add to the cache
|
||||||
* @param value the value of the entry to add to the cache
|
* @param value the value of the entry to add to the cache
|
||||||
* @param now the access time of this entry
|
* @param now the access time of this entry
|
||||||
* @param onlyIfAbsent whether or not an existing entry should be replaced
|
|
||||||
* @return a tuple of the new entry and the existing entry, if there was one otherwise null
|
* @return a tuple of the new entry and the existing entry, if there was one otherwise null
|
||||||
*/
|
*/
|
||||||
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now, boolean onlyIfAbsent) {
|
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
|
||||||
Entry<K, V> entry = new Entry<>(key, value, now);
|
Entry<K, V> entry = new Entry<>(key, value, now);
|
||||||
Entry<K, V> existing = null;
|
Entry<K, V> existing;
|
||||||
try (ReleasableLock ignored = writeLock.acquire()) {
|
try (ReleasableLock ignored = writeLock.acquire()) {
|
||||||
if (!onlyIfAbsent || (onlyIfAbsent && map.get(key) == null)) {
|
|
||||||
existing = map.put(key, entry);
|
existing = map.put(key, entry);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return Tuple.tuple(entry, existing);
|
return Tuple.tuple(entry, existing);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,10 +278,16 @@ public class Cache<K, V> {
|
||||||
public V computeIfAbsent(K key, Function<K, V> mappingFunction) {
|
public V computeIfAbsent(K key, Function<K, V> mappingFunction) {
|
||||||
long now = now();
|
long now = now();
|
||||||
V value = get(key);
|
V value = get(key);
|
||||||
|
if (value == null) {
|
||||||
|
CacheSegment<K, V> segment = getCacheSegment(key);
|
||||||
|
try (ReleasableLock ignored = segment.writeLock.acquire()) {
|
||||||
|
value = get(key);
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
value = mappingFunction.apply(key);
|
value = mappingFunction.apply(key);
|
||||||
|
}
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
put(key, value, now, true);
|
put(key, value, now);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return value;
|
return value;
|
||||||
|
@ -299,12 +302,12 @@ public class Cache<K, V> {
|
||||||
*/
|
*/
|
||||||
public void put(K key, V value) {
|
public void put(K key, V value) {
|
||||||
long now = now();
|
long now = now();
|
||||||
put(key, value, now, false);
|
put(key, value, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void put(K key, V value, long now, boolean onlyIfAbsent) {
|
private void put(K key, V value, long now) {
|
||||||
CacheSegment<K, V> segment = getCacheSegment(key);
|
CacheSegment<K, V> segment = getCacheSegment(key);
|
||||||
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now, onlyIfAbsent);
|
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
|
||||||
boolean replaced = false;
|
boolean replaced = false;
|
||||||
try (ReleasableLock ignored = lock.acquire()) {
|
try (ReleasableLock ignored = lock.acquire()) {
|
||||||
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
|
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.junit.Before;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
|
|
||||||
|
@ -435,6 +436,31 @@ public class CacheTests extends ESTestCase {
|
||||||
assertEquals(replacements, notifications);
|
assertEquals(replacements, notifications);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testComputeIfAbsentCallsOnce() throws InterruptedException {
|
||||||
|
int numberOfThreads = randomIntBetween(2, 200);
|
||||||
|
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
|
||||||
|
List<Thread> threads = new ArrayList<>();
|
||||||
|
AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries);
|
||||||
|
for (int j = 0; j < numberOfEntries; j++) {
|
||||||
|
flags.set(j, false);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < numberOfThreads; i++) {
|
||||||
|
Thread thread = new Thread(() -> {
|
||||||
|
for (int j = 0; j < numberOfEntries; j++) {
|
||||||
|
cache.computeIfAbsent(j, key -> {
|
||||||
|
assertTrue(flags.compareAndSet(key, false, true));
|
||||||
|
return Integer.toString(key);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
threads.add(thread);
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
|
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
|
||||||
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
|
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
|
||||||
public void testTorture() throws InterruptedException {
|
public void testTorture() throws InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue