Adding a refresh listener to a recovering shard should be a noop (#26055)

When `refresh=wait_for` is set on an indexing request, we register a listener on the shards that are call during the next refresh. During the recover translog phase, when the engine is open, we have a window of time when indexing operations succeed and they can add their listeners. Those listeners will only be called when the recovery finishes as we do not refresh during recoveries (unless the indexing buffer is full). Next to being a bad user experience, it can also cause deadlocks with an ongoing peer recovery that may wait for those operations to mark the replica in sync (details below).

To fix this, this PR changes refresh listeners to be a noop when the shard is not yet serving reads (implicitly covering the recovery period). It doesn't matter anyway. 

Deadlock with recovery:

When finalizing a peer recovery we mark the peer as "in sync". To do so we wait until the peer's local checkpoint is at least as high as the global checkpoint. If an operation with `refresh=wait_for` is added as a listener on that peer during recovery, it is not completed from the perspective of the primary. The primary than may wait for it to complete before advancing the local checkpoint for that peer. Since that peer is not considered in sync, the global checkpoint on the primary can be higher, causing a deadlock. Operation waits for recovery to finish and a refresh to happen. Recovery waits on the operation.
This commit is contained in:
Boaz Leskes 2017-08-04 19:51:15 +02:00 committed by GitHub
parent ad4dbbf1a6
commit e11cbed534
5 changed files with 103 additions and 57 deletions

View File

@ -675,24 +675,14 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private void maybeRefreshEngine() {
if (indexSettings.getRefreshInterval().millis() > 0) {
for (IndexShard shard : this.shards.values()) {
switch (shard.state()) {
case CREATED:
case RECOVERING:
case CLOSED:
continue;
case POST_RECOVERY:
case STARTED:
case RELOCATED:
try {
if (shard.isRefreshNeeded()) {
shard.refresh("schedule");
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
if (shard.isReadAllowed()) {
try {
if (shard.isRefreshNeeded()) {
shard.refresh("schedule");
}
continue;
default:
throw new IllegalStateException("unknown state: " + shard.state());
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
}
}
}

View File

@ -847,8 +847,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
public RefreshStats refreshStats() {
// Null refreshListeners means this shard doesn't support them so there can't be any.
int listeners = refreshListeners == null ? 0 : refreshListeners.pendingCount();
int listeners = refreshListeners.pendingCount();
return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()), listeners);
}
@ -1155,6 +1154,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (state == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shardId);
}
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener.
getEngine().refresh("post_recovery");
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
}
@ -1324,6 +1327,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
final Engine engine = this.currentEngineReference.getAndSet(null);
IOUtils.close(engine);
recoveryState().setStage(RecoveryState.Stage.INIT);
@ -1372,6 +1376,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
/** returns true if the {@link IndexShardState} allows reading */
public boolean isReadAllowed() {
return readAllowedStates.contains(state);
}
private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
@ -2356,7 +2365,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* false otherwise.
*/
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
refreshListeners.addOrNotify(location, listener);
final boolean readAllowed;
if (isReadAllowed()) {
readAllowed = true;
} else {
// check again under mutex. this is important to create a happens before relationship
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
// to a listener before a refresh actually happened that contained that operation.
synchronized (mutex) {
readAllowed = isReadAllowed();
}
}
if (readAllowed) {
refreshListeners.addOrNotify(location, listener);
} else {
// we're not yet ready fo ready for reads, just ignore refresh cycles
listener.accept(false);
}
}
private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {

View File

@ -993,7 +993,7 @@ public class IndexShardTests extends IndexShardTestCase {
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard test = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoveryShardFromStore(test);
recoverShardFromStore(test);
indexDoc(test, "test", "test");
assertEquals(versionCreated.luceneVersion, test.minimumCompatibleVersion());
@ -1040,14 +1040,14 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // one refresh on end of recovery, one on starting shard
assertThat(shard.refreshStats().getTotal(), equalTo(3L)); // refresh on: finalize, end of recovery and on starting shard
long initialTotalTime = shard.refreshStats().getTotalTimeInMillis();
// check time advances
for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) {
indexDoc(shard, "test", "test");
assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1));
assertThat(shard.refreshStats().getTotal(), equalTo(3L + i - 1));
shard.refresh("test");
assertThat(shard.refreshStats().getTotal(), equalTo(2L + i));
assertThat(shard.refreshStats().getTotal(), equalTo(3L + i));
assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime));
}
long refreshCount = shard.refreshStats().getTotal();
@ -1130,7 +1130,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
});
recoveryShardFromStore(shard);
recoverShardFromStore(shard);
indexDoc(shard, "test", "1");
assertEquals(1, preIndex.get());
@ -1679,7 +1679,7 @@ public class IndexShardTests extends IndexShardTestCase {
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);
recoveryShardFromStore(newShard);
recoverShardFromStore(newShard);
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
@ -1718,7 +1718,7 @@ public class IndexShardTests extends IndexShardTestCase {
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, wrapper);
recoveryShardFromStore(shard);
recoverShardFromStore(shard);
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
shard.refresh("created segment 1");
indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}");
@ -1788,7 +1788,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
};
final IndexShard newShard = reinitShard(shard, listener);
recoveryShardFromStore(newShard);
recoverShardFromStore(newShard);
IndexingStats indexingStats = newShard.indexingStats();
// ensure we are not influencing the indexing stats
assertEquals(0, indexingStats.getTotal().getDeleteCount());
@ -1824,7 +1824,7 @@ public class IndexShardTests extends IndexShardTestCase {
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);
recoveryShardFromStore(newShard);
recoverShardFromStore(newShard);
try {
newShard.acquireSearcher("test");
@ -1845,7 +1845,7 @@ public class IndexShardTests extends IndexShardTestCase {
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoveryShardFromStore(primary);
recoverShardFromStore(primary);
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
@ -1947,7 +1947,7 @@ public class IndexShardTests extends IndexShardTestCase {
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoveryShardFromStore(primary);
recoverShardFromStore(primary);
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
@ -1978,6 +1978,58 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary, replica);
}
public void testRefreshListenersDuringPeerRecovery() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
Consumer<IndexShard> assertListenerCalled = shard -> {
AtomicBoolean called = new AtomicBoolean();
shard.addRefreshListener(null, b -> {
assertFalse(b);
called.set(true);
});
assertTrue(called.get());
};
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
assertListenerCalled.accept(replica);
recoverReplica(replica, primary, (shard, discoveryNode) ->
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
super.prepareForTranslogOperations(totalTranslogOps);
assertListenerCalled.accept(replica);
}
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
assertListenerCalled.accept(replica);
return localCheckpoint;
}
@Override
public void finalizeRecovery(long globalCheckpoint) {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
}
}, false);
closeShards(primary, replica);
}
public void testRecoverFromLocalShard() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@ -1989,7 +2041,7 @@ public class IndexShardTests extends IndexShardTestCase {
.primaryTerm(0, 1).build();
IndexShard sourceShard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoveryShardFromStore(sourceShard);
recoverShardFromStore(sourceShard);
indexDoc(sourceShard, "test", "0", "{\"foo\" : \"bar\"}");
indexDoc(sourceShard, "test", "1", "{\"foo\" : \"bar\"}");
@ -2011,7 +2063,7 @@ public class IndexShardTests extends IndexShardTestCase {
};
final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true);
recoveryShardFromStore(differentIndex);
recoverShardFromStore(differentIndex);
expectThrows(IllegalArgumentException.class, () -> {
targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex));
});
@ -2038,7 +2090,7 @@ public class IndexShardTests extends IndexShardTestCase {
// now check that it's persistent ie. that the added shards are committed
{
final IndexShard newShard = reinitShard(targetShard);
recoveryShardFromStore(newShard);
recoverShardFromStore(newShard);
assertDocCount(newShard, 2);
closeShards(newShard);
}

View File

@ -1,24 +1,3 @@
---
setup:
- do:
cluster.put_settings:
body:
persistent:
logger._root: debug
---
teardown:
- do:
cluster.put_settings:
body:
persistent:
# this is not exactly correct as tests could be running
# under a different logging level; we sacrifice correctness
# here for now in the hopes of quickly understanding what is
# causing this test to fail and simply reverting the change
# here
logger._root: null
---
"refresh=true immediately makes changes are visible in search":
- do:

View File

@ -329,7 +329,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
protected IndexShard newStartedShard(boolean primary) throws IOException {
IndexShard shard = newShard(primary);
if (primary) {
recoveryShardFromStore(shard);
recoverShardFromStore(shard);
} else {
recoveryEmptyReplica(shard);
}
@ -352,7 +352,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
}
protected void recoveryShardFromStore(IndexShard primary) throws IOException {
protected void recoverShardFromStore(IndexShard primary) throws IOException {
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
null));