Remove and forbid use of com.google.common.cache.*
This commit removes and now forbids all uses of com.google.common.cache.Cache, com.google.common.cache.CacheBuilder, com.google.common.cache.RemovalListener, com.google.common.cache.RemovalNotification, and com.google.common.cache.Weigher across the codebase. This is a major step in the eventual removal of Guava as a dependency. Relates #13224
parent aa8bfeb88c · commit 5d340f5e6e
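Every hunk below applies the same mapping from the Guava cache API to the in-house org.elasticsearch.common.cache API. The Java sketch that follows condenses that mapping for quick reference; it is illustrative only and not part of the commit. The class name CacheMigrationSketch and the toy String/Long key and value types are invented, and while only calls that appear in the hunks below are used (builder(), setMaximumWeight(), weigher(), setExpireAfter(), removalListener(), computeIfAbsent(), get(), refresh(), getRemovalReason()), their exact signatures should be checked against org.elasticsearch.common.cache at this commit.

import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification;

import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of this commit.
public class CacheMigrationSketch {

    static void example() throws Exception {
        // Guava: CacheBuilder.newBuilder().maximumWeight(bytes).weigher(new SomeWeigher()).expireAfterAccess(ms, MILLISECONDS)
        // Here:  CacheBuilder.<K, V>builder().setMaximumWeight(bytes).weigher(toLongBiFunction).setExpireAfter(nanos)
        RemovalListener<String, Long> listener = new RemovalListener<String, Long>() {
            @Override
            public void onRemoval(RemovalNotification<String, Long> notification) {
                // Guava's notification.wasEvicted() becomes a check of the removal reason.
                if (notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED) {
                    // react to the eviction, e.g. bump an evictions metric
                }
            }
        };

        Cache<String, Long> cache = CacheBuilder.<String, Long>builder()
                .setMaximumWeight(1024)                       // replaces maximumWeight(long)
                .weigher((key, value) -> value)               // a ToLongBiFunction replaces Guava's Weigher
                .setExpireAfter(TimeUnit.MINUTES.toNanos(5))  // expiry is expressed in nanoseconds
                .removalListener(listener)
                .build();

        // Guava's get(key, Callable) loading pattern becomes computeIfAbsent(key, loader).
        Long loaded = cache.computeIfAbsent("segment-1", key -> 42L);

        // Guava's getIfPresent(key) becomes get(key); cleanUp() becomes refresh().
        Long cached = cache.get("segment-1");
        cache.refresh();
    }
}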
BitsetFilterCache.java (org.elasticsearch.index.cache.bitset)

@@ -19,11 +19,6 @@
 
 package org.elasticsearch.index.cache.bitset;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
 import org.apache.lucene.index.IndexReaderContext;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -38,6 +33,10 @@ import org.apache.lucene.util.BitDocIdSet;
 import org.apache.lucene.util.BitSet;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.cache.RemovalListener;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.common.settings.Settings;
@@ -58,10 +57,11 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 
 /**
  * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
@@ -94,10 +94,11 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
     public BitsetFilterCache(Index index, @IndexSettings Settings indexSettings) {
         super(index, indexSettings);
         this.loadRandomAccessFiltersEagerly = indexSettings.getAsBoolean(LOAD_RANDOM_ACCESS_FILTERS_EAGERLY, true);
-        this.loadedFilters = CacheBuilder.newBuilder().removalListener(this).build();
+        this.loadedFilters = CacheBuilder.<Object, Cache<Query, Value>>builder().removalListener(this).build();
         this.warmer = new BitSetProducerWarmer();
     }
 
+
     @Inject(optional = true)
     public void setIndicesWarmer(IndicesWarmer indicesWarmer) {
         this.indicesWarmer = indicesWarmer;
@@ -144,36 +145,37 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
     private BitSet getAndLoadIfNotPresent(final Query query, final LeafReaderContext context) throws IOException, ExecutionException {
         final Object coreCacheReader = context.reader().getCoreCacheKey();
         final ShardId shardId = ShardUtils.extractShardId(context.reader());
-        Cache<Query, Value> filterToFbs = loadedFilters.get(coreCacheReader, new Callable<Cache<Query, Value>>() {
-            @Override
-            public Cache<Query, Value> call() throws Exception {
-                context.reader().addCoreClosedListener(BitsetFilterCache.this);
-                return CacheBuilder.newBuilder().build();
-            }
+        Cache<Query, Value> filterToFbs = loadedFilters.computeIfAbsent(coreCacheReader, key -> {
+            context.reader().addCoreClosedListener(BitsetFilterCache.this);
+            return CacheBuilder.<Query, Value>builder().build();
         });
-        return filterToFbs.get(query, () -> {
-            final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
-            final IndexSearcher searcher = new IndexSearcher(topLevelContext);
-            searcher.setQueryCache(null);
-            final Weight weight = searcher.createNormalizedWeight(query, false);
-            final DocIdSetIterator it = weight.scorer(context);
-            final BitSet bitSet;
-            if (it == null) {
-                bitSet = null;
-            } else {
-                bitSet = BitSet.of(it, context.reader().maxDoc());
-            }
-
-            Value value = new Value(bitSet, shardId);
-            listener.onCache(shardId, value.bitset);
-            return value;
+        return filterToFbs.computeIfAbsent(query, key -> {
+            try {
+                final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
+                final IndexSearcher searcher = new IndexSearcher(topLevelContext);
+                searcher.setQueryCache(null);
+                final Weight weight = searcher.createNormalizedWeight(query, false);
+                final DocIdSetIterator it = weight.scorer(context);
+                final BitSet bitSet;
+                if (it == null) {
+                    bitSet = null;
+                } else {
+                    bitSet = BitSet.of(it, context.reader().maxDoc());
+                }
+
+                Value value = new Value(bitSet, shardId);
+                listener.onCache(shardId, value.bitset);
+                return value;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         }).bitset;
     }
 
     @Override
     public void onRemoval(RemovalNotification<Object, Cache<Query, Value>> notification) {
-        Object key = notification.getKey();
-        if (key == null) {
+        if (notification.getKey() == null) {
             return;
         }
 
@@ -182,7 +184,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
             return;
         }
 
-        for (Value value : valueCache.asMap().values()) {
+        for (Value value : valueCache.values()) {
             listener.onRemoval(value.shardId, value.bitset);
             // if null then this means the shard has already been removed and the stats are 0 anyway for the shard this key belongs to
         }
ShardRequestCache.java (org.elasticsearch.index.cache.request)

@@ -19,10 +19,8 @@
 
 package org.elasticsearch.index.cache.request;
 
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.cache.RemovalListener;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.settings.IndexSettings;
@@ -61,7 +59,7 @@ public class ShardRequestCache extends AbstractIndexShardComponent implements Re
 
     @Override
     public void onRemoval(RemovalNotification<IndicesRequestCache.Key, IndicesRequestCache.Value> removalNotification) {
-        if (removalNotification.wasEvicted()) {
+        if (removalNotification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED) {
             evictionsMetric.inc();
         }
         long dec = 0;
IndicesRequestCache.java (org.elasticsearch.indices.cache.request)

@@ -21,12 +21,6 @@ package org.elasticsearch.indices.cache.request;
 
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.ObjectSet;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.cache.Weigher;
-
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.util.Accountable;
@@ -35,6 +29,10 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.cache.RemovalListener;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -51,14 +49,11 @@ import org.elasticsearch.search.query.QueryPhase;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.Callable;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.elasticsearch.common.Strings.hasLength;
 
@@ -162,25 +157,17 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
     private void buildCache() {
         long sizeInBytes = MemorySizeValue.parseBytesSizeValueOrHeapRatio(size, INDICES_CACHE_QUERY_SIZE).bytes();
 
-        CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.newBuilder()
-                .maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this);
-        cacheBuilder.concurrencyLevel(concurrencyLevel);
+        CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.<Key, Value>builder()
+                .setMaximumWeight(sizeInBytes).weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()).removalListener(this);
+        // cacheBuilder.concurrencyLevel(concurrencyLevel);
 
         if (expire != null) {
-            cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
+            cacheBuilder.setExpireAfter(TimeUnit.MILLISECONDS.toNanos(expire.millis()));
         }
 
         cache = cacheBuilder.build();
     }
 
-    private static class QueryCacheWeigher implements Weigher<Key, Value> {
-
-        @Override
-        public int weigh(Key key, Value value) {
-            return (int) (key.ramBytesUsed() + value.ramBytesUsed());
-        }
-    }
-
     public void close() {
         reaper.close();
         cache.invalidateAll();
@@ -197,9 +184,6 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
 
     @Override
     public void onRemoval(RemovalNotification<Key, Value> notification) {
-        if (notification.getKey() == null) {
-            return;
-        }
         notification.getKey().shard.requestCache().onRemoval(notification);
     }
 
@@ -258,8 +242,8 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
     public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
         assert canCache(request, context);
         Key key = buildKey(request, context);
-        Loader loader = new Loader(queryPhase, context, key);
-        Value value = cache.get(key, loader);
+        Loader loader = new Loader(queryPhase, context);
+        Value value = cache.computeIfAbsent(key, loader);
         if (loader.isLoaded()) {
             key.shard.requestCache().onMiss();
             // see if its the first time we see this reader, and make sure to register a cleanup key
@@ -279,17 +263,15 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
         }
     }
 
-    private static class Loader implements Callable<Value> {
+    private static class Loader implements Function<Key, Value> {
 
         private final QueryPhase queryPhase;
         private final SearchContext context;
-        private final IndicesRequestCache.Key key;
         private boolean loaded;
 
-        Loader(QueryPhase queryPhase, SearchContext context, IndicesRequestCache.Key key) {
+        Loader(QueryPhase queryPhase, SearchContext context) {
             this.queryPhase = queryPhase;
            this.context = context;
-            this.key = key;
         }
 
        public boolean isLoaded() {
@@ -297,7 +279,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
         }
 
         @Override
-        public Value call() throws Exception {
+        public Value apply(Key key) {
             queryPhase.execute(context);
 
             /* BytesStreamOutput allows to pass the expected size but by default uses
@@ -317,6 +299,8 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
                 Value value = new Value(reference, out.ramBytesUsed());
                 key.shard.requestCache().onCached(key, value);
                 return value;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
             }
         }
     }
@@ -473,7 +457,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
 
             if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
                 CleanupKey lookupKey = new CleanupKey(null, -1);
-                for (Iterator<Key> iterator = cache.asMap().keySet().iterator(); iterator.hasNext(); ) {
+                for (Iterator<Key> iterator = cache.keys().iterator(); iterator.hasNext(); ) {
                     Key key = iterator.next();
                     if (currentFullClean.contains(key.shard)) {
                         iterator.remove();
@@ -487,7 +471,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
                 }
             }
 
-            cache.cleanUp();
+            cache.refresh();
             currentKeysToClean.clear();
             currentFullClean.clear();
         }
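A side effect of the new API that shows up in the IndicesRequestCache hunks above: the Loader is now a java.util.function.Function<Key, Value> instead of a Callable<Value>, so it can no longer throw checked exceptions, and the commit wraps IOException in RuntimeException inside the loader. Below is a minimal sketch of that pattern; DemoKey, DemoValue, DemoLoader and load() are invented stand-ins rather than the real request-cache types.

import java.io.IOException;
import java.util.function.Function;

// Invented stand-ins for the real IndicesRequestCache.Key / Value types.
final class DemoKey {}
final class DemoValue {}

// The old Guava loader was a Callable<Value> and could throw checked exceptions;
// a Function cannot, so checked I/O failures are wrapped in RuntimeException,
// mirroring the catch blocks this commit adds to its loaders.
final class DemoLoader implements Function<DemoKey, DemoValue> {
    @Override
    public DemoValue apply(DemoKey key) {
        try {
            return load(key);                 // stands in for the query-phase work that may do I/O
        } catch (IOException e) {
            throw new RuntimeException(e);    // same wrapping used in the diff
        }
    }

    private DemoValue load(DemoKey key) throws IOException {
        return new DemoValue();
    }
}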
IndicesFieldDataCache.java (org.elasticsearch.indices.fielddata.cache)

@@ -19,12 +19,15 @@
 
 package org.elasticsearch.indices.fielddata.cache;
 
-import com.google.common.cache.*;
-import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.util.Accountable;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.cache.RemovalListener;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.logging.ESLogger;
@@ -43,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.ToLongBiFunction;
 
 /**
  */
@@ -66,17 +70,11 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
         this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
         final String size = settings.get(INDICES_FIELDDATA_CACHE_SIZE_KEY, "-1");
         final long sizeInBytes = settings.getAsMemory(INDICES_FIELDDATA_CACHE_SIZE_KEY, "-1").bytes();
-        CacheBuilder<Key, Accountable> cacheBuilder = CacheBuilder.newBuilder()
+        CacheBuilder<Key, Accountable> cacheBuilder = CacheBuilder.<Key, Accountable>builder()
                 .removalListener(this);
         if (sizeInBytes > 0) {
-            cacheBuilder.maximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
+            cacheBuilder.setMaximumWeight(sizeInBytes).weigher(new FieldDataWeigher());
         }
-        // defaults to 4, but this is a busy map for all indices, increase it a bit by default
-        final int concurrencyLevel = settings.getAsInt(FIELDDATA_CACHE_CONCURRENCY_LEVEL, 16);
-        if (concurrencyLevel <= 0) {
-            throw new IllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
-        }
-        cacheBuilder.concurrencyLevel(concurrencyLevel);
 
         logger.debug("using size [{}] [{}]", size, new ByteSizeValue(sizeInBytes));
         cache = cacheBuilder.build();
@@ -108,7 +106,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
         final Accountable value = notification.getValue();
         for (IndexFieldDataCache.Listener listener : key.listeners) {
             try {
-                listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed());
+                listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.getRemovalReason() == RemovalNotification.RemovalReason.EVICTED, value.ramBytesUsed());
             } catch (Throwable e) {
                 // load anyway since listeners should not throw exceptions
                 logger.error("Failed to call listener on field data cache unloading", e);
@@ -116,10 +114,9 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
             }
         }
 
-    public static class FieldDataWeigher implements Weigher<Key, Accountable> {
+    public static class FieldDataWeigher implements ToLongBiFunction<Key, Accountable> {
 
         @Override
-        public int weigh(Key key, Accountable ramUsage) {
+        public long applyAsLong(Key key, Accountable ramUsage) {
             int weight = (int) Math.min(ramUsage.ramBytesUsed(), Integer.MAX_VALUE);
             return weight == 0 ? 1 : weight;
         }
@@ -150,13 +147,18 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
             final ShardId shardId = ShardUtils.extractShardId(context.reader());
             final Key key = new Key(this, context.reader().getCoreCacheKey(), shardId);
             //noinspection unchecked
-            final Accountable accountable = cache.get(key, () -> {
+            final Accountable accountable = cache.computeIfAbsent(key, k -> {
                 context.reader().addCoreClosedListener(IndexFieldCache.this);
                 for (Listener listener : this.listeners) {
-                    key.listeners.add(listener);
+                    k.listeners.add(listener);
                 }
-                final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
-                for (Listener listener : key.listeners) {
+                final AtomicFieldData fieldData;
+                try {
+                    fieldData = indexFieldData.loadDirect(context);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (Listener listener : k.listeners) {
                     try {
                         listener.onCache(shardId, fieldNames, fieldDataType, fieldData);
                     } catch (Throwable e) {
@@ -174,13 +176,18 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
             final ShardId shardId = ShardUtils.extractShardId(indexReader);
             final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
             //noinspection unchecked
-            final Accountable accountable = cache.get(key, () -> {
+            final Accountable accountable = cache.computeIfAbsent(key, k -> {
                 indexReader.addReaderClosedListener(IndexFieldCache.this);
                 for (Listener listener : this.listeners) {
-                    key.listeners.add(listener);
+                    k.listeners.add(listener);
                 }
-                final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
-                for (Listener listener : key.listeners) {
+                final Accountable ifd;
+                try {
+                    ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (Listener listener : k.listeners) {
                     try {
                         listener.onCache(shardId, fieldNames, fieldDataType, ifd);
                     } catch (Throwable e) {
@@ -207,38 +214,28 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
 
         @Override
         public void clear() {
-            for (Key key : cache.asMap().keySet()) {
+            for (Key key : cache.keys()) {
                 if (key.indexCache.index.equals(index)) {
                     cache.invalidate(key);
                 }
             }
-            // Note that cache invalidation in Guava does not immediately remove
-            // values from the cache. In the case of a cache with a rare write or
-            // read rate, it's possible for values to persist longer than desired.
-            //
-            // Note this is intended by the Guava developers, see:
-            // https://code.google.com/p/guava-libraries/wiki/CachesExplained#Eviction
-            // (the "When Does Cleanup Happen" section)
-
-            // We call it explicitly here since it should be a "rare" operation, and
-            // if a user runs it he probably wants to see memory returned as soon as
-            // possible
-            cache.cleanUp();
+            // force eviction
+            cache.refresh();
         }
 
         @Override
         public void clear(String fieldName) {
-            for (Key key : cache.asMap().keySet()) {
+            for (Key key : cache.keys()) {
                 if (key.indexCache.index.equals(index)) {
                     if (key.indexCache.fieldNames.fullName().equals(fieldName)) {
                         cache.invalidate(key);
                     }
                 }
             }
-            // we call cleanUp() because this is a manual operation, should happen
+            // we call refresh because this is a manual operation, should happen
             // rarely and probably means the user wants to see memory returned as
             // soon as possible
-            cache.cleanUp();
+            cache.refresh();
         }
 
         @Override
@@ -305,7 +302,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
                 logger.trace("running periodic field data cache cleanup");
             }
             try {
-                this.cache.cleanUp();
+                this.cache.refresh();
             } catch (Exception e) {
                 logger.warn("Exception during periodic field data cache cleanup:", e);
             }
ScriptService.java (org.elasticsearch.script)

@@ -19,13 +19,7 @@
 
 package org.elasticsearch.script;
 
-import java.nio.charset.StandardCharsets;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -43,6 +37,10 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.cache.RemovalListener;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -67,13 +65,13 @@ import org.elasticsearch.watcher.ResourceWatcherService;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -153,12 +151,12 @@
 
         this.defaultLang = settings.get(DEFAULT_SCRIPTING_LANGUAGE_SETTING, DEFAULT_LANG);
 
-        CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
+        CacheBuilder<String, CompiledScript> cacheBuilder = CacheBuilder.builder();
         if (cacheMaxSize >= 0) {
-            cacheBuilder.maximumSize(cacheMaxSize);
+            cacheBuilder.setMaximumWeight(cacheMaxSize);
         }
         if (cacheExpire != null) {
-            cacheBuilder.expireAfterAccess(cacheExpire.nanos(), TimeUnit.NANOSECONDS);
+            cacheBuilder.setExpireAfter(cacheExpire.nanos());
         }
         this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();
 
@@ -301,7 +299,7 @@
         }
 
         String cacheKey = getCacheKey(scriptEngineService, type == ScriptType.INLINE ? null : name, code);
-        CompiledScript compiledScript = cache.getIfPresent(cacheKey);
+        CompiledScript compiledScript = cache.get(cacheKey);
 
         if (compiledScript == null) {
            //Either an un-cached inline script or indexed script
@@ -493,12 +491,8 @@
      * script has been removed from the cache
      */
    private class ScriptCacheRemovalListener implements RemovalListener<String, CompiledScript> {
 
        @Override
        public void onRemoval(RemovalNotification<String, CompiledScript> notification) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("notifying script services of script removal due to: [{}]", notification.getCause());
-            }
            scriptMetrics.onCacheEviction();
            for (ScriptEngineService service : scriptEngines) {
                try {
BitSetFilterCacheTests.java

@@ -96,7 +96,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
         // now cached
         assertThat(matchCount(filter, reader), equalTo(3));
         // There are 3 segments
-        assertThat(cache.getLoadedFilters().size(), equalTo(3l));
+        assertThat(cache.getLoadedFilters().weight(), equalTo(3L));
 
         writer.forceMerge(1);
         reader.close();
@@ -108,12 +108,12 @@
         // now cached
         assertThat(matchCount(filter, reader), equalTo(3));
         // Only one segment now, so the size must be 1
-        assertThat(cache.getLoadedFilters().size(), equalTo(1l));
+        assertThat(cache.getLoadedFilters().weight(), equalTo(1L));
 
         reader.close();
         writer.close();
         // There is no reference from readers and writer to any segment in the test index, so the size in the fbs cache must be 0
-        assertThat(cache.getLoadedFilters().size(), equalTo(0l));
+        assertThat(cache.getLoadedFilters().weight(), equalTo(0L));
     }
 
     public void testListener() throws IOException {
AbstractStringFieldDataTestCase.java

@@ -601,10 +601,10 @@ public abstract class AbstractStringFieldDataTestCase extends AbstractFieldDataI
         assertThat(ifd.loadGlobal(topLevelReader), sameInstance(globalOrdinals));
         // 3 b/c 1 segment level caches and 1 top level cache
         // in case of doc values, we don't cache atomic FD, so only the top-level cache is there
-        assertThat(indicesFieldDataCache.getCache().size(), equalTo(hasDocValues() ? 1L : 4L));
+        assertThat(indicesFieldDataCache.getCache().weight(), equalTo(hasDocValues() ? 1L : 4L));
 
         IndexOrdinalsFieldData cachedInstance = null;
-        for (Accountable ramUsage : indicesFieldDataCache.getCache().asMap().values()) {
+        for (Accountable ramUsage : indicesFieldDataCache.getCache().values()) {
             if (ramUsage instanceof IndexOrdinalsFieldData) {
                 cachedInstance = (IndexOrdinalsFieldData) ramUsage;
                 break;
@@ -613,12 +613,12 @@
         assertThat(cachedInstance, sameInstance(globalOrdinals));
         topLevelReader.close();
         // Now only 3 segment level entries, only the toplevel reader has been closed, but the segment readers are still used by IW
-        assertThat(indicesFieldDataCache.getCache().size(), equalTo(hasDocValues() ? 0L : 3L));
+        assertThat(indicesFieldDataCache.getCache().weight(), equalTo(hasDocValues() ? 0L : 3L));
 
         refreshReader();
         assertThat(ifd.loadGlobal(topLevelReader), not(sameInstance(globalOrdinals)));
 
         ifdService.clear();
-        assertThat(indicesFieldDataCache.getCache().size(), equalTo(0l));
+        assertThat(indicesFieldDataCache.getCache().weight(), equalTo(0l));
     }
 }
RandomExceptionCircuitBreakerIT.java

@@ -188,7 +188,7 @@ public class RandomExceptionCircuitBreakerIT extends ESIntegTestCase {
         for (String node : internalCluster().getNodeNames()) {
             final IndicesFieldDataCache fdCache = internalCluster().getInstance(IndicesFieldDataCache.class, node);
             // Clean up the cache, ensuring that entries' listeners have been called
-            fdCache.getCache().cleanUp();
+            fdCache.getCache().refresh();
         }
         NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
                 .clear().setBreaker(true).execute().actionGet();
InternalTestCluster.java

@@ -1854,7 +1854,7 @@ public final class InternalTestCluster extends TestCluster {
         for (NodeAndClient nodeAndClient : nodes.values()) {
             final IndicesFieldDataCache fdCache = getInstanceFromNode(IndicesFieldDataCache.class, nodeAndClient.node);
             // Clean up the cache, ensuring that entries' listeners have been called
-            fdCache.getCache().cleanUp();
+            fdCache.getCache().refresh();
 
             final String name = nodeAndClient.name;
             final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);