diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java index 2b6b9ace28c..024a892bde3 100644 --- a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java @@ -26,10 +26,8 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import org.apache.lucene.search.DocIdSet; -import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.MemorySizeValue; @@ -47,7 +45,6 @@ import java.util.concurrent.TimeUnit; public class IndicesFilterCache extends AbstractComponent implements RemovalListener { private final ThreadPool threadPool; - private final CacheRecycler cacheRecycler; private Cache cache; @@ -91,10 +88,9 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList } @Inject - public IndicesFilterCache(Settings settings, ThreadPool threadPool, CacheRecycler cacheRecycler, NodeSettingsService nodeSettingsService) { + public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) { super(settings); this.threadPool = threadPool; - this.cacheRecycler = cacheRecycler; this.size = componentSettings.get("size", "10%"); this.expire = componentSettings.getAsTime("expire", null); this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60)); @@ -167,6 +163,10 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList */ class ReaderCleaner implements Runnable { + // this is thread safe since we only schedule the next cleanup once the current one is + // done, so no concurrent execution + private final ObjectOpenHashSet keys = ObjectOpenHashSet.newInstance(); + @Override public void run() { if (closed) { @@ -180,33 +180,30 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { @Override public void run() { - Recycler.V> keys = cacheRecycler.hashSet(-1); - try { - for (Iterator it = readersKeysToClean.iterator(); it.hasNext(); ) { - keys.v().add(it.next()); - it.remove(); - } - cache.cleanUp(); - if (!keys.v().isEmpty()) { - for (Iterator it = cache.asMap().keySet().iterator(); it.hasNext(); ) { - WeightedFilterCache.FilterCacheKey filterCacheKey = it.next(); - if (keys.v().contains(filterCacheKey.readerKey())) { - // same as invalidate - it.remove(); - } + keys.clear(); + for (Iterator it = readersKeysToClean.iterator(); it.hasNext(); ) { + keys.add(it.next()); + it.remove(); + } + cache.cleanUp(); + if (!keys.isEmpty()) { + for (Iterator it = cache.asMap().keySet().iterator(); it.hasNext(); ) { + WeightedFilterCache.FilterCacheKey filterCacheKey = it.next(); + if (keys.contains(filterCacheKey.readerKey())) { + // same as invalidate + it.remove(); } } - schedule(); - } finally { - keys.close(); } + schedule(); + keys.clear(); } }); } catch (EsRejectedExecutionException ex) { logger.debug("Can not run ReaderCleaner - execution rejected", ex); - } + } } - + private void schedule() { try { threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);