diff --git a/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java b/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java index 1a3aeb276dc..a154b265042 100644 --- a/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java +++ b/src/main/java/org/elasticsearch/index/cache/filter/weighted/WeightedFilterCache.java @@ -83,12 +83,7 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte return; } seenReadersCount.dec(); - for (FilterCacheKey key : indicesFilterCache.cache().asMap().keySet()) { - if (key.readerKey() == readerKey) { - // invalidate will cause a removal and will be notified - indicesFilterCache.cache().invalidate(key); - } - } + indicesFilterCache.addReaderKeyToClean(readerKey); } } @@ -106,13 +101,7 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte return; } seenReadersCount.dec(); - Cache cache = indicesFilterCache.cache(); - for (FilterCacheKey key : cache.asMap().keySet()) { - if (key.readerKey() == reader.getCoreCacheKey()) { - // invalidate will cause a removal and will be notified - cache.invalidate(key); - } - } + indicesFilterCache.addReaderKeyToClean(reader.getCoreCacheKey()); } @Override @@ -247,7 +236,7 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte if (this == o) return true; // if (o == null || getClass() != o.getClass()) return false; FilterCacheKey that = (FilterCacheKey) o; - return (readerKey == that.readerKey && filterKey.equals(that.filterKey)); + return (readerKey.equals(that.readerKey) && filterKey.equals(that.filterKey)); } @Override diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index f0bab95d264..6b683466e1e 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -719,7 +719,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } - private class EngineRefresher implements Runnable { + class EngineRefresher implements Runnable { @Override public void run() { // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule @@ -767,7 +767,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } - private class EngineMerger implements Runnable { + class EngineMerger implements Runnable { @Override public void run() { if (!engine().possibleMergeNeeded()) { diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java index aa2f5d251b5..841fe1e336f 100644 --- a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java @@ -25,7 +25,9 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.ImmutableMap; +import gnu.trove.set.hash.THashSet; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.CacheRecycler; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -33,21 +35,33 @@ import org.elasticsearch.common.lucene.docset.DocSet; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.threadpool.ThreadPool; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; public class IndicesFilterCache extends AbstractComponent implements RemovalListener { + private final ThreadPool threadPool; + private Cache cache; private volatile String size; private volatile long sizeInBytes; private volatile TimeValue expire; + private final TimeValue cleanInterval; + + private final Set readersKeysToClean = ConcurrentCollections.newConcurrentSet(); + + private volatile boolean closed; + private volatile Map> removalListeners = ImmutableMap.of(); @@ -85,15 +99,19 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList } @Inject - public IndicesFilterCache(Settings settings, NodeSettingsService nodeSettingsService) { + public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) { super(settings); + this.threadPool = threadPool; this.size = componentSettings.get("size", "20%"); this.expire = componentSettings.getAsTime("expire", null); + this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(1)); computeSizeInBytes(); buildCache(); logger.debug("using [node] filter cache with size [{}], actual_size [{}]", size, new ByteSizeValue(sizeInBytes)); nodeSettingsService.addListener(new ApplySettings()); + + threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, new ReaderCleaner()); } private void buildCache() { @@ -102,7 +120,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList .maximumWeight(sizeInBytes).weigher(new WeightedFilterCache.FilterCacheValueWeigher()); // defaults to 4, but this is a busy map for all indices, increase it a bit - cacheBuilder.concurrencyLevel(8); + cacheBuilder.concurrencyLevel(16); if (expire != null) { cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS); @@ -128,7 +146,12 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList removalListeners = MapBuilder.newMapBuilder(removalListeners).remove(index).immutableMap(); } + public void addReaderKeyToClean(Object readerKey) { + readersKeysToClean.add(readerKey); + } + public void close() { + closed = true; cache.invalidateAll(); } @@ -147,4 +170,49 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList listener.onRemoval(removalNotification); } } + + /** + * The reason we need this class ie because we need to clean all the filters that are associated + * with a reader. We don't want to do it every time a reader closes, since iterating over all the map + * is expensive. There doesn't seem to be a nicer way to do it (and maintaining a list per reader + * of the filters will cost more). + */ + class ReaderCleaner implements Runnable { + + @Override + public void run() { + if (closed) { + return; + } + if (readersKeysToClean.isEmpty()) { + threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this); + return; + } + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + THashSet keys = CacheRecycler.popHashSet(); + try { + for (Iterator it = readersKeysToClean.iterator(); it.hasNext(); ) { + keys.add(it.next()); + it.remove(); + } + cache.cleanUp(); + if (!keys.isEmpty()) { + for (Iterator it = cache.asMap().keySet().iterator(); it.hasNext(); ) { + WeightedFilterCache.FilterCacheKey filterCacheKey = it.next(); + if (keys.contains(filterCacheKey.readerKey())) { + // same as invalidate + it.remove(); + } + } + } + threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, ReaderCleaner.this); + } finally { + CacheRecycler.pushHashSet(keys); + } + } + }); + } + } } \ No newline at end of file