diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index f8340ebc820..a4d03929cbb 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -675,24 +675,14 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private void maybeRefreshEngine() { if (indexSettings.getRefreshInterval().millis() > 0) { for (IndexShard shard : this.shards.values()) { - switch (shard.state()) { - case CREATED: - case RECOVERING: - case CLOSED: - continue; - case POST_RECOVERY: - case STARTED: - case RELOCATED: - try { - if (shard.isRefreshNeeded()) { - shard.refresh("schedule"); - } - } catch (IndexShardClosedException | AlreadyClosedException ex) { - // fine - continue; + if (shard.isReadAllowed()) { + try { + if (shard.isRefreshNeeded()) { + shard.refresh("schedule"); } - continue; - default: - throw new IllegalStateException("unknown state: " + shard.state()); + } catch (IndexShardClosedException | AlreadyClosedException ex) { + // fine - continue; + } } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b1644e5f219..af2c45e42cd 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -847,8 +847,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } public RefreshStats refreshStats() { - // Null refreshListeners means this shard doesn't support them so there can't be any. - int listeners = refreshListeners == null ? 0 : refreshListeners.pendingCount(); + int listeners = refreshListeners.pendingCount(); return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()), listeners); } @@ -1155,6 +1154,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (state == IndexShardState.RELOCATED) { throw new IndexShardRelocatedException(shardId); } + // we need to refresh again to expose all operations that were index until now. Otherwise + // we may not expose operations that were indexed with a refresh listener that was immediately + // responded to in addRefreshListener. + getEngine().refresh("post_recovery"); recoveryState.setStage(RecoveryState.Stage.DONE); changeState(IndexShardState.POST_RECOVERY, reason); } @@ -1324,6 +1327,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } + assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; final Engine engine = this.currentEngineReference.getAndSet(null); IOUtils.close(engine); recoveryState().setStage(RecoveryState.Stage.INIT); @@ -1372,6 +1376,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + /** returns true if the {@link IndexShardState} allows reading */ + public boolean isReadAllowed() { + return readAllowedStates.contains(state); + } + private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read @@ -2356,7 +2365,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * false otherwise. */ public void addRefreshListener(Translog.Location location, Consumer listener) { - refreshListeners.addOrNotify(location, listener); + final boolean readAllowed; + if (isReadAllowed()) { + readAllowed = true; + } else { + // check again under mutex. this is important to create a happens before relationship + // between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond + // to a listener before a refresh actually happened that contained that operation. + synchronized (mutex) { + readAllowed = isReadAllowed(); + } + } + if (readAllowed) { + refreshListeners.addOrNotify(location, listener); + } else { + // we're not yet ready fo ready for reads, just ignore refresh cycles + listener.accept(false); + } } private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 20411c70f4f..1332df658c4 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -993,7 +993,7 @@ public class IndexShardTests extends IndexShardTestCase { .settings(settings) .primaryTerm(0, 1).build(); IndexShard test = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); - recoveryShardFromStore(test); + recoverShardFromStore(test); indexDoc(test, "test", "test"); assertEquals(versionCreated.luceneVersion, test.minimumCompatibleVersion()); @@ -1040,14 +1040,14 @@ public class IndexShardTests extends IndexShardTestCase { public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); - assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // one refresh on end of recovery, one on starting shard + assertThat(shard.refreshStats().getTotal(), equalTo(3L)); // refresh on: finalize, end of recovery and on starting shard long initialTotalTime = shard.refreshStats().getTotalTimeInMillis(); // check time advances for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) { indexDoc(shard, "test", "test"); - assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1)); + assertThat(shard.refreshStats().getTotal(), equalTo(3L + i - 1)); shard.refresh("test"); - assertThat(shard.refreshStats().getTotal(), equalTo(2L + i)); + assertThat(shard.refreshStats().getTotal(), equalTo(3L + i)); assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime)); } long refreshCount = shard.refreshStats().getTotal(); @@ -1130,7 +1130,7 @@ public class IndexShardTests extends IndexShardTestCase { } }); - recoveryShardFromStore(shard); + recoverShardFromStore(shard); indexDoc(shard, "test", "1"); assertEquals(1, preIndex.get()); @@ -1679,7 +1679,7 @@ public class IndexShardTests extends IndexShardTestCase { IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()), shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null); - recoveryShardFromStore(newShard); + recoverShardFromStore(newShard); try (Engine.Searcher searcher = newShard.acquireSearcher("test")) { TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10); @@ -1718,7 +1718,7 @@ public class IndexShardTests extends IndexShardTestCase { .settings(settings) .primaryTerm(0, 1).build(); IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, wrapper); - recoveryShardFromStore(shard); + recoverShardFromStore(shard); indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); shard.refresh("created segment 1"); indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}"); @@ -1788,7 +1788,7 @@ public class IndexShardTests extends IndexShardTestCase { } }; final IndexShard newShard = reinitShard(shard, listener); - recoveryShardFromStore(newShard); + recoverShardFromStore(newShard); IndexingStats indexingStats = newShard.indexingStats(); // ensure we are not influencing the indexing stats assertEquals(0, indexingStats.getTotal().getDeleteCount()); @@ -1824,7 +1824,7 @@ public class IndexShardTests extends IndexShardTestCase { IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()), shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null); - recoveryShardFromStore(newShard); + recoverShardFromStore(newShard); try { newShard.acquireSearcher("test"); @@ -1845,7 +1845,7 @@ public class IndexShardTests extends IndexShardTestCase { .settings(settings) .primaryTerm(0, 1).build(); IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); - recoveryShardFromStore(primary); + recoverShardFromStore(primary); indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null); @@ -1947,7 +1947,7 @@ public class IndexShardTests extends IndexShardTestCase { .settings(settings) .primaryTerm(0, 1).build(); IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); - recoveryShardFromStore(primary); + recoverShardFromStore(primary); indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null); @@ -1978,6 +1978,58 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(primary, replica); } + public void testRefreshListenersDuringPeerRecovery() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + Consumer assertListenerCalled = shard -> { + AtomicBoolean called = new AtomicBoolean(); + shard.addRefreshListener(null, b -> { + assertFalse(b); + called.set(true); + }); + assertTrue(called.get()); + }; + IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); + assertListenerCalled.accept(replica); + recoverReplica(replica, primary, (shard, discoveryNode) -> + new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { + }) { + // we're only checking that listeners are called when the engine is open, before there is no point + @Override + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(totalTranslogOps); + assertListenerCalled.accept(replica); + } + + @Override + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + assertListenerCalled.accept(replica); + return localCheckpoint; + } + + @Override + public void finalizeRecovery(long globalCheckpoint) { + super.finalizeRecovery(globalCheckpoint); + assertListenerCalled.accept(replica); + } + }, false); + + closeShards(primary, replica); + } + public void testRecoverFromLocalShard() throws IOException { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -1989,7 +2041,7 @@ public class IndexShardTests extends IndexShardTestCase { .primaryTerm(0, 1).build(); IndexShard sourceShard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); - recoveryShardFromStore(sourceShard); + recoverShardFromStore(sourceShard); indexDoc(sourceShard, "test", "0", "{\"foo\" : \"bar\"}"); indexDoc(sourceShard, "test", "1", "{\"foo\" : \"bar\"}"); @@ -2011,7 +2063,7 @@ public class IndexShardTests extends IndexShardTestCase { }; final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true); - recoveryShardFromStore(differentIndex); + recoverShardFromStore(differentIndex); expectThrows(IllegalArgumentException.class, () -> { targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex)); }); @@ -2038,7 +2090,7 @@ public class IndexShardTests extends IndexShardTestCase { // now check that it's persistent ie. that the added shards are committed { final IndexShard newShard = reinitShard(targetShard); - recoveryShardFromStore(newShard); + recoverShardFromStore(newShard); assertDocCount(newShard, 2); closeShards(newShard); } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml index 7f2a630c083..6326b9464ca 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml @@ -1,24 +1,3 @@ ---- - setup: - - do: - cluster.put_settings: - body: - persistent: - logger._root: debug - ---- - teardown: - - do: - cluster.put_settings: - body: - persistent: - # this is not exactly correct as tests could be running - # under a different logging level; we sacrifice correctness - # here for now in the hopes of quickly understanding what is - # causing this test to fail and simply reverting the change - # here - logger._root: null - --- "refresh=true immediately makes changes are visible in search": - do: diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 96d92d3da9f..48523570933 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -329,7 +329,7 @@ public abstract class IndexShardTestCase extends ESTestCase { protected IndexShard newStartedShard(boolean primary) throws IOException { IndexShard shard = newShard(primary); if (primary) { - recoveryShardFromStore(shard); + recoverShardFromStore(shard); } else { recoveryEmptyReplica(shard); } @@ -352,7 +352,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } } - protected void recoveryShardFromStore(IndexShard primary) throws IOException { + protected void recoverShardFromStore(IndexShard primary) throws IOException { primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null));