Fix NOOP bulk updates (#32819)
#31821 introduced an unreleased bug where NOOP updates were incorrectly mutating the bulk shard request, inserting null item to be replicated, which would result in NullPointerExceptions when serializing the request to be shipped to the replicas. Closes #32808
This commit is contained in:
parent
10fddb62ee
commit
a8bfa466b2
|
@ -290,10 +290,10 @@ class BulkPrimaryExecutionContext {
|
||||||
/** finishes the execution of the current request, with the response that should be returned to the user */
|
/** finishes the execution of the current request, with the response that should be returned to the user */
|
||||||
public void markAsCompleted(BulkItemResponse translatedResponse) {
|
public void markAsCompleted(BulkItemResponse translatedResponse) {
|
||||||
assertInvariants(ItemProcessingState.EXECUTED);
|
assertInvariants(ItemProcessingState.EXECUTED);
|
||||||
assert executionResult == null || translatedResponse.getItemId() == executionResult.getItemId();
|
assert executionResult != null && translatedResponse.getItemId() == executionResult.getItemId();
|
||||||
assert translatedResponse.getItemId() == getCurrentItem().id();
|
assert translatedResponse.getItemId() == getCurrentItem().id();
|
||||||
|
|
||||||
if (translatedResponse.isFailed() == false && requestToExecute != getCurrent()) {
|
if (translatedResponse.isFailed() == false && requestToExecute != null && requestToExecute != getCurrent()) {
|
||||||
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
|
request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute);
|
||||||
}
|
}
|
||||||
getCurrentItem().setPrimaryResponse(translatedResponse);
|
getCurrentItem().setPrimaryResponse(translatedResponse);
|
||||||
|
|
|
@ -284,7 +284,6 @@ public class IndicesRequestIT extends ESIntegTestCase {
|
||||||
assertSameIndices(updateRequest, updateShardActions);
|
assertSameIndices(updateRequest, updateShardActions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32808")
|
|
||||||
public void testBulk() {
|
public void testBulk() {
|
||||||
String[] bulkShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
|
String[] bulkShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
|
||||||
interceptTransportActions(bulkShardActions);
|
interceptTransportActions(bulkShardActions);
|
||||||
|
|
|
@ -472,6 +472,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
|
||||||
assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse));
|
assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse));
|
||||||
assertThat(primaryResponse.getResponse().getResult(),
|
assertThat(primaryResponse.getResponse().getResult(),
|
||||||
equalTo(DocWriteResponse.Result.NOOP));
|
equalTo(DocWriteResponse.Result.NOOP));
|
||||||
|
assertThat(bulkShardRequest.items().length, equalTo(1));
|
||||||
|
assertEquals(primaryRequest, bulkShardRequest.items()[0]); // check that bulk item was not mutated
|
||||||
assertThat(primaryResponse.getResponse().getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
|
assertThat(primaryResponse.getResponse().getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue