Merge pull request #13879 from jasontedor/straight-cache-homey

Replace Guava cache with simple concurrent LRU cache
This commit is contained in:
Jason Tedor 2015-10-09 11:56:15 -04:00
commit 50368b3704
15 changed files with 1492 additions and 120 deletions

View File

@ -0,0 +1,690 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.cache;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongBiFunction;
/**
* A simple concurrent cache.
* <p>
* Cache is a simple concurrent cache that supports time-based and weight-based evictions, with notifications for all
* evictions. The design goals for this cache were simplicity and read performance. This means that we are willing to
* accept reduced write performance in exchange for easy-to-understand code. Cache statistics for hits, misses and
* evictions are exposed.
* <p>
* The design of the cache is relatively simple. The cache is segmented into 256 segments which are backed by HashMaps.
* Each segment is protected by a re-entrant read/write lock. The read/write locks permit multiple concurrent readers
* without contention, and the segments gives us write throughput without impacting readers (so readers are blocked only
* if they are reading a segment that a writer is writing to).
* <p>
* The LRU functionality is backed by a single doubly-linked list chaining the entries in order of insertion. This
* LRU list is protected by a lock that serializes all writes to it. There are opportunities for improvements
* here if write throughput is a concern.
* <ol>
* <li>LRU list mutations could be inserted into a blocking queue that a single thread is reading from
* and applying to the LRU list.</li>
* <li>Promotions could be deferred for entries that were "recently" promoted.</li>
* <li>Locks on the list could be taken per node being modified instead of globally.</li>
* </ol>
* <p>
* Evictions only occur after a mutation to the cache (meaning an entry promotion, a cache insertion, or a manual
* invalidation) or an explicit call to {@link #refresh()}.
*
* @param <K> The type of the keys
* @param <V> The type of the values
*/
public class Cache<K, V> {
// positive if entries have an expiration
private long expireAfterAccess = -1;
// true if entries can expire after access
private boolean entriesExpireAfterAccess;
// positive if entries have an expiration after write
private long expireAfterWrite = -1;
// true if entries can expire after initial insertion
private boolean entriesExpireAfterWrite;
// the number of entries in the cache
private int count = 0;
// the weight of the entries in the cache
private long weight = 0;
// the maximum weight that this cache supports
private long maximumWeight = -1;
// the weigher of entries
private ToLongBiFunction<K, V> weigher = (k, v) -> 1;
// the removal callback
private RemovalListener<K, V> removalListener = notification -> {
};
// use CacheBuilder to construct
Cache() {
}
void setExpireAfterAccess(long expireAfterAccess) {
if (expireAfterAccess <= 0) {
throw new IllegalArgumentException("expireAfterAccess <= 0");
}
this.expireAfterAccess = expireAfterAccess;
this.entriesExpireAfterAccess = true;
}
void setExpireAfterWrite(long expireAfterWrite) {
if (expireAfterWrite <= 0) {
throw new IllegalArgumentException("expireAfterWrite <= 0");
}
this.expireAfterWrite = expireAfterWrite;
this.entriesExpireAfterWrite = true;
}
void setMaximumWeight(long maximumWeight) {
if (maximumWeight < 0) {
throw new IllegalArgumentException("maximumWeight < 0");
}
this.maximumWeight = maximumWeight;
}
void setWeigher(ToLongBiFunction<K, V> weigher) {
Objects.requireNonNull(weigher);
this.weigher = weigher;
}
void setRemovalListener(RemovalListener<K, V> removalListener) {
Objects.requireNonNull(removalListener);
this.removalListener = removalListener;
}
/**
* The relative time used to track time-based evictions.
*
* @return the current relative time
*/
protected long now() {
// System.nanoTime takes non-negligible time, so we only use it if we need it
// use System.nanoTime because we want relative time, not absolute time
return entriesExpireAfterAccess || entriesExpireAfterWrite ? System.nanoTime() : 0;
}
// the state of an entry in the LRU list
enum State {
NEW, EXISTING, DELETED
}
static class Entry<K, V> {
final K key;
final V value;
long writeTime;
volatile long accessTime;
Entry<K, V> before;
Entry<K, V> after;
State state = State.NEW;
public Entry(K key, V value, long writeTime) {
this.key = key;
this.value = value;
this.writeTime = this.accessTime = writeTime;
}
}
/**
* A cache segment.
* <p>
* A CacheSegment is backed by a HashMap and is protected by a read/write lock.
*
* @param <K> the type of the keys
* @param <V> the type of the values
*/
private static class CacheSegment<K, V> {
// read/write lock protecting mutations to the segment
ReadWriteLock segmentLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());
Map<K, Entry<K, V>> map = new HashMap<>();
SegmentStats segmentStats = new SegmentStats();
/**
* get an entry from the segment
*
* @param key the key of the entry to get from the cache
* @param now the access time of this entry
* @return the entry if there was one, otherwise null
*/
Entry<K, V> get(K key, long now) {
Entry<K, V> entry;
try (ReleasableLock ignored = readLock.acquire()) {
entry = map.get(key);
}
if (entry != null) {
segmentStats.hit();
entry.accessTime = now;
} else {
segmentStats.miss();
}
return entry;
}
/**
* put an entry into the segment
*
* @param key the key of the entry to add to the cache
* @param value the value of the entry to add to the cache
* @param now the access time of this entry
* @return a tuple of the new entry and the existing entry, if there was one otherwise null
*/
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
Entry<K, V> entry = new Entry<>(key, value, now);
Entry<K, V> existing;
try (ReleasableLock ignored = writeLock.acquire()) {
existing = map.put(key, entry);
}
return Tuple.tuple(entry, existing);
}
/**
* remove an entry from the segment
*
* @param key the key of the entry to remove from the cache
* @return the removed entry if there was one, otherwise null
*/
Entry<K, V> remove(K key) {
Entry<K, V> entry;
try (ReleasableLock ignored = writeLock.acquire()) {
entry = map.remove(key);
}
if (entry != null) {
segmentStats.eviction();
}
return entry;
}
private static class SegmentStats {
private final LongAdder hits = new LongAdder();
private final LongAdder misses = new LongAdder();
private final LongAdder evictions = new LongAdder();
void hit() {
hits.increment();
}
void miss() {
misses.increment();
}
void eviction() {
evictions.increment();
}
}
}
public static final int NUMBER_OF_SEGMENTS = 256;
private final CacheSegment<K, V>[] segments = new CacheSegment[NUMBER_OF_SEGMENTS];
{
for (int i = 0; i < segments.length; i++) {
segments[i] = new CacheSegment<>();
}
}
Entry<K, V> head;
Entry<K, V> tail;
// lock protecting mutations to the LRU list
private ReleasableLock lruLock = new ReleasableLock(new ReentrantLock());
/**
* Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key.
*
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or null if this map contains no mapping for the key
*/
public V get(K key) {
return get(key, now());
}
private V get(K key, long now) {
CacheSegment<K, V> segment = getCacheSegment(key);
Entry<K, V> entry = segment.get(key, now);
if (entry == null || isExpired(entry, now)) {
return null;
} else {
promote(entry, now);
return entry.value;
}
}
/**
* If the specified key is not already associated with a value (or is mapped to null), attempts to compute its
* value using the given mapping function and enters it into this map unless null.
*
* @param key the key whose associated value is to be returned or computed for if non-existant
* @param loader the function to compute a value given a key
* @return the current (existing or computed) value associated with the specified key, or null if the computed
* value is null
* @throws ExecutionException thrown if loader throws an exception
*/
public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
V value = get(key, now);
if (value == null) {
CacheSegment<K, V> segment = getCacheSegment(key);
// we synchronize against the segment lock; this is to avoid a scenario where another thread is inserting
// a value for the same key via put which would not be observed on this thread without a mechanism
// synchronizing the two threads; it is possible that the segment lock will be too expensive here (it blocks
// readers too!) so consider this as a possible place to optimize should contention be observed
try (ReleasableLock ignored = segment.writeLock.acquire()) {
value = get(key, now);
if (value == null) {
try {
value = loader.load(key);
} catch (Exception e) {
throw new ExecutionException(e);
}
if (value == null) {
throw new ExecutionException(new NullPointerException("loader returned a null value"));
}
put(key, value, now);
}
}
}
return value;
}
/**
* Associates the specified value with the specified key in this map. If the map previously contained a mapping for
* the key, the old value is replaced.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
*/
public void put(K key, V value) {
long now = now();
put(key, value, now);
}
private void put(K key, V value, long now) {
CacheSegment<K, V> segment = getCacheSegment(key);
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
boolean replaced = false;
try (ReleasableLock ignored = lruLock.acquire()) {
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
if (unlink(tuple.v2())) {
replaced = true;
}
}
promote(tuple.v1(), now);
}
if (replaced) {
removalListener.onRemoval(new RemovalNotification(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED));
}
}
/**
* Invalidate the association for the specified key. A removal notification will be issued for invalidated
* entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
*
* @param key the key whose mapping is to be invalidated from the cache
*/
public void invalidate(K key) {
CacheSegment<K, V> segment = getCacheSegment(key);
Entry<K, V> entry = segment.remove(key);
if (entry != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
}
}
}
/**
* Invalidate all cache entries. A removal notification will be issued for invalidated entries with
* {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
*/
public void invalidateAll() {
Entry<K, V> h;
boolean[] haveSegmentLock = new boolean[NUMBER_OF_SEGMENTS];
try {
for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) {
segments[i].segmentLock.writeLock().lock();
haveSegmentLock[i] = true;
}
try (ReleasableLock ignored = lruLock.acquire()) {
h = head;
Arrays.stream(segments).forEach(segment -> segment.map = new HashMap<>());
Entry<K, V> current = head;
while (current != null) {
current.state = State.DELETED;
current = current.after;
}
head = tail = null;
count = 0;
weight = 0;
}
} finally {
for (int i = NUMBER_OF_SEGMENTS - 1; i >= 0; i--) {
if (haveSegmentLock[i]) {
segments[i].segmentLock.writeLock().unlock();
}
}
}
while (h != null) {
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED));
h = h.after;
}
}
/**
* Force any outstanding size-based and time-based evictions to occur
*/
public void refresh() {
long now = now();
try (ReleasableLock ignored = lruLock.acquire()) {
evict(now);
}
}
/**
* The number of entries in the cache.
*
* @return the number of entries in the cache
*/
public int count() {
return count;
}
/**
* The weight of the entries in the cache.
*
* @return the weight of the entries in the cache
*/
public long weight() {
return weight;
}
/**
* An LRU sequencing of the keys in the cache that supports removal. This sequence is not protected from mutations
* to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is
* undefined.
*
* @return an LRU-ordered {@link Iterable} over the keys in the cache
*/
public Iterable<K> keys() {
return () -> new Iterator<K>() {
private CacheIterator iterator = new CacheIterator(head);
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public K next() {
return iterator.next().key;
}
@Override
public void remove() {
iterator.remove();
}
};
}
/**
* An LRU sequencing of the values in the cache. This sequence is not protected from mutations
* to the cache. The result of iteration under mutation is undefined.
*
* @return an LRU-ordered {@link Iterable} over the values in the cache
*/
public Iterable<V> values() {
return () -> new Iterator<V>() {
private CacheIterator iterator = new CacheIterator(head);
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public V next() {
return iterator.next().value;
}
};
}
private class CacheIterator implements Iterator<Entry<K, V>> {
private Entry<K, V> current;
private Entry<K, V> next;
CacheIterator(Entry<K, V> head) {
current = null;
next = head;
}
@Override
public boolean hasNext() {
return next != null;
}
@Override
public Entry<K, V> next() {
current = next;
next = next.after;
return current;
}
@Override
public void remove() {
Entry<K, V> entry = current;
if (entry != null) {
CacheSegment<K, V> segment = getCacheSegment(entry.key);
segment.remove(entry.key);
try (ReleasableLock ignored = lruLock.acquire()) {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
}
}
}
}
/**
* The cache statistics tracking hits, misses and evictions. These are taken on a best-effort basis meaning that
* they could be out-of-date mid-flight.
*
* @return the current cache statistics
*/
public CacheStats stats() {
long hits = 0;
long misses = 0;
long evictions = 0;
for (int i = 0; i < segments.length; i++) {
hits += segments[i].segmentStats.hits.longValue();
misses += segments[i].segmentStats.misses.longValue();
evictions += segments[i].segmentStats.evictions.longValue();
}
return new CacheStats(hits, misses, evictions);
}
public static class CacheStats {
private long hits;
private long misses;
private long evictions;
public CacheStats(long hits, long misses, long evictions) {
this.hits = hits;
this.misses = misses;
this.evictions = evictions;
}
public long getHits() {
return hits;
}
public long getMisses() {
return misses;
}
public long getEvictions() {
return evictions;
}
}
private boolean promote(Entry<K, V> entry, long now) {
boolean promoted = true;
try (ReleasableLock ignored = lruLock.acquire()) {
switch (entry.state) {
case DELETED:
promoted = false;
break;
case EXISTING:
relinkAtHead(entry);
break;
case NEW:
linkAtHead(entry);
break;
}
if (promoted) {
evict(now);
}
}
return promoted;
}
private void evict(long now) {
assert lruLock.isHeldByCurrentThread();
while (tail != null && shouldPrune(tail, now)) {
CacheSegment<K, V> segment = getCacheSegment(tail.key);
Entry<K, V> entry = tail;
if (segment != null) {
segment.remove(tail.key);
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
}
}
private void delete(Entry<K, V> entry, RemovalNotification.RemovalReason removalReason) {
assert lruLock.isHeldByCurrentThread();
if (unlink(entry)) {
removalListener.onRemoval(new RemovalNotification<>(entry.key, entry.value, removalReason));
}
}
private boolean shouldPrune(Entry<K, V> entry, long now) {
return exceedsWeight() || isExpired(entry, now);
}
private boolean exceedsWeight() {
return maximumWeight != -1 && weight > maximumWeight;
}
private boolean isExpired(Entry<K, V> entry, long now) {
return (entriesExpireAfterAccess && now - entry.accessTime > expireAfterAccess) ||
(entriesExpireAfterWrite && now - entry.writeTime > expireAfterWrite);
}
private boolean unlink(Entry<K, V> entry) {
assert lruLock.isHeldByCurrentThread();
if (entry.state == State.EXISTING) {
final Entry<K, V> before = entry.before;
final Entry<K, V> after = entry.after;
if (before == null) {
// removing the head
assert head == entry;
head = after;
if (head != null) {
head.before = null;
}
} else {
// removing inner element
before.after = after;
entry.before = null;
}
if (after == null) {
// removing tail
assert tail == entry;
tail = before;
if (tail != null) {
tail.after = null;
}
} else {
// removing inner element
after.before = before;
entry.after = null;
}
count--;
weight -= weigher.applyAsLong(entry.key, entry.value);
entry.state = State.DELETED;
return true;
} else {
return false;
}
}
private void linkAtHead(Entry<K, V> entry) {
assert lruLock.isHeldByCurrentThread();
Entry<K, V> h = head;
entry.before = null;
entry.after = head;
head = entry;
if (h == null) {
tail = entry;
} else {
h.before = entry;
}
count++;
weight += weigher.applyAsLong(entry.key, entry.value);
entry.state = State.EXISTING;
}
private void relinkAtHead(Entry<K, V> entry) {
assert lruLock.isHeldByCurrentThread();
if (head != entry) {
unlink(entry);
linkAtHead(entry);
}
}
private CacheSegment<K, V> getCacheSegment(K key) {
return segments[key.hashCode() & 0xff];
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.cache;
import java.util.Objects;
import java.util.function.ToLongBiFunction;
public class CacheBuilder<K, V> {
private long maximumWeight = -1;
private long expireAfterAccess = -1;
private long expireAfterWrite = -1;
private ToLongBiFunction<K, V> weigher;
private RemovalListener<K, V> removalListener;
public static <K, V> CacheBuilder<K, V> builder() {
return new CacheBuilder<>();
}
private CacheBuilder() {
}
public CacheBuilder<K, V> setMaximumWeight(long maximumWeight) {
if (maximumWeight < 0) {
throw new IllegalArgumentException("maximumWeight < 0");
}
this.maximumWeight = maximumWeight;
return this;
}
public CacheBuilder<K, V> setExpireAfterAccess(long expireAfterAccess) {
if (expireAfterAccess <= 0) {
throw new IllegalArgumentException("expireAfterAccess <= 0");
}
this.expireAfterAccess = expireAfterAccess;
return this;
}
public CacheBuilder<K, V> setExpireAfterWrite(long expireAfterWrite) {
if (expireAfterWrite <= 0) {
throw new IllegalArgumentException("expireAfterWrite <= 0");
}
this.expireAfterWrite = expireAfterWrite;
return this;
}
public CacheBuilder<K, V> weigher(ToLongBiFunction<K, V> weigher) {
Objects.requireNonNull(weigher);
this.weigher = weigher;
return this;
}
public CacheBuilder<K, V> removalListener(RemovalListener<K, V> removalListener) {
Objects.requireNonNull(removalListener);
this.removalListener = removalListener;
return this;
}
public Cache<K, V> build() {
Cache<K, V> cache = new Cache();
if (maximumWeight != -1) {
cache.setMaximumWeight(maximumWeight);
}
if (expireAfterAccess != -1) {
cache.setExpireAfterAccess(expireAfterAccess);
}
if (expireAfterWrite != -1) {
cache.setExpireAfterWrite(expireAfterWrite);
}
if (weigher != null) {
cache.setWeigher(weigher);
}
if (removalListener != null) {
cache.setRemovalListener(removalListener);
}
return cache;
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.cache;
@FunctionalInterface
public interface CacheLoader<K, V> {
V load(K key) throws Exception;
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.cache;
@FunctionalInterface
public interface RemovalListener<K, V> {
void onRemoval(RemovalNotification<K, V> notification);
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.cache;
public class RemovalNotification<K, V> {
public enum RemovalReason {REPLACED, INVALIDATED, EVICTED}
private final K key;
private final V value;
private final RemovalReason removalReason;
public RemovalNotification(K key, V value, RemovalReason removalReason) {
this.key = key;
this.value = value;
this.removalReason = removalReason;
}
public K getKey() {
return key;
}
public V getValue() {
return value;
}
public RemovalReason getRemovalReason() {
return removalReason;
}
}

View File

@ -19,11 +19,6 @@
package org.elasticsearch.index.cache.bitset;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
@ -38,6 +33,10 @@ import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.settings.Settings;
@ -58,10 +57,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
@ -94,10 +94,11 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
public BitsetFilterCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
this.loadRandomAccessFiltersEagerly = indexSettings.getAsBoolean(LOAD_RANDOM_ACCESS_FILTERS_EAGERLY, true);
this.loadedFilters = CacheBuilder.newBuilder().removalListener(this).build();
this.loadedFilters = CacheBuilder.<Object, Cache<Query, Value>>builder().removalListener(this).build();
this.warmer = new BitSetProducerWarmer();
}
@Inject(optional = true)
public void setIndicesWarmer(IndicesWarmer indicesWarmer) {
this.indicesWarmer = indicesWarmer;
@ -144,14 +145,12 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
private BitSet getAndLoadIfNotPresent(final Query query, final LeafReaderContext context) throws IOException, ExecutionException {
final Object coreCacheReader = context.reader().getCoreCacheKey();
final ShardId shardId = ShardUtils.extractShardId(context.reader());
Cache<Query, Value> filterToFbs = loadedFilters.get(coreCacheReader, new Callable<Cache<Query, Value>>() {
@Override
public Cache<Query, Value> call() throws Exception {
Cache<Query, Value> filterToFbs = loadedFilters.computeIfAbsent(coreCacheReader, key -> {
context.reader().addCoreClosedListener(BitsetFilterCache.this);
return CacheBuilder.newBuilder().build();
}
return CacheBuilder.<Query, Value>builder().build();
});
return filterToFbs.get(query, () -> {
return filterToFbs.computeIfAbsent(query, key -> {
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
@ -172,8 +171,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
@Override
public void onRemoval(RemovalNotification<Object, Cache<Query, Value>> notification) {
Object key = notification.getKey();
if (key == null) {
if (notification.getKey() == null) {
return;
}
@ -182,7 +180,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
return;
}
for (Value value : valueCache.asMap().values()) {
for (Value value : valueCache.values()) {
listener.onRemoval(value.shardId, value.bitset);
// if null then this means the shard has already been removed and the stats are 0 anyway for the shard this key belongs to
}

View File

@ -19,10 +19,8 @@
package org.elasticsearch.index.cache.request;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
@ -61,7 +59,7 @@ public class ShardRequestCache extends AbstractIndexShardComponent implements Re
@Override
public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> removalNotification) {
if (removalNotification.wasEvicted()) {
if (removalNotification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED) {
evictionsMetric.inc();
}
long dec = 0;

View File

@ -21,12 +21,6 @@ package org.elasticsearch.indices.cache.request;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
@ -35,6 +29,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -51,14 +46,11 @@ import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.elasticsearch.common.Strings.hasLength;
@ -162,25 +154,17 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
private void buildCache() {
long sizeInBytes = MemorySizeValue.parseBytesSizeValueOrHeapRatio(size, INDICES_CACHE_QUERY_SIZE).bytes();
CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.newBuilder()
.maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this);
cacheBuilder.concurrencyLevel(concurrencyLevel);
CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.<Key, Value>builder()
.setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this);
// cacheBuilder.concurrencyLevel(concurrencyLevel);
if (expire != null) {
cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
cacheBuilder.setExpireAfterAccess(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
}
cache = cacheBuilder.build();
}
private static class QueryCacheWeigher implements Weigher<Key, Value> {
@Override
public int weigh(Key key, Value value) {
return (int) (key.ramBytesUsed() + value.ramBytesUsed());
}
}
public void close() {
reaper.close();
cache.invalidateAll();
@ -197,9 +181,6 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
if (notification.getKey() == null) {
return;
}
notification.getKey().shard.requestCache().onRemoval(notification);
}
@ -258,8 +239,8 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
Key key = buildKey(request, context);
Loader loader = new Loader(queryPhase, context, key);
Value value = cache.get(key, loader);
Loader loader = new Loader(queryPhase, context);
Value value = cache.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
key.shard.requestCache().onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
@ -279,17 +260,15 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
}
}
private static class Loader implements Callable<Value> {
private static class Loader implements CacheLoader<Key, Value> {
private final QueryPhase queryPhase;
private final SearchContext context;
private final IndicesRequestCache.Key key;
private boolean loaded;
Loader(QueryPhase queryPhase, SearchContext context, IndicesRequestCache.Key key) {
Loader(QueryPhase queryPhase, SearchContext context) {
this.queryPhase = queryPhase;
this.context = context;
this.key = key;
}
public boolean isLoaded() {
@ -297,7 +276,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
}
@Override
public Value call() throws Exception {
public Value load(Key key) throws Exception {
queryPhase.execute(context);
/* BytesStreamOutput allows to pass the expected size but by default uses
@ -473,7 +452,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
CleanupKey lookupKey = new CleanupKey(null, -1);
for (Iterator<Key> iterator = cache.asMap().keySet().iterator(); iterator.hasNext(); ) {
for (Iterator<Key> iterator = cache.keys().iterator(); iterator.hasNext(); ) {
Key key = iterator.next();
if (currentFullClean.contains(key.shard)) {
iterator.remove();
@ -487,7 +466,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
}
}
cache.cleanUp();
cache.refresh();
currentKeysToClean.clear();
currentFullClean.clear();
}

View File

@ -19,12 +19,15 @@
package org.elasticsearch.indices.fielddata.cache;
import com.google.common.cache.*;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@ -43,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongBiFunction;
/**
*/
@ -66,17 +70,11 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
final String size = settings.get(INDICES_FIELDDATA_CACHE_SIZE_KEY, "-1");
final long sizeInBytes = settings.getAsMemory(INDICES_FIELDDATA_CACHE_SIZE_KEY, "-1").bytes();
CacheBuilder<Key, Accountable> cacheBuilder = CacheBuilder.newBuilder()
CacheBuilder<Key, Accountable> cacheBuilder = CacheBuilder.<Key, Accountable>builder()
.removalListener(this);
if (sizeInBytes > 0) {
cacheBuilder.maximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
cacheBuilder.setMaximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
}
// defaults to 4, but this is a busy map for all indices, increase it a bit by default
final int concurrencyLevel = settings.getAsInt(FIELDDATA_CACHE_CONCURRENCY_LEVEL, 16);
if (concurrencyLevel <= 0) {
throw new IllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
}
cacheBuilder.concurrencyLevel(concurrencyLevel);
logger.debug("using size [{}] [{}]", size, new ByteSizeValue(sizeInBytes));
cache = cacheBuilder.build();
@ -108,7 +106,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
final Accountable value = notification.getValue();
for (IndexFieldDataCache.Listener listener : key.listeners) {
try {
listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed());
listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED, value.ramBytesUsed());
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on field data cache unloading", e);
@ -116,10 +114,9 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
}
}
public static class FieldDataWeigher implements Weigher<Key, Accountable> {
public static class FieldDataWeigher implements ToLongBiFunction<Key, Accountable> {
@Override
public int weigh(Key key, Accountable ramUsage) {
public long applyAsLong(Key key, Accountable ramUsage) {
int weight = (int) Math.min(ramUsage.ramBytesUsed(), Integer.MAX_VALUE);
return weight == 0 ? 1 : weight;
}
@ -150,13 +147,13 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
final ShardId shardId = ShardUtils.extractShardId(context.reader());
final Key key = new Key(this, context.reader().getCoreCacheKey(), shardId);
//noinspection unchecked
final Accountable accountable = cache.get(key, () -> {
final Accountable accountable = cache.computeIfAbsent(key, k -> {
context.reader().addCoreClosedListener(IndexFieldCache.this);
for (Listener listener : this.listeners) {
key.listeners.add(listener);
k.listeners.add(listener);
}
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
for (Listener listener : key.listeners) {
for (Listener listener : k.listeners) {
try {
listener.onCache(shardId, fieldNames, fieldDataType, fieldData);
} catch (Throwable e) {
@ -174,13 +171,13 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
final ShardId shardId = ShardUtils.extractShardId(indexReader);
final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
//noinspection unchecked
final Accountable accountable = cache.get(key, () -> {
final Accountable accountable = cache.computeIfAbsent(key, k -> {
indexReader.addReaderClosedListener(IndexFieldCache.this);
for (Listener listener : this.listeners) {
key.listeners.add(listener);
k.listeners.add(listener);
}
final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
for (Listener listener : key.listeners) {
for (Listener listener : k.listeners) {
try {
listener.onCache(shardId, fieldNames, fieldDataType, ifd);
} catch (Throwable e) {
@ -207,38 +204,28 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
@Override
public void clear() {
for (Key key : cache.asMap().keySet()) {
for (Key key : cache.keys()) {
if (key.indexCache.index.equals(index)) {
cache.invalidate(key);
}
}
// Note that cache invalidation in Guava does not immediately remove
// values from the cache. In the case of a cache with a rare write or
// read rate, it's possible for values to persist longer than desired.
//
// Note this is intended by the Guava developers, see:
// https://code.google.com/p/guava-libraries/wiki/CachesExplained#Eviction
// (the "When Does Cleanup Happen" section)
// We call it explicitly here since it should be a "rare" operation, and
// if a user runs it he probably wants to see memory returned as soon as
// possible
cache.cleanUp();
// force eviction
cache.refresh();
}
@Override
public void clear(String fieldName) {
for (Key key : cache.asMap().keySet()) {
for (Key key : cache.keys()) {
if (key.indexCache.index.equals(index)) {
if (key.indexCache.fieldNames.fullName().equals(fieldName)) {
cache.invalidate(key);
}
}
}
// we call cleanUp() because this is a manual operation, should happen
// we call refresh because this is a manual operation, should happen
// rarely and probably means the user wants to see memory returned as
// soon as possible
cache.cleanUp();
cache.refresh();
}
@Override
@ -305,7 +292,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
logger.trace("running periodic field data cache cleanup");
}
try {
this.cache.cleanUp();
this.cache.refresh();
} catch (Exception e) {
logger.warn("Exception during periodic field data cache cleanup:", e);
}

View File

@ -19,13 +19,7 @@
package org.elasticsearch.script;
import java.nio.charset.StandardCharsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
@ -43,6 +37,10 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -67,13 +65,13 @@ import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
*
@ -153,12 +151,12 @@ public class ScriptService extends AbstractComponent implements Closeable {
this.defaultLang = settings.get(DEFAULT_SCRIPTING_LANGUAGE_SETTING, DEFAULT_LANG);
CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
CacheBuilder<String, CompiledScript> cacheBuilder = CacheBuilder.builder();
if (cacheMaxSize >= 0) {
cacheBuilder.maximumSize(cacheMaxSize);
cacheBuilder.setMaximumWeight(cacheMaxSize);
}
if (cacheExpire != null) {
cacheBuilder.expireAfterAccess(cacheExpire.nanos(), TimeUnit.NANOSECONDS);
cacheBuilder.setExpireAfterAccess(cacheExpire.nanos());
}
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
@ -301,7 +299,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
}
String cacheKey = getCacheKey(scriptEngineService, type == ScriptType.INLINE ? null : name, code);
CompiledScript compiledScript = cache.getIfPresent(cacheKey);
CompiledScript compiledScript = cache.get(cacheKey);
if (compiledScript == null) {
//Either an un-cached inline script or indexed script
@ -493,12 +491,8 @@ public class ScriptService extends AbstractComponent implements Closeable {
* script has been removed from the cache
*/
private class ScriptCacheRemovalListener implements RemovalListener<String, CompiledScript> {
@Override
public void onRemoval(RemovalNotification<String, CompiledScript> notification) {
if (logger.isDebugEnabled()) {
logger.debug("notifying script services of script removal due to: [{}]", notification.getCause());
}
scriptMetrics.onCacheEviction();
for (ScriptEngineService service : scriptEngines) {
try {

View File

@ -0,0 +1,536 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.cache;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.hamcrest.CoreMatchers.instanceOf;
public class CacheTests extends ESTestCase {
private int numberOfEntries;
@Before
public void setUp() throws Exception {
super.setUp();
numberOfEntries = randomIntBetween(1000, 10000);
logger.debug("numberOfEntries: " + numberOfEntries);
}
// cache some entries, then randomly lookup keys that do not exist, then check the stats
public void testCacheStats() {
AtomicLong evictions = new AtomicLong();
Set<Integer> keys = new HashSet<>();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.setMaximumWeight(numberOfEntries / 2)
.removalListener(notification -> {
keys.remove(notification.getKey());
evictions.incrementAndGet();
})
.build();
for (int i = 0; i < numberOfEntries; i++) {
// track the keys, which will be removed upon eviction (see the RemovalListener)
keys.add(i);
cache.put(i, Integer.toString(i));
}
long hits = 0;
long misses = 0;
Integer missingKey = 0;
for (Integer key : keys) {
--missingKey;
if (rarely()) {
misses++;
cache.get(missingKey);
} else {
hits++;
cache.get(key);
}
}
assertEquals(hits, cache.stats().getHits());
assertEquals(misses, cache.stats().getMisses());
assertEquals((long) Math.ceil(numberOfEntries / 2.0), evictions.get());
assertEquals(evictions.get(), cache.stats().getEvictions());
}
// cache some entries in batches of size maximumWeight; for each batch, touch the even entries to affect the
// ordering; upon the next caching of entries, the entries from the previous batch will be evicted; we can then
// check that the evicted entries were evicted in LRU order (first the odds in a batch, then the evens in a batch)
// for each batch
public void testCacheEvictions() {
int maximumWeight = randomIntBetween(1, numberOfEntries);
AtomicLong evictions = new AtomicLong();
List<Integer> evictedKeys = new ArrayList<>();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.setMaximumWeight(maximumWeight)
.removalListener(notification -> {
evictions.incrementAndGet();
evictedKeys.add(notification.getKey());
})
.build();
// cache entries up to numberOfEntries - maximumWeight; all of these entries will ultimately be evicted in
// batches of size maximumWeight, first the odds in the batch, then the evens in the batch
List<Integer> expectedEvictions = new ArrayList<>();
int iterations = (int)Math.ceil((numberOfEntries - maximumWeight) / (1.0 * maximumWeight));
for (int i = 0; i < iterations; i++) {
for (int j = i * maximumWeight; j < (i + 1) * maximumWeight && j < numberOfEntries - maximumWeight; j++) {
cache.put(j, Integer.toString(j));
if (j % 2 == 1) {
expectedEvictions.add(j);
}
}
for (int j = i * maximumWeight; j < (i + 1) * maximumWeight && j < numberOfEntries - maximumWeight; j++) {
if (j % 2 == 0) {
cache.get(j);
expectedEvictions.add(j);
}
}
}
// finish filling the cache
for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
assertEquals(numberOfEntries - maximumWeight, evictions.get());
assertEquals(evictions.get(), cache.stats().getEvictions());
// assert that the keys were evicted in LRU order
Set<Integer> keys = new HashSet<>();
List<Integer> remainingKeys = new ArrayList<>();
for (Integer key : cache.keys()) {
keys.add(key);
remainingKeys.add(key);
}
assertEquals(expectedEvictions.size(), evictedKeys.size());
for (int i = 0; i < expectedEvictions.size(); i++) {
assertFalse(keys.contains(expectedEvictions.get(i)));
assertEquals(expectedEvictions.get(i), evictedKeys.get(i));
}
for (int i = numberOfEntries - maximumWeight; i < numberOfEntries; i++) {
assertTrue(keys.contains(i));
assertEquals(
numberOfEntries - i + (numberOfEntries - maximumWeight) - 1,
(int) remainingKeys.get(i - (numberOfEntries - maximumWeight))
);
}
}
// cache some entries and exceed the maximum weight, then check that the cache has the expected weight and the
// expected evictions occurred
public void testWeigher() {
int maximumWeight = 2 * numberOfEntries;
int weight = randomIntBetween(2, 10);
AtomicLong evictions = new AtomicLong();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.setMaximumWeight(maximumWeight)
.weigher((k, v) -> weight)
.removalListener(notification -> evictions.incrementAndGet())
.build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
// cache weight should be the largest multiple of weight less than maximumWeight
assertEquals(weight * (maximumWeight / weight), cache.weight());
// the number of evicted entries should be the number of entries that fit in the excess weight
assertEquals((int) Math.ceil((weight - 2) * numberOfEntries / (1.0 * weight)), evictions.get());
assertEquals(evictions.get(), cache.stats().getEvictions());
}
// cache some entries, randomly invalidate some of them, then check that the weight of the cache is correct
public void testWeight() {
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.weigher((k, v) -> k)
.build();
int weight = 0;
for (int i = 0; i < numberOfEntries; i++) {
weight += i;
cache.put(i, Integer.toString(i));
}
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
weight -= i;
cache.invalidate(i);
}
}
assertEquals(weight, cache.weight());
}
// cache some entries, randomly invalidate some of them, then check that the number of cached entries is correct
public void testCount() {
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
int count = 0;
for (int i = 0; i < numberOfEntries; i++) {
count++;
cache.put(i, Integer.toString(i));
}
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
count--;
cache.invalidate(i);
}
}
assertEquals(count, cache.count());
}
// cache some entries, step the clock forward, cache some more entries, step the clock forward and then check that
// the first batch of cached entries expired and were removed
public void testExpirationAfterAccess() {
AtomicLong now = new AtomicLong();
Cache<Integer, String> cache = new Cache<Integer, String>() {
@Override
protected long now() {
return now.get();
}
};
cache.setExpireAfterAccess(1);
List<Integer> evictedKeys = new ArrayList<>();
cache.setRemovalListener(notification -> {
assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason());
evictedKeys.add(notification.getKey());
});
now.set(0);
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
now.set(1);
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
now.set(2);
cache.refresh();
assertEquals(numberOfEntries, cache.count());
for (int i = 0; i < evictedKeys.size(); i++) {
assertEquals(i, (int) evictedKeys.get(i));
}
Set<Integer> remainingKeys = new HashSet<>();
for (Integer key : cache.keys()) {
remainingKeys.add(key);
}
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
assertTrue(remainingKeys.contains(i));
}
}
public void testExpirationAfterWrite() {
AtomicLong now = new AtomicLong();
Cache<Integer, String> cache = new Cache<Integer, String>() {
@Override
protected long now() {
return now.get();
}
};
cache.setExpireAfterWrite(1);
List<Integer> evictedKeys = new ArrayList<>();
cache.setRemovalListener(notification -> {
assertEquals(RemovalNotification.RemovalReason.EVICTED, notification.getRemovalReason());
evictedKeys.add(notification.getKey());
});
now.set(0);
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
now.set(1);
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
now.set(2);
for (int i = 0; i < numberOfEntries; i++) {
cache.get(i);
}
cache.refresh();
assertEquals(numberOfEntries, cache.count());
for (int i = 0; i < evictedKeys.size(); i++) {
assertEquals(i, (int) evictedKeys.get(i));
}
Set<Integer> remainingKeys = new HashSet<>();
for (Integer key : cache.keys()) {
remainingKeys.add(key);
}
for (int i = numberOfEntries; i < 2 * numberOfEntries; i++) {
assertTrue(remainingKeys.contains(i));
}
}
// randomly promote some entries, step the clock forward, then check that the promoted entries remain and the
// non-promoted entries were removed
public void testPromotion() {
AtomicLong now = new AtomicLong();
Cache<Integer, String> cache = new Cache<Integer, String>() {
@Override
protected long now() {
return now.get();
}
};
cache.setExpireAfterAccess(1);
now.set(0);
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
now.set(1);
Set<Integer> promotedKeys = new HashSet<>();
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
cache.get(i);
promotedKeys.add(i);
}
}
now.set(2);
cache.refresh();
assertEquals(promotedKeys.size(), cache.count());
for (int i = 0; i < numberOfEntries; i++) {
if (promotedKeys.contains(i)) {
assertNotNull(cache.get(i));
} else {
assertNull(cache.get(i));
}
}
}
// randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null
public void testInvalidate() {
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
Set<Integer> keys = new HashSet<>();
for (Integer key : cache.keys()) {
if (rarely()) {
cache.invalidate(key);
keys.add(key);
}
}
for (int i = 0; i < numberOfEntries; i++) {
if (keys.contains(i)) {
assertNull(cache.get(i));
} else {
assertNotNull(cache.get(i));
}
}
}
// randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only
// those entries
public void testNotificationOnInvalidate() {
Set<Integer> notifications = new HashSet<>();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.removalListener(notification -> {
assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
notifications.add(notification.getKey());
})
.build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
Set<Integer> invalidated = new HashSet<>();
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
cache.invalidate(i);
invalidated.add(i);
}
}
assertEquals(notifications, invalidated);
}
// invalidate all cached entries, then check that the cache is empty
public void testInvalidateAll() {
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
cache.invalidateAll();
assertEquals(0, cache.count());
assertEquals(0, cache.weight());
}
// invalidate all cached entries, then check that we receive invalidate notifications for all entries
public void testNotificationOnInvalidateAll() {
Set<Integer> notifications = new HashSet<>();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.removalListener(notification -> {
assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
notifications.add(notification.getKey());
})
.build();
Set<Integer> invalidated = new HashSet<>();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
invalidated.add(i);
}
cache.invalidateAll();
assertEquals(invalidated, notifications);
}
// randomly replace some entries, increasing the weight by 1 for each replacement, then count that the cache size
// is correct
public void testReplaceRecomputesSize() {
class Key {
private int key;
private long weight;
public Key(int key, long weight) {
this.key = key;
this.weight = weight;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key1 = (Key) o;
return key == key1.key;
}
@Override
public int hashCode() {
return key;
}
}
Cache<Key, String> cache = CacheBuilder.<Key, String>builder().weigher((k, s) -> k.weight).build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(new Key(i, 1), Integer.toString(i));
}
assertEquals(numberOfEntries, cache.count());
assertEquals(numberOfEntries, cache.weight());
int replaced = 0;
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
replaced++;
cache.put(new Key(i, 2), Integer.toString(i));
}
}
assertEquals(numberOfEntries, cache.count());
assertEquals(numberOfEntries + replaced, cache.weight());
}
// randomly replace some entries, then check that we received replacement notifications for those and only those
// entries
public void testNotificationOnReplace() {
Set<Integer> notifications = new HashSet<>();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.removalListener(notification -> {
assertEquals(RemovalNotification.RemovalReason.REPLACED, notification.getRemovalReason());
notifications.add(notification.getKey());
})
.build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
Set<Integer> replacements = new HashSet<>();
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
cache.put(i, Integer.toString(i) + Integer.toString(i));
replacements.add(i);
}
}
assertEquals(replacements, notifications);
}
public void testComputeIfAbsentCallsOnce() throws InterruptedException {
int numberOfThreads = randomIntBetween(2, 200);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
List<Thread> threads = new ArrayList<>();
AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries);
for (int j = 0; j < numberOfEntries; j++) {
flags.set(j, false);
}
CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> {
latch.countDown();
for (int j = 0; j < numberOfEntries; j++) {
try {
cache.computeIfAbsent(j, key -> {
assertTrue(flags.compareAndSet(key, false, true));
return Integer.toString(key);
});
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
});
threads.add(thread);
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() {
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
try {
cache.computeIfAbsent(1, k -> null);
fail("expected ExecutionException");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(NullPointerException.class));
}
}
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
// here be dragons: this test did catch one subtle bug during development; do not remove lightly
public void testTorture() throws InterruptedException {
int numberOfThreads = randomIntBetween(2, 200);
final Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.setMaximumWeight(1000)
.weigher((k, v) -> 2)
.build();
CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> {
Random random = new Random(random().nextLong());
latch.countDown();
for (int j = 0; j < numberOfEntries; j++) {
Integer key = random.nextInt(numberOfEntries);
cache.put(key, Integer.toString(j));
}
});
threads.add(thread);
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
cache.refresh();
assertEquals(500, cache.count());
}
}

View File

@ -96,7 +96,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
// now cached
assertThat(matchCount(filter, reader), equalTo(3));
// There are 3 segments
assertThat(cache.getLoadedFilters().size(), equalTo(3l));
assertThat(cache.getLoadedFilters().weight(), equalTo(3L));
writer.forceMerge(1);
reader.close();
@ -108,12 +108,12 @@ public class BitSetFilterCacheTests extends ESTestCase {
// now cached
assertThat(matchCount(filter, reader), equalTo(3));
// Only one segment now, so the size must be 1
assertThat(cache.getLoadedFilters().size(), equalTo(1l));
assertThat(cache.getLoadedFilters().weight(), equalTo(1L));
reader.close();
writer.close();
// There is no reference from readers and writer to any segment in the test index, so the size in the fbs cache must be 0
assertThat(cache.getLoadedFilters().size(), equalTo(0l));
assertThat(cache.getLoadedFilters().weight(), equalTo(0L));
}
public void testListener() throws IOException {

View File

@ -601,10 +601,10 @@ public abstract class AbstractStringFieldDataTestCase extends AbstractFieldDataI
assertThat(ifd.loadGlobal(topLevelReader), sameInstance(globalOrdinals));
// 3 b/c 1 segment level caches and 1 top level cache
// in case of doc values, we don't cache atomic FD, so only the top-level cache is there
assertThat(indicesFieldDataCache.getCache().size(), equalTo(hasDocValues() ? 1L : 4L));
assertThat(indicesFieldDataCache.getCache().weight(), equalTo(hasDocValues() ? 1L : 4L));
IndexOrdinalsFieldData cachedInstance = null;
for (Accountable ramUsage : indicesFieldDataCache.getCache().asMap().values()) {
for (Accountable ramUsage : indicesFieldDataCache.getCache().values()) {
if (ramUsage instanceof IndexOrdinalsFieldData) {
cachedInstance = (IndexOrdinalsFieldData) ramUsage;
break;
@ -613,12 +613,12 @@ public abstract class AbstractStringFieldDataTestCase extends AbstractFieldDataI
assertThat(cachedInstance, sameInstance(globalOrdinals));
topLevelReader.close();
// Now only 3 segment level entries, only the toplevel reader has been closed, but the segment readers are still used by IW
assertThat(indicesFieldDataCache.getCache().size(), equalTo(hasDocValues() ? 0L : 3L));
assertThat(indicesFieldDataCache.getCache().weight(), equalTo(hasDocValues() ? 0L : 3L));
refreshReader();
assertThat(ifd.loadGlobal(topLevelReader), not(sameInstance(globalOrdinals)));
ifdService.clear();
assertThat(indicesFieldDataCache.getCache().size(), equalTo(0l));
assertThat(indicesFieldDataCache.getCache().weight(), equalTo(0l));
}
}

View File

@ -188,7 +188,7 @@ public class RandomExceptionCircuitBreakerIT extends ESIntegTestCase {
for (String node : internalCluster().getNodeNames()) {
final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesFieldDataCache.class, node);
// Clean up the cache, ensuring that entries' listeners have been called
fdCache.getCache().cleanUp();
fdCache.getCache().refresh();
}
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
.clear().setBreaker(true).execute().actionGet();

View File

@ -1852,7 +1852,7 @@ public final class InternalTestCluster extends TestCluster {
for (NodeAndClient nodeAndClient : nodes.values()) {
final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesFieldDataCache.class, nodeAndClient.node);
// Clean up the cache, ensuring that entries' listeners have been called
fdCache.getCache().cleanUp();
fdCache.getCache().refresh();
final String name = nodeAndClient.name;
final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);