Close ShardFilterCache after Store is closed

The ShardFilterCache relies on the fact that it's
closed once the last reader on the shard is closed.
This is only guaranteed once the Store and all its
references are closed. This commit moves the closing
into the internal callback mechanism we use for deleting
shard data etc. to close the cache once we have all
searchers released.
This commit is contained in:
Simon Willnauer 2015-05-29 09:44:41 +02:00
parent 87a0c76e9c
commit 5cd6ced7ee
5 changed files with 37 additions and 20 deletions

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCacheModule;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.filter.ShardFilterCacheModule;
import org.elasticsearch.index.cache.query.ShardQueryCacheModule;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
@ -69,6 +70,7 @@ import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
@ -298,11 +300,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);
logger.debug("creating shard_id {}", shardId);
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
final ShardFilterCache shardFilterCache = new ShardFilterCache(shardId, injector.getInstance(IndicesFilterCache.class));
ModulesBuilder modules = new ModulesBuilder();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(shardId, primary, indexSettings));
@ -310,11 +312,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock,
new StoreCloseListener(shardId, canDeleteShardContent), path));
new StoreCloseListener(shardId, canDeleteShardContent, shardFilterCache), path));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
modules.add(new ShardFilterCacheModule());
modules.add(new ShardFilterCacheModule(shardFilterCache));
modules.add(new ShardQueryCacheModule());
modules.add(new ShardBitsetFilterCacheModule());
modules.add(new ShardFieldDataModule());
@ -465,16 +467,27 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;
private final Closeable[] toClose;
public StoreCloseListener(ShardId shardId, boolean ownsShard) {
public StoreCloseListener(ShardId shardId, boolean ownsShard, Closeable... toClose) {
this.shardId = shardId;
this.ownsShard = ownsShard;
this.toClose = toClose;
}
@Override
public void handle(ShardLock lock) {
assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock, ownsShard);
try {
assert lock.getShardId().equals(shardId) : "shard id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock, ownsShard);
} finally {
try {
IOUtils.close(toClose);
} catch (IOException ex) {
logger.debug("failed to close resource", ex);
}
}
}
}

View File

@ -28,17 +28,18 @@ import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*/
public class ShardFilterCache extends AbstractIndexShardComponent implements Closeable {
public class ShardFilterCache implements Closeable {
final IndicesFilterCache cache;
final ShardId shardId;
@Inject
public ShardFilterCache(ShardId shardId, @IndexSettings Settings indexSettings, IndicesFilterCache cache) {
super(shardId, indexSettings);
public ShardFilterCache(ShardId shardId, IndicesFilterCache cache) {
this.cache = cache;
this.shardId = shardId;
}
public FilterCacheStats stats() {

View File

@ -25,8 +25,14 @@ import org.elasticsearch.common.inject.AbstractModule;
*/
public class ShardFilterCacheModule extends AbstractModule {
private final ShardFilterCache shardFilterCache;
public ShardFilterCacheModule(ShardFilterCache shardFilterCache) {
this.shardFilterCache = shardFilterCache;
}
@Override
protected void configure() {
bind(ShardFilterCache.class).asEagerSingleton();
bind(ShardFilterCache.class).toInstance(shardFilterCache);
}
}

View File

@ -161,7 +161,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService;
private final MergePolicyProvider mergePolicyProvider;
private final BigArrays bigArrays;
private final EngineConfig engineConfig;
private final TranslogConfig translogConfig;
@ -212,7 +211,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.deletionPolicy = deletionPolicy;
this.similarityService = similarityService;
this.mergePolicyProvider = mergePolicyProvider;
this.bigArrays = bigArrays;
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
this.engineFactory = factory;
@ -794,7 +792,7 @@ public class IndexShard extends AbstractIndexShardComponent {
engine.flushAndClose();
}
} finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
IOUtils.close(engine, shardFilterCache);
IOUtils.close(engine);
}
}
}

View File

@ -801,12 +801,11 @@ public final class InternalTestCluster extends TestCluster {
}
void resetClient() throws IOException {
if (closed.get()) {
throw new RuntimeException("already closed");
if (closed.get() == false) {
Releasables.close(nodeClient, transportClient);
nodeClient = null;
transportClient = null;
}
Releasables.close(nodeClient, transportClient);
nodeClient = null;
transportClient = null;
}
void closeNode() {