From 89e03910f426d334147d2d6edce1ea3d50186d72 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 24 Jul 2014 15:37:55 +0200 Subject: [PATCH] Add a periodic cleanup thread for IndexFieldCache caches Fixes #7010 --- .../fielddata/IndexFieldDataService.java | 5 +- .../cache/IndicesFieldDataCache.java | 68 +++++++++++++++++-- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index bac8fae03e2..7e7ba5158b0 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -42,6 +42,7 @@ import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collection; @@ -143,7 +144,9 @@ public class IndexFieldDataService extends AbstractIndexComponent { // public for testing public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) { - this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService)), circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService)); + this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, + new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService), new ThreadPool("testing-only")), + circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService)); } // public for testing diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 7241b1019ba..9764c1056ec 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -32,15 +32,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.fielddata.AtomicFieldData; -import org.elasticsearch.index.fielddata.FieldDataType; -import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.fielddata.*; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; @@ -51,12 +49,18 @@ import java.util.concurrent.TimeUnit; */ public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener { + public static final String FIELDDATA_CLEAN_INTERVAL_SETTING = "indices.fielddata.cache.cleanup_interval"; + private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; private final Cache cache; + private final TimeValue cleanInterval; + private final ThreadPool threadPool; + private volatile boolean closed = false; @Inject - public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { + public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener, ThreadPool threadPool) { super(settings); + this.threadPool = threadPool; this.indicesFieldDataCacheListener = indicesFieldDataCacheListener; String size = componentSettings.get("size", "-1"); long sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes(); @@ -79,10 +83,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL } logger.debug("using size [{}] [{}], expire [{}]", size, new ByteSizeValue(sizeInBytes), expire); cache = cacheBuilder.build(); + + this.cleanInterval = settings.getAsTime(FIELDDATA_CLEAN_INTERVAL_SETTING, TimeValue.timeValueMinutes(1)); + // Start thread that will manage cleaning the field data cache periodically + threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, + new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval)); } public void close() { cache.invalidateAll(); + this.closed = true; } public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) { @@ -146,6 +156,13 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL assert indexService != null; } + /** + * Clean up the internal Guava cache + */ + public void cleanUp() { + cache.cleanUp(); + } + @Override public > FD load(final AtomicReaderContext context, final IFD indexFieldData) throws Exception { final Key key = new Key(this, context.reader().getCoreCacheKey()); @@ -288,4 +305,45 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL return result; } } + + /** + * FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache + * periodically. In this case it is the field data cache, because a cache that + * has an entry invalidated may not clean up the entry if it is not read from + * or written to after invalidation. + */ + public class FieldDataCacheCleaner implements Runnable { + + private final Cache cache; + private final ESLogger logger; + private final ThreadPool threadPool; + private final TimeValue interval; + + public FieldDataCacheCleaner(Cache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) { + this.cache = cache; + this.logger = logger; + this.threadPool = threadPool; + this.interval = interval; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + if (logger.isTraceEnabled()) { + logger.trace("running periodic field data cache cleanup"); + } + try { + this.cache.cleanUp(); + } catch (Exception e) { + logger.warn("Exception during periodic field data cache cleanup:", e); + } + if (logger.isTraceEnabled()) { + logger.trace("periodic field data cache cleanup finished in {} milliseconds", System.currentTimeMillis() - startTime); + } + // Reschedule itself to run again if not closed + if (closed == false) { + threadPool.schedule(interval, ThreadPool.Names.SAME, this); + } + } + } }