Retry synced-flush on conflict in tests (#66968)

Closes #66631
Nhat Nguyen 2021-01-05 08:36:54 -05:00
parent 150a734a83
commit 6ba5ae2388
4 changed files with 24 additions and 29 deletions

FullClusterRestartIT.java

@@ -731,13 +731,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
         assertOK(client().performRequest(flushRequest));
         if (randomBoolean()) {
-            // We had a bug before where we failed to perform peer recovery with sync_id from 5.x to 6.x.
-            // We added this synced flush so we can exercise different paths of recovery code.
-            try {
-                performSyncedFlush(index);
-            } catch (ResponseException ignored) {
-                // synced flush is optional here
-            }
+            performSyncedFlush(index, randomBoolean());
         }
         if (shouldHaveTranslog) {
             // Update a few documents so we are sure to have a translog
@@ -1451,7 +1445,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
         if (randomBoolean()) {
             flush(index, randomBoolean());
         } else if (randomBoolean()) {
-            performSyncedFlush(index);
+            performSyncedFlush(index, randomBoolean());
         }
         saveInfoDocument("doc_count", Integer.toString(numDocs));
     }

RecoveryIT.java

@@ -308,7 +308,8 @@ public class RecoveryIT extends AbstractRollingTestCase {
                 throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
         }
         if (randomBoolean()) {
-            syncedFlush(index);
+            performSyncedFlush(index, randomBoolean());
+            ensureGlobalCheckpointSynced(index);
         }
     }
@@ -587,22 +588,6 @@ public class RecoveryIT extends AbstractRollingTestCase {
         }
     }
 
-    private void syncedFlush(String index) throws Exception {
-        // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
-        // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
-        assertBusy(() -> {
-            try {
-                Response resp = performSyncedFlush(index);
-                Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
-                assertThat(result.get("failed"), equalTo(0));
-            } catch (ResponseException ex) {
-                throw new AssertionError(ex); // cause assert busy to retry
-            }
-        });
-        // ensure the global checkpoint is synced; otherwise we might trim the commit with syncId
-        ensureGlobalCheckpointSynced(index);
-    }
-
     @SuppressWarnings("unchecked")
     private void assertPeerRecoveredFiles(String reason, String index, String targetNode, Matcher<Integer> sizeMatcher) throws IOException {
         Map<?, ?> recoveryStats = entityAsMap(client().performRequest(new Request("GET", index + "/_recovery")));
@@ -668,7 +653,8 @@ public class RecoveryIT extends AbstractRollingTestCase {
             assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
         }
         if (randomBoolean()) {
-            syncedFlush(index);
+            performSyncedFlush(index, randomBoolean());
+            ensureGlobalCheckpointSynced(index);
         }
     }

TranslogPolicyIT.java

@@ -132,7 +132,7 @@ public class TranslogPolicyIT extends AbstractFullClusterRestartTestCase {
             if (randomBoolean()) {
                 flush(index, randomBoolean());
             } else if (randomBoolean()) {
-                performSyncedFlush(index);
+                performSyncedFlush(index, randomBoolean());
            }
         }
         ensureGreen(index);

ESRestTestCase.java

@@ -1464,7 +1464,7 @@ public abstract class ESRestTestCase extends ESTestCase {
         return minVersion;
     }
 
-    protected static Response performSyncedFlush(String indexName) throws IOException {
+    protected static void performSyncedFlush(String indexName, boolean retryOnConflict) throws Exception {
         final Request request = new Request("POST", indexName + "/_flush/synced");
         final List<String> expectedWarnings = Collections.singletonList(SyncedFlushService.SYNCED_FLUSH_DEPRECATION_MESSAGE);
         if (nodeVersions.stream().allMatch(version -> version.onOrAfter(Version.V_7_6_0))) {
@@ -1476,7 +1476,22 @@ public abstract class ESRestTestCase extends ESTestCase {
             options.setWarningsHandler(warnings -> warnings.isEmpty() == false && warnings.equals(expectedWarnings) == false);
             request.setOptions(options);
         }
-        return client().performRequest(request);
+        // We have to spin a synced-flush request because we fire the global checkpoint sync for the last write operation.
+        // A synced-flush request considers the global checkpoint sync as an ongoing operation because it acquires a shard permit.
+        assertBusy(() -> {
+            try {
+                Response resp = client().performRequest(request);
+                if (retryOnConflict) {
+                    Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
+                    assertThat(result.get("failed"), equalTo(0));
+                }
+            } catch (ResponseException ex) {
+                assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_CONFLICT));
+                if (retryOnConflict) {
+                    throw new AssertionError(ex); // cause assert busy to retry
+                }
+            }
+        });
     }
 
     /**
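
For reference, the RecoveryIT call sites above now pair the reworked helper with a global checkpoint sync. A minimal sketch of that pattern follows; it is not a line from this commit, it assumes the helpers of the test classes shown here are in scope, and "test-index" is a placeholder:

// Hypothetical caller mirroring the RecoveryIT call sites above; "test-index" is a placeholder.
if (randomBoolean()) {
    // retryOnConflict=true: assertBusy retries until no shard reports "failed" in the _shards
    // section of the synced-flush response; retryOnConflict=false: a 409 CONFLICT caused by the
    // in-flight global checkpoint sync is tolerated and the synced flush stays best-effort.
    performSyncedFlush("test-index", true);
    // Sync the global checkpoint so the commit carrying the sync_id is not trimmed afterwards.
    ensureGlobalCheckpointSynced("test-index");
}

Compared with the old FullClusterRestartIT code, which simply swallowed any ResponseException, retrying through assertBusy means a conflict from the in-flight global checkpoint sync only delays the test instead of silently skipping the synced flush.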