From aa8bfeb88c704206a87e35ed69b485dbdbfbce6a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 30 Sep 2015 21:43:00 +0200 Subject: [PATCH] Create concurrent cache with flexible eviction policies This commit adds a concurrent cache with flexible eviction policies. In particular, this cache supports: 1. concurrency 2. weight-based evictions 3. time-based evictions 4. manual invalidation 5. removal notification 6. cache statistics Closes #13717 --- .../org/elasticsearch/common/cache/Cache.java | 642 ++++++++++++++++++ .../common/cache/CacheBuilder.java | 82 +++ .../common/cache/RemovalListener.java | 25 + .../common/cache/RemovalNotification.java | 46 ++ .../common/cache/CacheTests.java | 435 ++++++++++++ 5 files changed, 1230 insertions(+) create mode 100644 core/src/main/java/org/elasticsearch/common/cache/Cache.java create mode 100644 core/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/common/cache/RemovalListener.java create mode 100644 core/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java create mode 100644 core/src/test/java/org/elasticsearch/common/cache/CacheTests.java diff --git a/core/src/main/java/org/elasticsearch/common/cache/Cache.java b/core/src/main/java/org/elasticsearch/common/cache/Cache.java new file mode 100644 index 00000000000..f0f72428989 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -0,0 +1,642 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.cache; + +import org.elasticsearch.common.collect.Tuple; + +import java.util.*; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.function.ToLongBiFunction; + +/** + * A simple concurrent cache. + *

+ * Cache is a simple concurrent cache that supports time-based and weight-based evictions, with notifications for all + * evictions. The design goals for this cache were simplicity and read performance. This means that we are willing to + * accept reduced write performance in exchange for easy-to-understand code. Cache statistics for hits, misses and + * evictions are exposed. + *

+ * The design of the cache is relatively simple. The cache is segmented into 256 segments which are backed by HashMaps. + * The segments are protected by a re-entrant read/write lock. The read/write locks permit multiple concurrent readers + * without contention, and the segments gives us write throughput without impacting readers (so readers are blocked only + * if they are reading a segment that a writer is writing to). + *

+ * The LRU functionality is backed by a single doubly-linked list chaining the entries in order of insertion. This + * LRU list is protected by a lock that serializes all writes to it. There are opportunities for improvements + * here if write throughput is a concern. + *

    + *
  1. LRU list mutations could be inserted into a blocking queue that a single thread is reading from + * and applying to the LRU list.
  2. + *
  3. Promotions could be deferred for entries that were "recently" promoted.
  4. + *
  5. Locks on the list could be taken per node being modified instead of globally.
  6. + *
