Create concurrent cache with flexible eviction policies
This commit adds a concurrent cache with flexible eviction policies. In particular, this cache supports: 1. concurrency 2. weight-based evictions 3. time-based evictions 4. manual invalidation 5. removal notification 6. cache statistics Closes #13717
This commit is contained in:
parent
97db3d5ef6
commit
aa8bfeb88c
|
@ -0,0 +1,642 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.cache;
|
||||
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.ToLongBiFunction;
|
||||
|
||||
/**
|
||||
* A simple concurrent cache.
|
||||
*<p>
|
||||
* Cache is a simple concurrent cache that supports time-based and weight-based evictions, with notifications for all
|
||||
* evictions. The design goals for this cache were simplicity and read performance. This means that we are willing to
|
||||
* accept reduced write performance in exchange for easy-to-understand code. Cache statistics for hits, misses and
|
||||
* evictions are exposed.
|
||||
*<p>
|
||||
* The design of the cache is relatively simple. The cache is segmented into 256 segments which are backed by HashMaps.
|
||||
* The segments are protected by a re-entrant read/write lock. The read/write locks permit multiple concurrent readers
|
||||
* without contention, and the segments gives us write throughput without impacting readers (so readers are blocked only
|
||||
* if they are reading a segment that a writer is writing to).
|
||||
* <p>
|
||||
* The LRU functionality is backed by a single doubly-linked list chaining the entries in order of insertion. This
|
||||
* LRU list is protected by a lock that serializes all writes to it. There are opportunities for improvements
|
||||
* here if write throughput is a concern.
|
||||
* <ol>
|
||||
* <li>LRU list mutations could be inserted into a blocking queue that a single thread is reading from
|
||||
* and applying to the LRU list.</li>
|
||||
* <li>Promotions could be deferred for entries that were "recently" promoted.</li>
|
||||
* <li>Locks on the list could be taken per node being modified instead of globally.</li>
|
||||
* </ol>
|
||||
*
|
||||
* Evictions only occur after a mutation to the cache (meaning an entry promotion, a cache insertion, or a manual
|
||||
* invalidation) or an explicit call to {@link #refresh()}.
|
||||
*
|
||||
* @param <K> The type of the keys
|
||||
* @param <V> The type of the values
|
||||
*/
|
||||
public class Cache<K, V> {
|
||||
// positive if entries have an expiration
|
||||
private long expireAfter = -1;
|
||||
|
||||
// the number of entries in the cache
|
||||
private int count = 0;
|
||||
|
||||
// the weight of the entries in the cache
|
||||
private long weight = 0;
|
||||
|
||||
// the maximum weight that this cache supports
|
||||
private long maximumWeight = -1;
|
||||
|
||||
// the weigher of entries
|
||||
private ToLongBiFunction<K, V> weigher = (k, v) -> 1;
|
||||
|
||||
// the removal callback
|
||||
private RemovalListener<K, V> removalListener = notification -> {
|
||||
};
|
||||
|
||||
// use CacheBuilder to construct
|
||||
Cache() {
|
||||
}
|
||||
|
||||
void setExpireAfter(long expireAfter) {
|
||||
if (expireAfter <= 0) {
|
||||
throw new IllegalArgumentException("expireAfter <= 0");
|
||||
}
|
||||
this.expireAfter = expireAfter;
|
||||
}
|
||||
|
||||
void setMaximumWeight(long maximumWeight) {
|
||||
if (maximumWeight < 0) {
|
||||
throw new IllegalArgumentException("maximumWeight < 0");
|
||||
}
|
||||
this.maximumWeight = maximumWeight;
|
||||
}
|
||||
|
||||
void setWeigher(ToLongBiFunction<K, V> weigher) {
|
||||
Objects.requireNonNull(weigher);
|
||||
this.weigher = weigher;
|
||||
}
|
||||
|
||||
void setRemovalListener(RemovalListener<K, V> removalListener) {
|
||||
this.removalListener = removalListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* The relative time used to track time-based evictions.
|
||||
*
|
||||
* @return the current relative time
|
||||
*/
|
||||
protected long now() {
|
||||
// System.nanoTime takes non-negligible time, so we only use it if we need it
|
||||
return expireAfter == -1 ? 0 : System.nanoTime();
|
||||
}
|
||||
|
||||
// the state of an entry in the LRU list
|
||||
enum State {NEW, EXISTING, DELETED}
|
||||
|
||||
static class Entry<K, V> {
|
||||
final K key;
|
||||
final V value;
|
||||
long accessTime;
|
||||
Entry<K, V> before;
|
||||
Entry<K, V> after;
|
||||
State state = State.NEW;
|
||||
|
||||
public Entry(K key, V value, long accessTime) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.accessTime = accessTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
} else if (!(obj instanceof Entry)) {
|
||||
return false;
|
||||
} else {
|
||||
@SuppressWarnings("unchecked")
|
||||
Entry<K, V> e = (Entry<K, V>) obj;
|
||||
return Objects.equals(key, e.key);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A cache segment.
|
||||
*
|
||||
* A CacheSegment is backed by a HashMap and is protected by a read/write lock.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
private static class CacheSegment<K, V> {
|
||||
// read/write lock protecting mutations to the segment
|
||||
ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
Map<K, Entry<K, V>> map = new HashMap<>();
|
||||
SegmentStats segmentStats = new SegmentStats();
|
||||
|
||||
/**
|
||||
* get an entry from the segment
|
||||
*
|
||||
* @param key the key of the entry to get from the cache
|
||||
* @param now the access time of this entry
|
||||
* @return the entry if there was one, otherwise null
|
||||
*/
|
||||
Entry<K, V> get(K key, long now) {
|
||||
lock.readLock().lock();
|
||||
Entry<K, V> entry = map.get(key);
|
||||
lock.readLock().unlock();
|
||||
if (entry != null) {
|
||||
segmentStats.hit();
|
||||
entry.accessTime = now;
|
||||
} else {
|
||||
segmentStats.miss();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* put an entry into the segment
|
||||
*
|
||||
* @param key the key of the entry to add to the cache
|
||||
* @param value the value of the entry to add to the cache
|
||||
* @param now the access time of this entry
|
||||
* @param onlyIfAbsent whether or not an existing entry should be replaced
|
||||
* @return a tuple of the new entry and the existing entry, if there was one otherwise null
|
||||
*/
|
||||
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now, boolean onlyIfAbsent) {
|
||||
Entry<K, V> entry = new Entry<>(key, value, now);
|
||||
lock.writeLock().lock();
|
||||
Entry<K, V> existing = null;
|
||||
if (!onlyIfAbsent || (onlyIfAbsent && map.get(key) == null)) {
|
||||
existing = map.put(key, entry);
|
||||
}
|
||||
lock.writeLock().unlock();
|
||||
return Tuple.tuple(entry, existing);
|
||||
}
|
||||
|
||||
/**
|
||||
* remove an entry from the segment
|
||||
*
|
||||
* @param key the key of the entry to remove from the cache
|
||||
* @return the removed entry if there was one, otherwise null
|
||||
*/
|
||||
Entry<K, V> remove(K key) {
|
||||
lock.writeLock().lock();
|
||||
Entry<K, V> entry = map.remove(key);
|
||||
lock.writeLock().unlock();
|
||||
if (entry != null) {
|
||||
segmentStats.eviction();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
private static class SegmentStats {
|
||||
private final LongAdder hits = new LongAdder();
|
||||
private final LongAdder misses = new LongAdder();
|
||||
private final LongAdder evictions = new LongAdder();
|
||||
|
||||
void hit() {
|
||||
hits.increment();
|
||||
}
|
||||
|
||||
void miss() {
|
||||
misses.increment();
|
||||
}
|
||||
|
||||
void eviction() {
|
||||
evictions.increment();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private CacheSegment<K, V>[] segments = new CacheSegment[256];
|
||||
|
||||
{
|
||||
for (int i = 0; i < segments.length; i++) {
|
||||
segments[i] = new CacheSegment<>();
|
||||
}
|
||||
}
|
||||
|
||||
Entry<K, V> head;
|
||||
Entry<K, V> tail;
|
||||
|
||||
// lock protecting mutations to the LRU list
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key.
|
||||
*
|
||||
* @param key the key whose associated value is to be returned
|
||||
* @return the value to which the specified key is mapped, or null if this map contains no mapping for the key
|
||||
*/
|
||||
public V get(K key) {
|
||||
long now = now();
|
||||
CacheSegment<K, V> segment = getCacheSegment(key);
|
||||
Entry<K, V> entry = segment.get(key, now);
|
||||
if (entry == null || isExpired(entry, now)) {
|
||||
return null;
|
||||
} else {
|
||||
promote(entry, now);
|
||||
return entry.value;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the specified key is not already associated with a value (or is mapped to null), attempts to compute its
|
||||
* value using the given mapping function and enters it into this map unless null.
|
||||
*
|
||||
* @param key the key whose associated value is to be returned or computed for if non-existant
|
||||
* @param mappingFunction the function to compute a value given a key
|
||||
* @return the current (existing or computed) value associated with the specified key, or null if the computed
|
||||
* value is null
|
||||
*/
|
||||
public V computeIfAbsent(K key, Function<K, V> mappingFunction) {
|
||||
long now = now();
|
||||
V value = get(key);
|
||||
if (value == null) {
|
||||
value = mappingFunction.apply(key);
|
||||
if (value != null) {
|
||||
put(key, value, now, true);
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Associates the specified value with the specified key in this map. If the map previously contained a mapping for
|
||||
* the key, the old value is replaced.
|
||||
*
|
||||
* @param key key with which the specified value is to be associated
|
||||
* @param value value to be associated with the specified key
|
||||
*/
|
||||
public void put(K key, V value) {
|
||||
long now = now();
|
||||
put(key, value, now, false);
|
||||
}
|
||||
|
||||
private void put(K key, V value, long now, boolean onlyIfAbsent) {
|
||||
CacheSegment<K, V> segment = getCacheSegment(key);
|
||||
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now, onlyIfAbsent);
|
||||
lock.lock();
|
||||
boolean replaced = false;
|
||||
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
|
||||
if (unlink(tuple.v2())) {
|
||||
replaced = true;
|
||||
}
|
||||
}
|
||||
promote(tuple.v1(), now);
|
||||
lock.unlock();
|
||||
if (replaced) {
|
||||
removalListener.onRemoval(new RemovalNotification(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate the association for the specified key. A removal notification will be issued for invalidated
|
||||
* entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
|
||||
*
|
||||
* @param key the key whose mapping is to be invalidated from the cache
|
||||
*/
|
||||
public void invalidate(K key) {
|
||||
CacheSegment<K, V> segment = getCacheSegment(key);
|
||||
Entry<K, V> entry = segment.remove(key);
|
||||
if (entry != null) {
|
||||
lock.lock();
|
||||
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate all cache entries. A removal notification will be issued for invalidated entries with
|
||||
* {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
|
||||
*/
|
||||
public void invalidateAll() {
|
||||
Entry<K, V> h = head;
|
||||
Arrays.stream(segments).forEach(segment -> segment.lock.writeLock().lock());
|
||||
lock.lock();
|
||||
Arrays.stream(segments).forEach(segment -> segment.map = new HashMap<>());
|
||||
Entry<K, V> current = head;
|
||||
while (current != null) {
|
||||
current.state = State.DELETED;
|
||||
current = current.after;
|
||||
}
|
||||
head = tail = null;
|
||||
count = 0;
|
||||
weight = 0;
|
||||
lock.unlock();
|
||||
Arrays.stream(segments).forEach(segment -> segment.lock.writeLock().unlock());
|
||||
while (h != null) {
|
||||
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED));
|
||||
h = h.after;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force any outstanding size-based and time-based evictions to occur
|
||||
*/
|
||||
public void refresh() {
|
||||
long now = now();
|
||||
lock.lock();
|
||||
evict(now);
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of entries in the cache.
|
||||
*
|
||||
* @return the number of entries in the cache
|
||||
*/
|
||||
public int count() {
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* The weight of the entries in the cache.
|
||||
*
|
||||
* @return the weight of the entries in the cache
|
||||
*/
|
||||
public long weight() {
|
||||
return weight;
|
||||
}
|
||||
|
||||
/**
|
||||
* An LRU sequencing of the keys in the cache that supports removal.
|
||||
*
|
||||
* @return an LRU-ordered {@link Iterable} over the keys in the cache
|
||||
*/
|
||||
public Iterable<K> keys() {
|
||||
return () -> new Iterator<K>() {
|
||||
private CacheIterator iterator = new CacheIterator(head);
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public K next() {
|
||||
return iterator.next().key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
iterator.remove();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* An LRU sequencing of the values in the cache.
|
||||
*
|
||||
* @return an LRU-ordered {@link Iterable} over the values in the cache
|
||||
*/
|
||||
public Iterable<V> values() {
|
||||
return () -> new Iterator<V>() {
|
||||
private CacheIterator iterator = new CacheIterator(head);
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public V next() {
|
||||
return iterator.next().value;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private class CacheIterator implements Iterator<Entry<K, V>> {
|
||||
private Entry<K, V> current;
|
||||
private Entry<K, V> next;
|
||||
|
||||
CacheIterator(Entry<K, V> head) {
|
||||
current = null;
|
||||
next = head;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return next != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Entry<K, V> next() {
|
||||
current = next;
|
||||
next = next.after;
|
||||
return current;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
Entry<K, V> entry = current;
|
||||
if (entry != null) {
|
||||
CacheSegment<K, V> segment = getCacheSegment(entry.key);
|
||||
segment.remove(entry.key);
|
||||
lock.lock();
|
||||
current = null;
|
||||
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The cache statistics tracking hits, misses and evictions. These are taken on a best-effort basis meaning that
|
||||
* they could be out-of-date mid-flight.
|
||||
*
|
||||
* @return the current cache statistics
|
||||
*/
|
||||
public CacheStats stats() {
|
||||
int hits = 0;
|
||||
int misses = 0;
|
||||
int evictions = 0;
|
||||
for (int i = 0; i < segments.length; i++) {
|
||||
hits += segments[i].segmentStats.hits.intValue();
|
||||
misses += segments[i].segmentStats.misses.intValue();
|
||||
evictions += segments[i].segmentStats.evictions.intValue();
|
||||
}
|
||||
return new CacheStats(hits, misses, evictions);
|
||||
}
|
||||
|
||||
public static class CacheStats {
|
||||
private int hits;
|
||||
private int misses;
|
||||
private int evictions;
|
||||
|
||||
public CacheStats(int hits, int misses, int evictions) {
|
||||
this.hits = hits;
|
||||
this.misses = misses;
|
||||
this.evictions = evictions;
|
||||
}
|
||||
|
||||
public int getHits() {
|
||||
return hits;
|
||||
}
|
||||
|
||||
public int getMisses() {
|
||||
return misses;
|
||||
}
|
||||
|
||||
public int getEvictions() {
|
||||
return evictions;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean promote(Entry<K, V> entry, long now) {
|
||||
boolean promoted = true;
|
||||
lock.lock();
|
||||
switch (entry.state) {
|
||||
case DELETED:
|
||||
promoted = false;
|
||||
break;
|
||||
case EXISTING:
|
||||
relinkAtHead(entry);
|
||||
break;
|
||||
case NEW:
|
||||
linkAtHead(entry);
|
||||
break;
|
||||
}
|
||||
if (promoted) {
|
||||
evict(now);
|
||||
}
|
||||
lock.unlock();
|
||||
return promoted;
|
||||
}
|
||||
|
||||
private void evict(long now) {
|
||||
assert lock.isHeldByCurrentThread();
|
||||
|
||||
while (tail != null && shouldPrune(tail, now)) {
|
||||
CacheSegment<K, V> segment = getCacheSegment(tail.key);
|
||||
Entry<K, V> entry = tail;
|
||||
if (segment != null) {
|
||||
segment.remove(tail.key);
|
||||
}
|
||||
delete(entry, RemovalNotification.RemovalReason.EVICTED);
|
||||
}
|
||||
}
|
||||
|
||||
private void delete(Entry<K, V> entry, RemovalNotification.RemovalReason removalReason) {
|
||||
assert lock.isHeldByCurrentThread();
|
||||
|
||||
if (unlink(entry)) {
|
||||
removalListener.onRemoval(new RemovalNotification<>(entry.key, entry.value, removalReason));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldPrune(Entry<K, V> entry, long now) {
|
||||
return exceedsSize() || isExpired(entry, now);
|
||||
}
|
||||
|
||||
private boolean exceedsSize() {
|
||||
return maximumWeight != -1 && weight > maximumWeight;
|
||||
}
|
||||
|
||||
private boolean isExpired(Entry<K, V> entry, long now) {
|
||||
return expireAfter != -1 && now - entry.accessTime > expireAfter;
|
||||
}
|
||||
|
||||
private boolean unlink(Entry<K, V> entry) {
|
||||
assert lock.isHeldByCurrentThread();
|
||||
|
||||
if (entry.state == State.EXISTING) {
|
||||
final Entry<K, V> before = entry.before;
|
||||
final Entry<K, V> after = entry.after;
|
||||
|
||||
if (before == null) {
|
||||
// removing the head
|
||||
head = after;
|
||||
if (head != null) {
|
||||
head.before = null;
|
||||
}
|
||||
} else {
|
||||
// removing inner element
|
||||
before.after = after;
|
||||
entry.before = null;
|
||||
}
|
||||
|
||||
if (after == null) {
|
||||
// removing tail
|
||||
tail = before;
|
||||
if (tail != null) {
|
||||
tail.after = null;
|
||||
}
|
||||
} else {
|
||||
// removing inner element
|
||||
after.before = before;
|
||||
entry.after = null;
|
||||
}
|
||||
|
||||
count--;
|
||||
weight -= weigher.applyAsLong(entry.key, entry.value);
|
||||
entry.state = State.DELETED;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void linkAtHead(Entry<K, V> entry) {
|
||||
assert lock.isHeldByCurrentThread();
|
||||
|
||||
Entry<K, V> h = head;
|
||||
entry.before = null;
|
||||
entry.after = head;
|
||||
head = entry;
|
||||
if (h == null) {
|
||||
tail = entry;
|
||||
} else {
|
||||
h.before = entry;
|
||||
}
|
||||
|
||||
count++;
|
||||
weight += weigher.applyAsLong(entry.key, entry.value);
|
||||
entry.state = State.EXISTING;
|
||||
}
|
||||
|
||||
private void relinkAtHead(Entry<K, V> entry) {
|
||||
assert lock.isHeldByCurrentThread();
|
||||
|
||||
if (head != entry) {
|
||||
unlink(entry);
|
||||
linkAtHead(entry);
|
||||
}
|
||||
}
|
||||
|
||||
private CacheSegment<K, V> getCacheSegment(K key) {
|
||||
return segments[key.hashCode() & 0xff];
|
||||
}
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.cache;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.ToLongBiFunction;
|
||||
|
||||
public class CacheBuilder<K, V> {
|
||||
private long maximumWeight = -1;
|
||||
private long expireAfter = -1;
|
||||
private ToLongBiFunction<K, V> weigher;
|
||||
private RemovalListener<K, V> removalListener;
|
||||
|
||||
public static <K, V> CacheBuilder<K, V> builder() {
|
||||
return new CacheBuilder<>();
|
||||
}
|
||||
|
||||
private CacheBuilder() {
|
||||
}
|
||||
|
||||
public CacheBuilder<K, V> setMaximumWeight(long maximumWeight) {
|
||||
if (maximumWeight < 0) {
|
||||
throw new IllegalArgumentException("maximumWeight < 0");
|
||||
}
|
||||
this.maximumWeight = maximumWeight;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CacheBuilder<K, V> setExpireAfter(long expireAfter) {
|
||||
if (expireAfter <= 0) {
|
||||
throw new IllegalArgumentException("expireAfter <= 0");
|
||||
}
|
||||
this.expireAfter = expireAfter;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CacheBuilder<K, V> weigher(ToLongBiFunction<K, V> weigher) {
|
||||
Objects.requireNonNull(weigher);
|
||||
this.weigher = weigher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CacheBuilder<K, V> removalListener(RemovalListener<K, V> removalListener) {
|
||||
Objects.requireNonNull(removalListener);
|
||||
this.removalListener = removalListener;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Cache<K, V> build() {
|
||||
Cache<K, V> cache = new Cache();
|
||||
if (maximumWeight != -1) {
|
||||
cache.setMaximumWeight(maximumWeight);
|
||||
}
|
||||
if (expireAfter != -1) {
|
||||
cache.setExpireAfter(expireAfter);
|
||||
}
|
||||
if (weigher != null) {
|
||||
cache.setWeigher(weigher);
|
||||
}
|
||||
if (removalListener != null) {
|
||||
cache.setRemovalListener(removalListener);
|
||||
}
|
||||
return cache;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.cache;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface RemovalListener<K, V> {
|
||||
void onRemoval(RemovalNotification<K, V> notification);
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.cache;
|
||||
|
||||
public class RemovalNotification<K, V> {
|
||||
public enum RemovalReason {REPLACED, INVALIDATED, EVICTED}
|
||||
|
||||
private final K key;
|
||||
private final V value;
|
||||
private final RemovalReason removalReason;
|
||||
|
||||
public RemovalNotification(K key, V value, RemovalReason removalReason) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.removalReason = removalReason;
|
||||
}
|
||||
|
||||
public K getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public V getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public RemovalReason getRemovalReason() {
|
||||
return removalReason;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,435 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.cache;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class CacheTests extends ESTestCase {
|
||||
private int numberOfEntries;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
numberOfEntries = randomIntBetween(1000, 10000);
|
||||
logger.debug("numberOfEntries: " + numberOfEntries);
|
||||
}
|
||||
|
||||
// cache some entries, then randomly lookup keys that do not exist, then check the stats
|
||||
public void testCacheStats() {
|
||||
AtomicInteger evictions = new AtomicInteger();
|
||||
Set<Integer> keys = new HashSet<>();
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.setMaximumWeight(numberOfEntries / 2)
|
||||
.removalListener(notification -> {
|
||||
keys.remove(notification.getKey());
|
||||
evictions.incrementAndGet();
|
||||
})
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
keys.add(i);
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
int hits = 0;
|
||||
int misses = 0;
|
||||
int missingKey = 0;
|
||||
for (Integer key : keys) {
|
||||
--missingKey;
|
||||
if (rarely()) {
|
||||
misses++;
|
||||
cache.get(missingKey);
|
||||
} else {
|
||||
hits++;
|
||||
cache.get(key);
|
||||
}
|
||||
}
|
||||
assertEquals(hits, cache.stats().getHits());
|
||||
assertEquals(misses, cache.stats().getMisses());
|
||||
assertEquals((int)Math.ceil(numberOfEntries / 2.0), evictions.get());
|
||||
assertEquals(evictions.get(), cache.stats().getEvictions());
|
||||
}
|
||||
|
||||
// cache some entries, numberOfEntries - maximumWeight evictions should occur, then check that the evicted
|
||||
// entries were evicted in LRU order
|
||||
public void testCacheEvictions() {
|
||||
int maximumWeight = randomIntBetween(1, numberOfEntries);
|
||||
AtomicInteger evictions = new AtomicInteger();
|
||||
List<Integer> evictedKeys = new ArrayList<>();
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.setMaximumWeight(maximumWeight)
|
||||
.removalListener(notification -> {
|
||||
evictions.incrementAndGet();
|
||||
evictedKeys.add(notification.getKey());
|
||||
})
|
||||
.build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
assertEquals(numberOfEntries - maximumWeight, evictions.get());
|
||||
assertEquals(evictions.get(), cache.stats().getEvictions());
|
||||
|
||||
// assert that the keys were evicted in LRU order
|
||||
Set<Integer> keys = new HashSet<>();
|
||||
List<Integer> remainingKeys = new ArrayList<>();
|
||||
for (Integer key : cache.keys()) {
|
||||
keys.add(key);
|
||||
remainingKeys.add(key);
|
||||
}
|
||||
for (int i = 0; i < numberOfEntries - maximumWeight; i++) {
|
||||
assertFalse(keys.contains(i));
|
||||
assertEquals(i, (int) evictedKeys.get(i));
|
||||
}
|
||||
for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) {
|
||||
assertTrue(keys.contains(i));
|
||||
assertEquals(
|
||||
numberOfEntries - i + (numberOfEntries - maximumWeight) - 1,
|
||||
(int) remainingKeys.get(i - (numberOfEntries - maximumWeight))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// cache some entries and exceed the maximum weight, then check that the cache has the expected weight and the
|
||||
// expected evictions occurred
|
||||
public void testWeigher() {
|
||||
int maximumWeight = 2 * numberOfEntries;
|
||||
int weight = randomIntBetween(2, 10);
|
||||
AtomicInteger evictions = new AtomicInteger();
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.setMaximumWeight(maximumWeight)
|
||||
.weigher((k, v) -> weight)
|
||||
.removalListener(notification -> evictions.incrementAndGet())
|
||||
.build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
// cache weight should be the largest multiple of weight less than maximumWeight
|
||||
assertEquals(weight * (maximumWeight / weight), cache.weight());
|
||||
|
||||
// the number of evicted entries should be the number of entries that fit in the excess weight
|
||||
assertEquals((int) Math.ceil((weight - 2) * numberOfEntries / (1.0 * weight)), evictions.get());
|
||||
|
||||
assertEquals(evictions.get(), cache.stats().getEvictions());
|
||||
}
|
||||
|
||||
// cache some entries, randomly invalidate some of them, then check that the weight of the cache is correct
|
||||
public void testWeight() {
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.weigher((k, v) -> k)
|
||||
.build();
|
||||
int weight = 0;
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
weight += i;
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (rarely()) {
|
||||
weight -= i;
|
||||
cache.invalidate(i);
|
||||
}
|
||||
}
|
||||
assertEquals(weight, cache.weight());
|
||||
}
|
||||
|
||||
// cache some entries, randomly invalidate some of them, then check that the number of cached entries is correct
|
||||
public void testCount() {
|
||||
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
|
||||
int count = 0;
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
count++;
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (rarely()) {
|
||||
count--;
|
||||
cache.invalidate(i);
|
||||
}
|
||||
}
|
||||
assertEquals(count, cache.count());
|
||||
}
|
||||
|
||||
// cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that
|
||||
// the first batch of cached entries expired and were removed
|
||||
public void testExpiration() {
|
||||
AtomicLong now = new AtomicLong();
|
||||
Cache<Integer, String> cache = new Cache<Integer, String>() {
|
||||
@Override
|
||||
protected long now() {
|
||||
return now.get();
|
||||
}
|
||||
};
|
||||
cache.setExpireAfter(1);
|
||||
List<Integer> evictedKeys = new ArrayList<>();
|
||||
cache.setRemovalListener(notification -> {
|
||||
assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason());
|
||||
evictedKeys.add(notification.getKey());
|
||||
});
|
||||
now.set(0);
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
now.set(1);
|
||||
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
now.set(2);
|
||||
cache.refresh();
|
||||
assertEquals(numberOfEntries, cache.count());
|
||||
for (int i = 0; i < evictedKeys.size(); i++) {
|
||||
assertEquals(i, (int) evictedKeys.get(i));
|
||||
}
|
||||
Set<Integer> remainingKeys = new HashSet<>();
|
||||
for (Integer key : cache.keys()) {
|
||||
remainingKeys.add(key);
|
||||
}
|
||||
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
|
||||
assertTrue(remainingKeys.contains(i));
|
||||
}
|
||||
}
|
||||
|
||||
// randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
|
||||
// non-promoted entries were removed
|
||||
public void testPromotion() {
|
||||
AtomicLong now = new AtomicLong();
|
||||
Cache<Integer, String> cache = new Cache<Integer, String>() {
|
||||
@Override
|
||||
protected long now() {
|
||||
return now.get();
|
||||
}
|
||||
};
|
||||
cache.setExpireAfter(1);
|
||||
now.set(0);
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
now.set(1);
|
||||
Set<Integer> promotedKeys = new HashSet<>();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (rarely()) {
|
||||
cache.get(i);
|
||||
promotedKeys.add(i);
|
||||
}
|
||||
}
|
||||
now.set(2);
|
||||
cache.refresh();
|
||||
assertEquals(promotedKeys.size(), cache.count());
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (promotedKeys.contains(i)) {
|
||||
assertNotNull(cache.get(i));
|
||||
} else {
|
||||
assertNull(cache.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null
|
||||
public void testInvalidate() {
|
||||
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
Set<Integer> keys = new HashSet<>();
|
||||
for (Integer key : cache.keys()) {
|
||||
if (rarely()) {
|
||||
cache.invalidate(key);
|
||||
keys.add(key);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (keys.contains(i)) {
|
||||
assertNull(cache.get(i));
|
||||
} else {
|
||||
assertNotNull(cache.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only
|
||||
// those entries
|
||||
public void testNotificationOnInvalidate() {
|
||||
Set<Integer> notifications = new HashSet<>();
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.removalListener(notification -> {
|
||||
assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
|
||||
notifications.add(notification.getKey());
|
||||
})
|
||||
.build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
Set<Integer> invalidated = new HashSet<>();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (rarely()) {
|
||||
cache.invalidate(i);
|
||||
invalidated.add(i);
|
||||
}
|
||||
}
|
||||
assertEquals(notifications, invalidated);
|
||||
}
|
||||
|
||||
// invalidate all cached entries, then check that the cache is empty
|
||||
public void testInvalidateAll() {
|
||||
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
cache.invalidateAll();
|
||||
assertEquals(0, cache.count());
|
||||
assertEquals(0, cache.weight());
|
||||
}
|
||||
|
||||
// invalidate all cached entries, then check that we receive invalidate notifications for all entries
|
||||
public void testNotificationOnInvalidateAll() {
|
||||
Set<Integer> notifications = new HashSet<>();
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.removalListener(notification -> {
|
||||
assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
|
||||
notifications.add(notification.getKey());
|
||||
})
|
||||
.build();
|
||||
Set<Integer> invalidated = new HashSet<>();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
invalidated.add(i);
|
||||
}
|
||||
cache.invalidateAll();
|
||||
assertEquals(invalidated, notifications);
|
||||
}
|
||||
|
||||
// randomly replace some entries, increasing the weight by 1 for each replacement, then count that the cache size
|
||||
// is correct
|
||||
public void testReplaceRecomputesSize() {
|
||||
class Key {
|
||||
private int key;
|
||||
private long weight;
|
||||
|
||||
public Key(int key, long weight) {
|
||||
this.key = key;
|
||||
this.weight = weight;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Key key1 = (Key) o;
|
||||
|
||||
return key == key1.key;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
Cache<Key, String> cache = CacheBuilder.<Key, String>builder().weigher((k, s) -> k.weight).build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(new Key(i, 1), Integer.toString(i));
|
||||
}
|
||||
assertEquals(numberOfEntries, cache.count());
|
||||
assertEquals(numberOfEntries, cache.weight());
|
||||
int replaced = 0;
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (rarely()) {
|
||||
replaced++;
|
||||
cache.put(new Key(i, 2), Integer.toString(i));
|
||||
}
|
||||
}
|
||||
assertEquals(numberOfEntries, cache.count());
|
||||
assertEquals(numberOfEntries + replaced, cache.weight());
|
||||
}
|
||||
|
||||
// randomly replace some entries, then check that we received replacement notifications for those and only those
|
||||
// entries
|
||||
public void testNotificationOnReplace() {
|
||||
Set<Integer> notifications = new HashSet<>();
|
||||
Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.removalListener(notification -> {
|
||||
assertEquals(RemovalNotification.RemovalReason.REPLACED, notification.getRemovalReason());
|
||||
notifications.add(notification.getKey());
|
||||
})
|
||||
.build();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
cache.put(i, Integer.toString(i));
|
||||
}
|
||||
Set<Integer> replacements = new HashSet<>();
|
||||
for (int i = 0; i < numberOfEntries; i++) {
|
||||
if (rarely()) {
|
||||
cache.put(i, Integer.toString(i) + Integer.toString(i));
|
||||
replacements.add(i);
|
||||
}
|
||||
}
|
||||
assertEquals(replacements, notifications);
|
||||
}
|
||||
|
||||
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
|
||||
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
|
||||
public void testTorture() throws InterruptedException {
|
||||
int numberOfThreads = randomIntBetween(1, 200);
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
final Cache<Integer, String> cache =
|
||||
CacheBuilder.<Integer, String>builder()
|
||||
.setMaximumWeight(1000)
|
||||
.removalListener(notification -> count.decrementAndGet())
|
||||
.weigher((k, v) -> 2)
|
||||
.build();
|
||||
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
Thread thread = new Thread(() -> {
|
||||
for (int j = 0; j < numberOfEntries; j++) {
|
||||
Integer key = randomIntBetween(1, numberOfEntries);
|
||||
cache.put(key, randomAsciiOfLength(10));
|
||||
count.incrementAndGet();
|
||||
if (rarely()) {
|
||||
cache.invalidate(key);
|
||||
} else {
|
||||
cache.get(key);
|
||||
}
|
||||
}
|
||||
});
|
||||
threads.add(thread);
|
||||
thread.start();
|
||||
}
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
cache.refresh();
|
||||
assertEquals(count.get(), cache.count());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue