From 5cd6ced7eefaea10882ea510d9b1f473c7f45860 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 29 May 2015 09:44:41 +0200 Subject: [PATCH] Close ShardFilterCache after Store is closed The ShardFilterCache relies on the fact that it's closed once the last reader on the shard is closed. This is only guaranteed once the Store and all its references are closed. This commit moves the closing into the internal callback mechanism we use for deleting shard data etc. to close the cache once we have all searchers released. --- .../org/elasticsearch/index/IndexService.java | 25 ++++++++++++++----- .../index/cache/filter/ShardFilterCache.java | 11 ++++---- .../cache/filter/ShardFilterCacheModule.java | 8 +++++- .../elasticsearch/index/shard/IndexShard.java | 4 +-- .../test/InternalTestCluster.java | 9 +++---- 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java index 6276e682123..09335126c73 100644 --- a/src/main/java/org/elasticsearch/index/IndexService.java +++ b/src/main/java/org/elasticsearch/index/IndexService.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCacheModule; +import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.filter.ShardFilterCacheModule; import org.elasticsearch.index.cache.query.ShardQueryCacheModule; import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; @@ -69,6 +70,7 @@ import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesLifecycle; +import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ShardsPluginsModule; @@ -298,11 +300,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings); - logger.debug("creating shard_id {}", shardId); // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false || (primary && IndexMetaData.isOnSharedFilesystem(indexSettings)); + final ShardFilterCache shardFilterCache = new ShardFilterCache(shardId, injector.getInstance(IndicesFilterCache.class)); ModulesBuilder modules = new ModulesBuilder(); modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); modules.add(new IndexShardModule(shardId, primary, indexSettings)); @@ -310,11 +312,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone modules.add(new ShardSearchModule()); modules.add(new ShardGetModule()); modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock, - new StoreCloseListener(shardId, canDeleteShardContent), path)); + new StoreCloseListener(shardId, canDeleteShardContent, shardFilterCache), path)); modules.add(new DeletionPolicyModule(indexSettings)); modules.add(new MergePolicyModule(indexSettings)); modules.add(new MergeSchedulerModule(indexSettings)); - modules.add(new ShardFilterCacheModule()); + modules.add(new ShardFilterCacheModule(shardFilterCache)); modules.add(new ShardQueryCacheModule()); modules.add(new ShardBitsetFilterCacheModule()); modules.add(new ShardFieldDataModule()); @@ -465,16 +467,27 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; private final boolean ownsShard; + private final Closeable[] toClose; - public StoreCloseListener(ShardId shardId, boolean ownsShard) { + public StoreCloseListener(ShardId shardId, boolean ownsShard, Closeable... toClose) { this.shardId = shardId; this.ownsShard = ownsShard; + this.toClose = toClose; } @Override public void handle(ShardLock lock) { - assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId(); - onShardClose(lock, ownsShard); + try { + assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId(); + onShardClose(lock, ownsShard); + } finally { + try { + IOUtils.close(toClose); + } catch (IOException ex) { + logger.debug("failed to close resource", ex); + } + } + } } diff --git a/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java b/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java index 97f75094580..550c25bdbf6 100644 --- a/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java +++ b/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCache.java @@ -28,17 +28,18 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** */ -public class ShardFilterCache extends AbstractIndexShardComponent implements Closeable { - +public class ShardFilterCache implements Closeable { final IndicesFilterCache cache; + final ShardId shardId; - @Inject - public ShardFilterCache(ShardId shardId, @IndexSettings Settings indexSettings, IndicesFilterCache cache) { - super(shardId, indexSettings); + public ShardFilterCache(ShardId shardId, IndicesFilterCache cache) { this.cache = cache; + this.shardId = shardId; } public FilterCacheStats stats() { diff --git a/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCacheModule.java b/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCacheModule.java index 749fd8e392c..37bcb805768 100644 --- a/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCacheModule.java +++ b/src/main/java/org/elasticsearch/index/cache/filter/ShardFilterCacheModule.java @@ -25,8 +25,14 @@ import org.elasticsearch.common.inject.AbstractModule; */ public class ShardFilterCacheModule extends AbstractModule { + private final ShardFilterCache shardFilterCache; + + public ShardFilterCacheModule(ShardFilterCache shardFilterCache) { + this.shardFilterCache = shardFilterCache; + } + @Override protected void configure() { - bind(ShardFilterCache.class).asEagerSingleton(); + bind(ShardFilterCache.class).toInstance(shardFilterCache); } } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 07b4bf1f940..06400a3bf82 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -161,7 +161,6 @@ public class IndexShard extends AbstractIndexShardComponent { private final SnapshotDeletionPolicy deletionPolicy; private final SimilarityService similarityService; private final MergePolicyProvider mergePolicyProvider; - private final BigArrays bigArrays; private final EngineConfig engineConfig; private final TranslogConfig translogConfig; @@ -212,7 +211,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.deletionPolicy = deletionPolicy; this.similarityService = similarityService; this.mergePolicyProvider = mergePolicyProvider; - this.bigArrays = bigArrays; Preconditions.checkNotNull(store, "Store must be provided to the index shard"); Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard"); this.engineFactory = factory; @@ -794,7 +792,7 @@ public class IndexShard extends AbstractIndexShardComponent { engine.flushAndClose(); } } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times - IOUtils.close(engine, shardFilterCache); + IOUtils.close(engine); } } } diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 079008ef8cb..285463a176b 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -801,12 +801,11 @@ public final class InternalTestCluster extends TestCluster { } void resetClient() throws IOException { - if (closed.get()) { - throw new RuntimeException("already closed"); + if (closed.get() == false) { + Releasables.close(nodeClient, transportClient); + nodeClient = null; + transportClient = null; } - Releasables.close(nodeClient, transportClient); - nodeClient = null; - transportClient = null; } void closeNode() {