diff --git a/core/src/main/java/org/elasticsearch/action/fieldstats/TransportFieldStatsTransportAction.java b/core/src/main/java/org/elasticsearch/action/fieldstats/TransportFieldStatsTransportAction.java index bf70fa4910e..d0fb90f455c 100644 --- a/core/src/main/java/org/elasticsearch/action/fieldstats/TransportFieldStatsTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/fieldstats/TransportFieldStatsTransportAction.java @@ -133,7 +133,6 @@ public class TransportFieldStatsTransportAction extends TransportBroadcastAction IndexService indexServices = indicesService.indexServiceSafe(shardId.getIndex()); MapperService mapperService = indexServices.mapperService(); IndexShard shard = indexServices.shardSafe(shardId.id()); - shard.readAllowed(); try (Engine.Searcher searcher = shard.acquireSearcher("fieldstats")) { for (String field : request.getFields()) { MappedFieldType fieldType = mapperService.fullName(field); diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java index 7c194f00c64..22ca67b9c62 100644 --- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java +++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java @@ -257,7 +257,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple } @Override - public void afterIndexShardPostRecovery(IndexShard indexShard) { + public void beforeIndexShardPostRecovery(IndexShard indexShard) { if (hasPercolatorType(indexShard)) { // percolator index has started, fetch what we can from it and initialize the indices // we have @@ -274,8 +274,9 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple private int loadQueries(IndexShard shard) { shard.refresh("percolator_load_queries"); - // Maybe add a mode load? This isn't really a write. We need write b/c state=post_recovery - try (Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries", true)) { + // NOTE: we acquire the searcher via the engine directly here since this is executed right + // before the shard is marked as POST_RECOVERY + try (Engine.Searcher searcher = shard.engine().acquireSearcher("percolator_load_queries")) { Query query = new TermQuery(new Term(TypeFieldMapper.NAME, PercolatorService.TYPE_NAME)); QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, mapperService, indexFieldDataService); searcher.searcher().search(query, queryCollector); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 74a3dae27bb..af121a6f707 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -742,11 +742,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public Engine.Searcher acquireSearcher(String source) { - return acquireSearcher(source, false); - } - - public Engine.Searcher acquireSearcher(String source, boolean searcherForWriteOperation) { - readAllowed(searcherForWriteOperation); + readAllowed(); return engine().acquireSearcher(source); } @@ -776,6 +772,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { + indicesLifecycle.beforeIndexShardPostRecovery(this); synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); @@ -907,20 +904,9 @@ public class IndexShard extends AbstractIndexShardComponent { } public void readAllowed() throws IllegalIndexShardStateException { - readAllowed(false); - } - - - private void readAllowed(boolean writeOperation) throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read - if (writeOperation) { - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { - throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated"); - } - } else { - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { - throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated"); - } + if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { + throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated"); } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java b/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java index 42262c81e9e..211b6d4869d 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesLifecycle.java @@ -97,9 +97,16 @@ public interface IndicesLifecycle { } - public void afterIndexShardPostRecovery(IndexShard indexShard) { + /** + * Called right after the shard is moved into POST_RECOVERY mode + */ + public void afterIndexShardPostRecovery(IndexShard indexShard) {} - } + /** + * Called right before the shard is moved into POST_RECOVERY mode. + * The shard is ready to be used but not yet marked as POST_RECOVERY. + */ + public void beforeIndexShardPostRecovery(IndexShard indexShard) {} /** * Called after the index shard has been started. diff --git a/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java b/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java index 046acb7f625..77050714db2 100644 --- a/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java +++ b/core/src/main/java/org/elasticsearch/indices/InternalIndicesLifecycle.java @@ -121,6 +121,18 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic } } + public void beforeIndexShardPostRecovery(IndexShard indexShard) { + for (Listener listener : listeners) { + try { + listener.beforeIndexShardPostRecovery(indexShard); + } catch (Throwable t) { + logger.warn("{} failed to invoke before shard post recovery callback", t, indexShard.shardId()); + throw t; + } + } + } + + public void afterIndexShardPostRecovery(IndexShard indexShard) { for (Listener listener : listeners) { try {