+ * + * Evictions only occur after a mutation to the cache (meaning an entry promotion, a cache insertion, or a manual + * invalidation) or an explicit call to {@link #refresh()}. + * + * @param The type of the keys + * @param The type of the values + */ +public class Cache { + // positive if entries have an expiration + private long expireAfter = -1; + + // the number of entries in the cache + private int count = 0; + + // the weight of the entries in the cache + private long weight = 0; + + // the maximum weight that this cache supports + private long maximumWeight = -1; + + // the weigher of entries + private ToLongBiFunction weigher = (k, v) -> 1; + + // the removal callback + private RemovalListener removalListener = notification -> { + }; + + // use CacheBuilder to construct + Cache() { + } + + void setExpireAfter(long expireAfter) { + if (expireAfter <= 0) { + throw new IllegalArgumentException("expireAfter <= 0"); + } + this.expireAfter = expireAfter; + } + + void setMaximumWeight(long maximumWeight) { + if (maximumWeight < 0) { + throw new IllegalArgumentException("maximumWeight < 0"); + } + this.maximumWeight = maximumWeight; + } + + void setWeigher(ToLongBiFunction weigher) { + Objects.requireNonNull(weigher); + this.weigher = weigher; + } + + void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + /** + * The relative time used to track time-based evictions. + * + * @return the current relative time + */ + protected long now() { + // System.nanoTime takes non-negligible time, so we only use it if we need it + return expireAfter == -1 ? 0 : System.nanoTime(); + } + + // the state of an entry in the LRU list + enum State {NEW, EXISTING, DELETED} + + static class Entry { + final K key; + final V value; + long accessTime; + Entry before; + Entry after; + State state = State.NEW; + + public Entry(K key, V value, long accessTime) { + this.key = key; + this.value = value; + this.accessTime = accessTime; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } else if (!(obj instanceof Entry)) { + return false; + } else { + @SuppressWarnings("unchecked") + Entry e = (Entry) obj; + return Objects.equals(key, e.key); + } + } + + @Override + public int hashCode() { + return Objects.hashCode(key); + } + } + + /** + * A cache segment. + * + * A CacheSegment is backed by a HashMap and is protected by a read/write lock. + * + * @param the type of the keys + * @param the type of the values + */ + private static class CacheSegment { + // read/write lock protecting mutations to the segment + ReadWriteLock lock = new ReentrantReadWriteLock(); + Map> map = new HashMap<>(); + SegmentStats segmentStats = new SegmentStats(); + + /** + * get an entry from the segment + * + * @param key the key of the entry to get from the cache + * @param now the access time of this entry + * @return the entry if there was one, otherwise null + */ + Entry get(K key, long now) { + lock.readLock().lock(); + Entry entry = map.get(key); + lock.readLock().unlock(); + if (entry != null) { + segmentStats.hit(); + entry.accessTime = now; + } else { + segmentStats.miss(); + } + return entry; + } + + /** + * put an entry into the segment + * + * @param key the key of the entry to add to the cache + * @param value the value of the entry to add to the cache + * @param now the access time of this entry + * @param onlyIfAbsent whether or not an existing entry should be replaced + * @return a tuple of the new entry and the existing entry, if there was one otherwise null + */ + Tuple, Entry> put(K key, V value, long now, boolean onlyIfAbsent) { + Entry entry = new Entry<>(key, value, now); + lock.writeLock().lock(); + Entry existing = null; + if (!onlyIfAbsent || (onlyIfAbsent && map.get(key) == null)) { + existing = map.put(key, entry); + } + lock.writeLock().unlock(); + return Tuple.tuple(entry, existing); + } + + /** + * remove an entry from the segment + * + * @param key the key of the entry to remove from the cache + * @return the removed entry if there was one, otherwise null + */ + Entry remove(K key) { + lock.writeLock().lock(); + Entry entry = map.remove(key); + lock.writeLock().unlock(); + if (entry != null) { + segmentStats.eviction(); + } + return entry; + } + + private static class SegmentStats { + private final LongAdder hits = new LongAdder(); + private final LongAdder misses = new LongAdder(); + private final LongAdder evictions = new LongAdder(); + + void hit() { + hits.increment(); + } + + void miss() { + misses.increment(); + } + + void eviction() { + evictions.increment(); + } + } + } + + private CacheSegment[] segments = new CacheSegment[256]; + + { + for (int i = 0; i < segments.length; i++) { + segments[i] = new CacheSegment<>(); + } + } + + Entry head; + Entry tail; + + // lock protecting mutations to the LRU list + private ReentrantLock lock = new ReentrantLock(); + + /** + * Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key. + * + * @param key the key whose associated value is to be returned + * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key + */ + public V get(K key) { + long now = now(); + CacheSegment segment = getCacheSegment(key); + Entry entry = segment.get(key, now); + if (entry == null || isExpired(entry, now)) { + return null; + } else { + promote(entry, now); + return entry.value; + } + } + + /** + * If the specified key is not already associated with a value (or is mapped to null), attempts to compute its + * value using the given mapping function and enters it into this map unless null. + * + * @param key the key whose associated value is to be returned or computed for if non-existant + * @param mappingFunction the function to compute a value given a key + * @return the current (existing or computed) value associated with the specified key, or null if the computed + * value is null + */ + public V computeIfAbsent(K key, Function mappingFunction) { + long now = now(); + V value = get(key); + if (value == null) { + value = mappingFunction.apply(key); + if (value != null) { + put(key, value, now, true); + } + } + return value; + } + + /** + * Associates the specified value with the specified key in this map. If the map previously contained a mapping for + * the key, the old value is replaced. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + */ + public void put(K key, V value) { + long now = now(); + put(key, value, now, false); + } + + private void put(K key, V value, long now, boolean onlyIfAbsent) { + CacheSegment segment = getCacheSegment(key); + Tuple, Entry> tuple = segment.put(key, value, now, onlyIfAbsent); + lock.lock(); + boolean replaced = false; + if (tuple.v2() != null && tuple.v2().state == State.EXISTING) { + if (unlink(tuple.v2())) { + replaced = true; + } + } + promote(tuple.v1(), now); + lock.unlock(); + if (replaced) { + removalListener.onRemoval(new RemovalNotification(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED)); + } + } + + /** + * Invalidate the association for the specified key. A removal notification will be issued for invalidated + * entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + * + * @param key the key whose mapping is to be invalidated from the cache + */ + public void invalidate(K key) { + CacheSegment segment = getCacheSegment(key); + Entry entry = segment.remove(key); + if (entry != null) { + lock.lock(); + delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + lock.unlock(); + } + } + + /** + * Invalidate all cache entries. A removal notification will be issued for invalidated entries with + * {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED. + */ + public void invalidateAll() { + Entry h = head; + Arrays.stream(segments).forEach(segment -> segment.lock.writeLock().lock()); + lock.lock(); + Arrays.stream(segments).forEach(segment -> segment.map = new HashMap<>()); + Entry current = head; + while (current != null) { + current.state = State.DELETED; + current = current.after; + } + head = tail = null; + count = 0; + weight = 0; + lock.unlock(); + Arrays.stream(segments).forEach(segment -> segment.lock.writeLock().unlock()); + while (h != null) { + removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED)); + h = h.after; + } + } + + /** + * Force any outstanding size-based and time-based evictions to occur + */ + public void refresh() { + long now = now(); + lock.lock(); + evict(now); + lock.unlock(); + } + + /** + * The number of entries in the cache. + * + * @return the number of entries in the cache + */ + public int count() { + return count; + } + + /** + * The weight of the entries in the cache. + * + * @return the weight of the entries in the cache + */ + public long weight() { + return weight; + } + + /** + * An LRU sequencing of the keys in the cache that supports removal. + * + * @return an LRU-ordered {@link Iterable} over the keys in the cache + */ + public Iterable keys() { + return () -> new Iterator() { + private CacheIterator iterator = new CacheIterator(head); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public K next() { + return iterator.next().key; + } + + @Override + public void remove() { + iterator.remove(); + } + }; + } + + /** + * An LRU sequencing of the values in the cache. + * + * @return an LRU-ordered {@link Iterable} over the values in the cache + */ + public Iterable values() { + return () -> new Iterator() { + private CacheIterator iterator = new CacheIterator(head); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public V next() { + return iterator.next().value; + } + }; + } + + private class CacheIterator implements Iterator> { + private Entry current; + private Entry next; + + CacheIterator(Entry head) { + current = null; + next = head; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Entry next() { + current = next; + next = next.after; + return current; + } + + @Override + public void remove() { + Entry entry = current; + if (entry != null) { + CacheSegment segment = getCacheSegment(entry.key); + segment.remove(entry.key); + lock.lock(); + current = null; + delete(entry, RemovalNotification.RemovalReason.INVALIDATED); + lock.unlock(); + } + } + } + + /** + * The cache statistics tracking hits, misses and evictions. These are taken on a best-effort basis meaning that + * they could be out-of-date mid-flight. + * + * @return the current cache statistics + */ + public CacheStats stats() { + int hits = 0; + int misses = 0; + int evictions = 0; + for (int i = 0; i < segments.length; i++) { + hits += segments[i].segmentStats.hits.intValue(); + misses += segments[i].segmentStats.misses.intValue(); + evictions += segments[i].segmentStats.evictions.intValue(); + } + return new CacheStats(hits, misses, evictions); + } + + public static class CacheStats { + private int hits; + private int misses; + private int evictions; + + public CacheStats(int hits, int misses, int evictions) { + this.hits = hits; + this.misses = misses; + this.evictions = evictions; + } + + public int getHits() { + return hits; + } + + public int getMisses() { + return misses; + } + + public int getEvictions() { + return evictions; + } + } + + private boolean promote(Entry entry, long now) { + boolean promoted = true; + lock.lock(); + switch (entry.state) { + case DELETED: + promoted = false; + break; + case EXISTING: + relinkAtHead(entry); + break; + case NEW: + linkAtHead(entry); + break; + } + if (promoted) { + evict(now); + } + lock.unlock(); + return promoted; + } + + private void evict(long now) { + assert lock.isHeldByCurrentThread(); + + while (tail != null && shouldPrune(tail, now)) { + CacheSegment segment = getCacheSegment(tail.key); + Entry entry = tail; + if (segment != null) { + segment.remove(tail.key); + } + delete(entry, RemovalNotification.RemovalReason.EVICTED); + } + } + + private void delete(Entry entry, RemovalNotification.RemovalReason removalReason) { + assert lock.isHeldByCurrentThread(); + + if (unlink(entry)) { + removalListener.onRemoval(new RemovalNotification<>(entry.key, entry.value, removalReason)); + } + } + + private boolean shouldPrune(Entry entry, long now) { + return exceedsSize() || isExpired(entry, now); + } + + private boolean exceedsSize() { + return maximumWeight != -1 && weight > maximumWeight; + } + + private boolean isExpired(Entry entry, long now) { + return expireAfter != -1 && now - entry.accessTime > expireAfter; + } + + private boolean unlink(Entry entry) { + assert lock.isHeldByCurrentThread(); + + if (entry.state == State.EXISTING) { + final Entry before = entry.before; + final Entry after = entry.after; + + if (before == null) { + // removing the head + head = after; + if (head != null) { + head.before = null; + } + } else { + // removing inner element + before.after = after; + entry.before = null; + } + + if (after == null) { + // removing tail + tail = before; + if (tail != null) { + tail.after = null; + } + } else { + // removing inner element + after.before = before; + entry.after = null; + } + + count--; + weight -= weigher.applyAsLong(entry.key, entry.value); + entry.state = State.DELETED; + return true; + } else { + return false; + } + } + + private void linkAtHead(Entry entry) { + assert lock.isHeldByCurrentThread(); + + Entry h = head; + entry.before = null; + entry.after = head; + head = entry; + if (h == null) { + tail = entry; + } else { + h.before = entry; + } + + count++; + weight += weigher.applyAsLong(entry.key, entry.value); + entry.state = State.EXISTING; + } + + private void relinkAtHead(Entry entry) { + assert lock.isHeldByCurrentThread(); + + if (head != entry) { + unlink(entry); + linkAtHead(entry); + } + } + + private CacheSegment getCacheSegment(K key) { + return segments[key.hashCode() & 0xff]; + } +} diff --git a/core/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java b/core/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java new file mode 100644 index 00000000000..3803f264454 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/cache/CacheBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.cache; + +import java.util.Objects; +import java.util.function.ToLongBiFunction; + +public class CacheBuilder { + private long maximumWeight = -1; + private long expireAfter = -1; + private ToLongBiFunction weigher; + private RemovalListener removalListener; + + public static CacheBuilder builder() { + return new CacheBuilder<>(); + } + + private CacheBuilder() { + } + + public CacheBuilder setMaximumWeight(long maximumWeight) { + if (maximumWeight < 0) { + throw new IllegalArgumentException("maximumWeight < 0"); + } + this.maximumWeight = maximumWeight; + return this; + } + + public CacheBuilder setExpireAfter(long expireAfter) { + if (expireAfter <= 0) { + throw new IllegalArgumentException("expireAfter <= 0"); + } + this.expireAfter = expireAfter; + return this; + } + + public CacheBuilder weigher(ToLongBiFunction weigher) { + Objects.requireNonNull(weigher); + this.weigher = weigher; + return this; + } + + public CacheBuilder removalListener(RemovalListener removalListener) { + Objects.requireNonNull(removalListener); + this.removalListener = removalListener; + return this; + } + + public Cache build() { + Cache cache = new Cache(); + if (maximumWeight != -1) { + cache.setMaximumWeight(maximumWeight); + } + if (expireAfter != -1) { + cache.setExpireAfter(expireAfter); + } + if (weigher != null) { + cache.setWeigher(weigher); + } + if (removalListener != null) { + cache.setRemovalListener(removalListener); + } + return cache; + } +} diff --git a/core/src/main/java/org/elasticsearch/common/cache/RemovalListener.java b/core/src/main/java/org/elasticsearch/common/cache/RemovalListener.java new file mode 100644 index 00000000000..ae133000f76 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/cache/RemovalListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.cache; + +@FunctionalInterface +public interface RemovalListener { + void onRemoval(RemovalNotification notification); +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java b/core/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java new file mode 100644 index 00000000000..afea5a54480 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/cache/RemovalNotification.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.cache; + +public class RemovalNotification { + public enum RemovalReason {REPLACED, INVALIDATED, EVICTED} + + private final K key; + private final V value; + private final RemovalReason removalReason; + + public RemovalNotification(K key, V value, RemovalReason removalReason) { + this.key = key; + this.value = value; + this.removalReason = removalReason; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + public RemovalReason getRemovalReason() { + return removalReason; + } +} diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java new file mode 100644 index 00000000000..c0903110d4a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -0,0 +1,435 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.cache; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; + +public class CacheTests extends ESTestCase { + private int numberOfEntries; + + @Before + public void setUp() throws Exception { + super.setUp(); + numberOfEntries = randomIntBetween(1000, 10000); + logger.debug("numberOfEntries: " + numberOfEntries); + } + + // cache some entries, then randomly lookup keys that do not exist, then check the stats + public void testCacheStats() { + AtomicInteger evictions = new AtomicInteger(); + Set keys = new HashSet<>(); + Cache cache = + CacheBuilder.builder() + .setMaximumWeight(numberOfEntries / 2) + .removalListener(notification -> { + keys.remove(notification.getKey()); + evictions.incrementAndGet(); + }) + .build(); + + for (int i = 0; i < numberOfEntries; i++) { + keys.add(i); + cache.put(i, Integer.toString(i)); + } + int hits = 0; + int misses = 0; + int missingKey = 0; + for (Integer key : keys) { + --missingKey; + if (rarely()) { + misses++; + cache.get(missingKey); + } else { + hits++; + cache.get(key); + } + } + assertEquals(hits, cache.stats().getHits()); + assertEquals(misses, cache.stats().getMisses()); + assertEquals((int)Math.ceil(numberOfEntries / 2.0), evictions.get()); + assertEquals(evictions.get(), cache.stats().getEvictions()); + } + + // cache some entries, numberOfEntries - maximumWeight evictions should occur, then check that the evicted + // entries were evicted in LRU order + public void testCacheEvictions() { + int maximumWeight = randomIntBetween(1, numberOfEntries); + AtomicInteger evictions = new AtomicInteger(); + List evictedKeys = new ArrayList<>(); + Cache cache = + CacheBuilder.builder() + .setMaximumWeight(maximumWeight) + .removalListener(notification -> { + evictions.incrementAndGet(); + evictedKeys.add(notification.getKey()); + }) + .build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + assertEquals(numberOfEntries - maximumWeight, evictions.get()); + assertEquals(evictions.get(), cache.stats().getEvictions()); + + // assert that the keys were evicted in LRU order + Set keys = new HashSet<>(); + List remainingKeys = new ArrayList<>(); + for (Integer key : cache.keys()) { + keys.add(key); + remainingKeys.add(key); + } + for (int i = 0; i < numberOfEntries - maximumWeight; i++) { + assertFalse(keys.contains(i)); + assertEquals(i, (int) evictedKeys.get(i)); + } + for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) { + assertTrue(keys.contains(i)); + assertEquals( + numberOfEntries - i + (numberOfEntries - maximumWeight) - 1, + (int) remainingKeys.get(i - (numberOfEntries - maximumWeight)) + ); + } + } + + // cache some entries and exceed the maximum weight, then check that the cache has the expected weight and the + // expected evictions occurred + public void testWeigher() { + int maximumWeight = 2 * numberOfEntries; + int weight = randomIntBetween(2, 10); + AtomicInteger evictions = new AtomicInteger(); + Cache cache = + CacheBuilder.builder() + .setMaximumWeight(maximumWeight) + .weigher((k, v) -> weight) + .removalListener(notification -> evictions.incrementAndGet()) + .build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + // cache weight should be the largest multiple of weight less than maximumWeight + assertEquals(weight * (maximumWeight / weight), cache.weight()); + + // the number of evicted entries should be the number of entries that fit in the excess weight + assertEquals((int) Math.ceil((weight - 2) * numberOfEntries / (1.0 * weight)), evictions.get()); + + assertEquals(evictions.get(), cache.stats().getEvictions()); + } + + // cache some entries, randomly invalidate some of them, then check that the weight of the cache is correct + public void testWeight() { + Cache cache = + CacheBuilder.builder() + .weigher((k, v) -> k) + .build(); + int weight = 0; + for (int i = 0; i < numberOfEntries; i++) { + weight += i; + cache.put(i, Integer.toString(i)); + } + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + weight -= i; + cache.invalidate(i); + } + } + assertEquals(weight, cache.weight()); + } + + // cache some entries, randomly invalidate some of them, then check that the number of cached entries is correct + public void testCount() { + Cache cache = CacheBuilder.builder().build(); + int count = 0; + for (int i = 0; i < numberOfEntries; i++) { + count++; + cache.put(i, Integer.toString(i)); + } + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + count--; + cache.invalidate(i); + } + } + assertEquals(count, cache.count()); + } + + // cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that + // the first batch of cached entries expired and were removed + public void testExpiration() { + AtomicLong now = new AtomicLong(); + Cache cache = new Cache() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfter(1); + List evictedKeys = new ArrayList<>(); + cache.setRemovalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason()); + evictedKeys.add(notification.getKey()); + }); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(1); + for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(2); + cache.refresh(); + assertEquals(numberOfEntries, cache.count()); + for (int i = 0; i < evictedKeys.size(); i++) { + assertEquals(i, (int) evictedKeys.get(i)); + } + Set remainingKeys = new HashSet<>(); + for (Integer key : cache.keys()) { + remainingKeys.add(key); + } + for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) { + assertTrue(remainingKeys.contains(i)); + } + } + + // randomly promote some entries, step the clock forward, then check that the promoted entries remain and the + // non-promoted entries were removed + public void testPromotion() { + AtomicLong now = new AtomicLong(); + Cache cache = new Cache() { + @Override + protected long now() { + return now.get(); + } + }; + cache.setExpireAfter(1); + now.set(0); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + now.set(1); + Set promotedKeys = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + cache.get(i); + promotedKeys.add(i); + } + } + now.set(2); + cache.refresh(); + assertEquals(promotedKeys.size(), cache.count()); + for (int i = 0; i < numberOfEntries; i++) { + if (promotedKeys.contains(i)) { + assertNotNull(cache.get(i)); + } else { + assertNull(cache.get(i)); + } + } + } + + + // randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null + public void testInvalidate() { + Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set keys = new HashSet<>(); + for (Integer key : cache.keys()) { + if (rarely()) { + cache.invalidate(key); + keys.add(key); + } + } + for (int i = 0; i < numberOfEntries; i++) { + if (keys.contains(i)) { + assertNull(cache.get(i)); + } else { + assertNotNull(cache.get(i)); + } + } + } + + // randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only + // those entries + public void testNotificationOnInvalidate() { + Set notifications = new HashSet<>(); + Cache cache = + CacheBuilder.builder() + .removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }) + .build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set invalidated = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + cache.invalidate(i); + invalidated.add(i); + } + } + assertEquals(notifications, invalidated); + } + + // invalidate all cached entries, then check that the cache is empty + public void testInvalidateAll() { + Cache cache = CacheBuilder.builder().build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + cache.invalidateAll(); + assertEquals(0, cache.count()); + assertEquals(0, cache.weight()); + } + + // invalidate all cached entries, then check that we receive invalidate notifications for all entries + public void testNotificationOnInvalidateAll() { + Set notifications = new HashSet<>(); + Cache cache = + CacheBuilder.builder() + .removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }) + .build(); + Set invalidated = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + invalidated.add(i); + } + cache.invalidateAll(); + assertEquals(invalidated, notifications); + } + + // randomly replace some entries, increasing the weight by 1 for each replacement, then count that the cache size + // is correct + public void testReplaceRecomputesSize() { + class Key { + private int key; + private long weight; + + public Key(int key, long weight) { + this.key = key; + this.weight = weight; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Key key1 = (Key) o; + + return key == key1.key; + + } + + @Override + public int hashCode() { + return key; + } + } + Cache cache = CacheBuilder.builder().weigher((k, s) -> k.weight).build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(new Key(i, 1), Integer.toString(i)); + } + assertEquals(numberOfEntries, cache.count()); + assertEquals(numberOfEntries, cache.weight()); + int replaced = 0; + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + replaced++; + cache.put(new Key(i, 2), Integer.toString(i)); + } + } + assertEquals(numberOfEntries, cache.count()); + assertEquals(numberOfEntries + replaced, cache.weight()); + } + + // randomly replace some entries, then check that we received replacement notifications for those and only those + // entries + public void testNotificationOnReplace() { + Set notifications = new HashSet<>(); + Cache cache = + CacheBuilder.builder() + .removalListener(notification -> { + assertEquals(RemovalNotification.RemovalReason.REPLACED, notification.getRemovalReason()); + notifications.add(notification.getKey()); + }) + .build(); + for (int i = 0; i < numberOfEntries; i++) { + cache.put(i, Integer.toString(i)); + } + Set replacements = new HashSet<>(); + for (int i = 0; i < numberOfEntries; i++) { + if (rarely()) { + cache.put(i, Integer.toString(i) + Integer.toString(i)); + replacements.add(i); + } + } + assertEquals(replacements, notifications); + } + + // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key + // here be dragons: this test did catch one subtle bug during development; do not remove lightly + public void testTorture() throws InterruptedException { + int numberOfThreads = randomIntBetween(1, 200); + AtomicInteger count = new AtomicInteger(); + final Cache cache = + CacheBuilder.builder() + .setMaximumWeight(1000) + .removalListener(notification -> count.decrementAndGet()) + .weigher((k, v) -> 2) + .build(); + + List threads = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + Thread thread = new Thread(() -> { + for (int j = 0; j < numberOfEntries; j++) { + Integer key = randomIntBetween(1, numberOfEntries); + cache.put(key, randomAsciiOfLength(10)); + count.incrementAndGet(); + if (rarely()) { + cache.invalidate(key); + } else { + cache.get(key); + } + } + }); + threads.add(thread); + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + cache.refresh(); + assertEquals(count.get(), cache.count()); + } +}