From e11cbed5348cdeac30e8e5d306f34d5f0b1d7dc4 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Fri, 4 Aug 2017 19:51:15 +0200
Subject: [PATCH] Adding a refresh listener to a recovering shard should be a
 noop (#26055)

When `refresh=wait_for` is set on an indexing request, we register listeners
on the shards involved; those listeners are called during the next refresh.
During the translog recovery phase, when the engine is already open, there is
a window of time in which indexing operations succeed and can add their
listeners. Those listeners will only be called when the recovery finishes, as
we do not refresh during recoveries (unless the indexing buffer is full).
Besides being a bad user experience, this can also deadlock with an ongoing
peer recovery that waits for those operations in order to mark the replica in
sync (details below).

To fix this, this PR makes adding a refresh listener a noop when the shard is
not yet serving reads (which implicitly covers the recovery period). Since the
shard cannot serve reads yet, completing the listener early makes no
observable difference.

Deadlock with recovery: when finalizing a peer recovery we mark the peer as
"in sync". To do so we wait until the peer's local checkpoint is at least as
high as the global checkpoint. If an operation with `refresh=wait_for` is
added as a listener on that peer during recovery, it is not completed from the
perspective of the primary. The primary may then wait for it to complete
before advancing the local checkpoint for that peer. Since that peer is not
considered in sync, the global checkpoint on the primary can be higher,
causing a deadlock: the operation waits for the recovery to finish and a
refresh to happen, while the recovery waits on the operation.
---
 .../org/elasticsearch/index/IndexService.java | 24 ++----
 .../elasticsearch/index/shard/IndexShard.java | 31 ++++++-
 .../index/shard/IndexShardTests.java          | 80 +++++++++++++++----
 .../rest-api-spec/test/bulk/50_refresh.yml    | 21 -----
 .../index/shard/IndexShardTestCase.java       |  4 +-
 5 files changed, 103 insertions(+), 57 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index f8340ebc820..a4d03929cbb 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -675,24 +675,14 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private void maybeRefreshEngine() {
         if (indexSettings.getRefreshInterval().millis() > 0) {
             for (IndexShard shard : this.shards.values()) {
-                switch (shard.state()) {
-                    case CREATED:
-                    case RECOVERING:
-                    case CLOSED:
-                        continue;
-                    case POST_RECOVERY:
-                    case STARTED:
-                    case RELOCATED:
-                        try {
-                            if (shard.isRefreshNeeded()) {
-                                shard.refresh("schedule");
-                            }
-                        } catch (IndexShardClosedException | AlreadyClosedException ex) {
-                            // fine - continue;
+                if (shard.isReadAllowed()) {
+                    try {
+                        if (shard.isRefreshNeeded()) {
+                            shard.refresh("schedule");
                         }
-                        continue;
-                    default:
-                        throw new IllegalStateException("unknown state: " + shard.state());
+                    } catch (IndexShardClosedException | AlreadyClosedException ex) {
+                        // fine - continue;
+                    }
                 }
             }
         }
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b1644e5f219..af2c45e42cd 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -847,8 +847,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }

     public RefreshStats refreshStats() {
-        // Null refreshListeners means this shard doesn't support them so there can't be any.
-        int listeners = refreshListeners == null ? 0 : refreshListeners.pendingCount();
+        int listeners = refreshListeners.pendingCount();
         return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()),
             listeners);
     }
@@ -1155,6 +1154,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (state == IndexShardState.RELOCATED) {
             throw new IndexShardRelocatedException(shardId);
         }
+        // we need to refresh again to expose all operations that were indexed until now. Otherwise
+        // we may not expose operations that were indexed with a refresh listener that was immediately
+        // responded to in addRefreshListener.
+        getEngine().refresh("post_recovery");
         recoveryState.setStage(RecoveryState.Stage.DONE);
         changeState(IndexShardState.POST_RECOVERY, reason);
     }
@@ -1324,6 +1327,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
+        assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
         final Engine engine = this.currentEngineReference.getAndSet(null);
         IOUtils.close(engine);
         recoveryState().setStage(RecoveryState.Stage.INIT);
@@ -1372,6 +1376,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }

+    /** returns true if the {@link IndexShardState} allows reading */
+    public boolean isReadAllowed() {
+        return readAllowedStates.contains(state);
+    }
+
     private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
         IndexShardState state = this.state; // one time volatile read

@@ -2356,7 +2365,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * false otherwise.
      */
    public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
-        refreshListeners.addOrNotify(location, listener);
+        final boolean readAllowed;
+        if (isReadAllowed()) {
+            readAllowed = true;
+        } else {
+            // check again under the mutex. This is important to create a happens-before relationship
+            // with the switch to POST_RECOVERY + the associated refresh. Otherwise we may respond
+            // to a listener before the refresh that contained that operation actually happened.
+            synchronized (mutex) {
+                readAllowed = isReadAllowed();
+            }
+        }
+        if (readAllowed) {
+            refreshListeners.addOrNotify(location, listener);
+        } else {
+            // we're not yet ready for reads, just ignore refresh cycles
+            listener.accept(false);
+        }
     }

     private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 20411c70f4f..1332df658c4 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -993,7 +993,7 @@ public class IndexShardTests extends IndexShardTestCase {
             .settings(settings)
             .primaryTerm(0, 1).build();
         IndexShard test = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
-        recoveryShardFromStore(test);
+        recoverShardFromStore(test);

         indexDoc(test, "test", "test");
         assertEquals(versionCreated.luceneVersion, test.minimumCompatibleVersion());
@@ -1040,14 +1040,14 @@ public class IndexShardTests extends IndexShardTestCase {

     public void testRefreshMetric() throws IOException {
         IndexShard shard = newStartedShard();
-        assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // one refresh on end of recovery, one on starting shard
+        assertThat(shard.refreshStats().getTotal(), equalTo(3L)); // refresh on: finalize, end of recovery and on starting shard
         long initialTotalTime = shard.refreshStats().getTotalTimeInMillis();
         // check time advances
         for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) {
             indexDoc(shard, "test", "test");
-            assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1));
+            assertThat(shard.refreshStats().getTotal(), equalTo(3L + i - 1));
             shard.refresh("test");
-            assertThat(shard.refreshStats().getTotal(), equalTo(2L + i));
+            assertThat(shard.refreshStats().getTotal(), equalTo(3L + i));
             assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime));
         }
         long refreshCount = shard.refreshStats().getTotal();
@@ -1130,7 +1130,7 @@ public class IndexShardTests extends IndexShardTestCase {
             }
         });

-        recoveryShardFromStore(shard);
+        recoverShardFromStore(shard);
         indexDoc(shard, "test", "1");
         assertEquals(1, preIndex.get());
@@ -1679,7 +1679,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
             shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);

-        recoveryShardFromStore(newShard);
+        recoverShardFromStore(newShard);

         try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
             TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
@@ -1718,7 +1718,7 @@ public class IndexShardTests extends IndexShardTestCase {
             .settings(settings)
             .primaryTerm(0, 1).build();
         IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, wrapper);
-        recoveryShardFromStore(shard);
+        recoverShardFromStore(shard);
         indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
         shard.refresh("created segment 1");
         indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}");
@@ -1788,7 +1788,7 @@ public class IndexShardTests extends IndexShardTestCase {
             }
         };
         final IndexShard newShard = reinitShard(shard, listener);
-        recoveryShardFromStore(newShard);
+        recoverShardFromStore(newShard);
         IndexingStats indexingStats = newShard.indexingStats();
         // ensure we are not influencing the indexing stats
         assertEquals(0, indexingStats.getTotal().getDeleteCount());
@@ -1824,7 +1824,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
             shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);

-        recoveryShardFromStore(newShard);
+        recoverShardFromStore(newShard);

         try {
             newShard.acquireSearcher("test");
@@ -1845,7 +1845,7 @@ public class IndexShardTests extends IndexShardTestCase {
             .settings(settings)
             .primaryTerm(0, 1).build();
         IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
-        recoveryShardFromStore(primary);
+        recoverShardFromStore(primary);
         indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
         IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
@@ -1947,7 +1947,7 @@ public class IndexShardTests extends IndexShardTestCase {
             .settings(settings)
             .primaryTerm(0, 1).build();
         IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
-        recoveryShardFromStore(primary);
+        recoverShardFromStore(primary);
         indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
         IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
@@ -1978,6 +1978,58 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(primary, replica);
     }

+    public void testRefreshListenersDuringPeerRecovery() throws IOException {
+        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .build();
+        IndexMetaData metaData = IndexMetaData.builder("test")
+            .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
+            .settings(settings)
+            .primaryTerm(0, 1).build();
+        IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
+        recoverShardFromStore(primary);
+
+        indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
+        Consumer<IndexShard> assertListenerCalled = shard -> {
+            AtomicBoolean called = new AtomicBoolean();
+            shard.addRefreshListener(null, b -> {
+                assertFalse(b);
+                called.set(true);
+            });
+            assertTrue(called.get());
+        };
+        IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        assertListenerCalled.accept(replica);
+        recoverReplica(replica, primary, (shard, discoveryNode) ->
+            new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {}) {
+                // we're only checking that listeners are called once the engine is open; before that there is no point
+                @Override
+                public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
+                    super.prepareForTranslogOperations(totalTranslogOps);
+                    assertListenerCalled.accept(replica);
+                }
+
+                @Override
+                public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
+                    final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
+                    assertListenerCalled.accept(replica);
+                    return localCheckpoint;
+                }
+
+                @Override
+                public void finalizeRecovery(long globalCheckpoint) {
+                    super.finalizeRecovery(globalCheckpoint);
+                    assertListenerCalled.accept(replica);
+                }
+            }, false);
+
+        closeShards(primary, replica);
+    }
+
     public void testRecoverFromLocalShard() throws IOException {
         Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -1989,7 +2041,7 @@ public class IndexShardTests extends IndexShardTestCase {
             .primaryTerm(0, 1).build();

         IndexShard sourceShard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
-        recoveryShardFromStore(sourceShard);
+        recoverShardFromStore(sourceShard);

         indexDoc(sourceShard, "test", "0", "{\"foo\" : \"bar\"}");
         indexDoc(sourceShard, "test", "1", "{\"foo\" : \"bar\"}");
@@ -2011,7 +2063,7 @@ public class IndexShardTests extends IndexShardTestCase {
             };

         final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true);
-        recoveryShardFromStore(differentIndex);
+        recoverShardFromStore(differentIndex);
         expectThrows(IllegalArgumentException.class, () -> {
             targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex));
         });
@@ -2038,7 +2090,7 @@ public class IndexShardTests extends IndexShardTestCase {
         // now check that it's persistent ie. that the added shards are committed
         {
             final IndexShard newShard = reinitShard(targetShard);
-            recoveryShardFromStore(newShard);
+            recoverShardFromStore(newShard);
             assertDocCount(newShard, 2);
             closeShards(newShard);
         }
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml
index 7f2a630c083..6326b9464ca 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml
@@ -1,24 +1,3 @@
----
-setup:
-  - do:
-      cluster.put_settings:
-        body:
-          persistent:
-            logger._root: debug
-
----
-teardown:
-  - do:
-      cluster.put_settings:
-        body:
-          persistent:
-            # this is not exactly correct as tests could be running
-            # under a different logging level; we sacrifice correctness
-            # here for now in the hopes of quickly understanding what is
-            # causing this test to fail and simply reverting the change
-            # here
-            logger._root: null
-
 ---
 "refresh=true immediately makes changes are visible in search":
   - do:
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 96d92d3da9f..48523570933 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -329,7 +329,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
     protected IndexShard newStartedShard(boolean primary) throws IOException {
         IndexShard shard = newShard(primary);
         if (primary) {
-            recoveryShardFromStore(shard);
+            recoverShardFromStore(shard);
         } else {
             recoveryEmptyReplica(shard);
         }
@@ -352,7 +352,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         }
     }

-    protected void recoveryShardFromStore(IndexShard primary) throws IOException {
+    protected void recoverShardFromStore(IndexShard primary) throws IOException {
         primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
             getFakeDiscoNode(primary.routingEntry().currentNodeId()), null));
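
A reading aid, not part of the commit: the sketch below distills the pattern the patch
applies in IndexShard#addRefreshListener into a self-contained class. A volatile
read-allowed flag is re-checked under the same mutex that guards the post-recovery
refresh and state change, so a listener is either queued normally or completed
immediately with `false`. All names here (RefreshGate, finishRecovery, onRefresh) are
invented for illustration, and the Boolean is simplified to "did a refresh fire this
listener" (the real listener contract reports whether the refresh was forced).

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    class RefreshGate {
        private final Object mutex = new Object();              // guards recovery completion
        private volatile boolean readAllowed = false;           // flips once recovery is done
        private final List<Consumer<Boolean>> pending = new ArrayList<>();

        /** Register a listener; a noop (completed with false) while reads are not allowed. */
        void addRefreshListener(Consumer<Boolean> listener) {
            boolean allowed = readAllowed;
            if (!allowed) {
                // re-check under the mutex: the post-recovery refresh and the flag flip happen
                // atomically under this mutex (see finishRecovery), so seeing false here means
                // that refresh is still ahead of us and will expose this operation, making the
                // noop below safe
                synchronized (mutex) {
                    allowed = readAllowed;
                }
            }
            if (allowed) {
                synchronized (pending) {
                    pending.add(listener);                      // fired on the next refresh
                }
            } else {
                listener.accept(false);                         // noop: shard not serving reads yet
            }
        }

        /** Analogue of switching to POST_RECOVERY: refresh once, then allow reads. */
        void finishRecovery(Runnable refresh) {
            synchronized (mutex) {
                refresh.run();                                  // expose everything indexed so far
                readAllowed = true;
            }
        }

        /** Analogue of a refresh cycle completing: fire everything that was queued. */
        void onRefresh() {
            List<Consumer<Boolean>> toFire;
            synchronized (pending) {
                toFire = new ArrayList<>(pending);
                pending.clear();
            }
            toFire.forEach(l -> l.accept(true));
        }
    }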