diff --git a/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java b/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java index 166c7d7794a..42023732f09 100644 --- a/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java +++ b/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java @@ -209,12 +209,19 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte } + /** A weigher for the Guava filter cache that uses a minimum entry size */ public static class FilterCacheValueWeigher implements Weigher { + private final int minimumEntrySize; + + public FilterCacheValueWeigher(int minimumEntrySize) { + this.minimumEntrySize = minimumEntrySize; + } + @Override public int weigh(FilterCacheKey key, DocIdSet value) { int weight = (int) Math.min(DocIdSets.sizeInBytes(value), Integer.MAX_VALUE); - return weight == 0 ? 1 : weight; + return Math.max(weight, this.minimumEntrySize); } } diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java index df3e9165650..e344f2fa343 100644 --- a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java @@ -55,15 +55,17 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList private volatile int concurrencyLevel; private final TimeValue cleanInterval; + private final int minimumEntryWeight; private final Set readersKeysToClean = ConcurrentCollections.newConcurrentSet(); private volatile boolean closed; - public static final String INDICES_CACHE_FILTER_SIZE = "indices.cache.filter.size"; public static final String INDICES_CACHE_FILTER_EXPIRE = "indices.cache.filter.expire"; public static final String INDICES_CACHE_FILTER_CONCURRENCY_LEVEL = "indices.cache.filter.concurrency_level"; + public static final String INDICES_CACHE_FILTER_CLEAN_INTERVAL = "indices.cache.filter.clean_interval"; + public static final String INDICES_CACHE_FILTER_MINIMUM_ENTRY_WEIGHT = "indices.cache.filter.minimum_entry_weight"; class ApplySettings implements NodeSettingsService.Listener { @Override @@ -71,13 +73,15 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList boolean replace = false; String size = settings.get(INDICES_CACHE_FILTER_SIZE, IndicesFilterCache.this.size); if (!size.equals(IndicesFilterCache.this.size)) { - logger.info("updating [indices.cache.filter.size] from [{}] to [{}]", IndicesFilterCache.this.size, size); + logger.info("updating [{}] from [{}] to [{}]", + INDICES_CACHE_FILTER_SIZE, IndicesFilterCache.this.size, size); IndicesFilterCache.this.size = size; replace = true; } TimeValue expire = settings.getAsTime(INDICES_CACHE_FILTER_EXPIRE, IndicesFilterCache.this.expire); if (!Objects.equal(expire, IndicesFilterCache.this.expire)) { - logger.info("updating [indices.cache.filter.expire] from [{}] to [{}]", IndicesFilterCache.this.expire, expire); + logger.info("updating [{}] from [{}] to [{}]", + INDICES_CACHE_FILTER_EXPIRE, IndicesFilterCache.this.expire, expire); IndicesFilterCache.this.expire = expire; replace = true; } @@ -86,7 +90,8 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel); } if (!Objects.equal(concurrencyLevel, IndicesFilterCache.this.concurrencyLevel)) { - logger.info("updating [indices.cache.filter.concurrency_level] from [{}] to [{}]", IndicesFilterCache.this.concurrencyLevel, concurrencyLevel); + logger.info("updating [{}] from [{}] to [{}]", + INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, IndicesFilterCache.this.concurrencyLevel, concurrencyLevel); IndicesFilterCache.this.concurrencyLevel = concurrencyLevel; replace = true; } @@ -103,9 +108,14 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) { super(settings); this.threadPool = threadPool; - this.size = componentSettings.get("size", "10%"); - this.expire = componentSettings.getAsTime("expire", null); - this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60)); + this.size = settings.get(INDICES_CACHE_FILTER_SIZE, "10%"); + this.expire = settings.getAsTime(INDICES_CACHE_FILTER_EXPIRE, null); + this.minimumEntryWeight = settings.getAsInt(INDICES_CACHE_FILTER_MINIMUM_ENTRY_WEIGHT, 1024); // 1k per entry minimum + if (minimumEntryWeight <= 0) { + throw new ElasticsearchIllegalArgumentException("minimum_entry_weight must be > 0 but was: " + minimumEntryWeight); + } + this.cleanInterval = settings.getAsTime(INDICES_CACHE_FILTER_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60)); + // defaults to 4, but this is a busy map for all indices, increase it a bit this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, 16); if (concurrencyLevel <= 0) { throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel); @@ -122,10 +132,9 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList private void buildCache() { CacheBuilder cacheBuilder = CacheBuilder.newBuilder() .removalListener(this) - .maximumWeight(sizeInBytes).weigher(new WeightedFilterCache.FilterCacheValueWeigher()); + .maximumWeight(sizeInBytes).weigher(new WeightedFilterCache.FilterCacheValueWeigher(minimumEntryWeight)); - // defaults to 4, but this is a busy map for all indices, increase it a bit - cacheBuilder.concurrencyLevel(16); + cacheBuilder.concurrencyLevel(this.concurrencyLevel); if (expire != null) { cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS); @@ -145,6 +154,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList public void close() { closed = true; cache.invalidateAll(); + cache.cleanUp(); } public Cache cache() {