Remove immediate operation retry after mapping update (#38873)
Prior to this commit, when an indexing operation resulted in an `Engine.Result.Type.MAPPING_UPDATE_REQUIRED`, TransportShardBulkAction immediately retries the indexing operation to see if it succeeds. In the event that it succeeds the context does not wait until the mapping update has propagated through the cluster state before finishing the indexing. In some of our tests we rely on mappings being available as soon as they've been introduced in a document that indexed correctly. By removing the immediate retry we always wait for this to be the case. Resolves #38428 Supercedes #38579 Relates to #38711
This commit is contained in:
parent
d9c255dbbf
commit
0c733c04be
|
@ -483,26 +483,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||||
throws IOException {
|
throws IOException {
|
||||||
T result = toExecute.get();
|
T result = toExecute.get();
|
||||||
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
|
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
|
||||||
// try to update the mappings and try again.
|
// try to update the mappings and mark the context as needing to try again.
|
||||||
try {
|
try {
|
||||||
mappingUpdater.accept(result.getRequiredMappingUpdate());
|
mappingUpdater.accept(result.getRequiredMappingUpdate());
|
||||||
|
context.markAsRequiringMappingUpdate();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// failure to update the mapping should translate to a failure of specific requests. Other requests
|
// failure to update the mapping should translate to a failure of specific requests. Other requests
|
||||||
// still need to be executed and replicated.
|
// still need to be executed and replicated.
|
||||||
onComplete.accept(exceptionToResult.apply(e));
|
onComplete.accept(exceptionToResult.apply(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now
|
|
||||||
result = toExecute.get();
|
|
||||||
|
|
||||||
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
|
|
||||||
// double mapping update. We assume that the successful mapping update wasn't yet processed on the node
|
|
||||||
// and retry the entire request again.
|
|
||||||
context.markAsRequiringMappingUpdate();
|
|
||||||
} else {
|
|
||||||
onComplete.accept(result);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
onComplete.accept(result);
|
onComplete.accept(result);
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,8 +285,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
||||||
|
|
||||||
assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
|
assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
|
||||||
|
|
||||||
// Verify that the shard "executed" the operation twice
|
// Verify that the shard "executed" the operation once
|
||||||
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
|
verify(shard, times(1)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
|
||||||
|
|
||||||
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
|
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
|
||||||
.thenReturn(success);
|
.thenReturn(success);
|
||||||
|
@ -295,9 +295,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
||||||
(update, shardId, type) -> fail("should not have had to update the mappings"), () -> {});
|
(update, shardId, type) -> fail("should not have had to update the mappings"), () -> {});
|
||||||
|
|
||||||
|
|
||||||
// Verify that the shard "executed" the operation only once (2 for previous invocations plus
|
// Verify that the shard "executed" the operation only once (1 for previous invocations plus
|
||||||
// 1 for this execution)
|
// 1 for this execution)
|
||||||
verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
|
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
|
||||||
|
|
||||||
|
|
||||||
BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
|
BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
|
||||||
|
|
Loading…
Reference in New Issue