Fix wrong result when executing bulk requests with and without pipeline (#60818) (#61777)

This commit is contained in:
Dan Hermann 2020-09-01 07:05:25 -05:00 committed by GitHub
parent 3fd25bfa87
commit 88a448f1cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 13 deletions

View File

@ -0,0 +1,25 @@
---
"One request has pipeline and another not":
- skip:
version: " - 7.99.99"
reason: "change after backporting"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"f1": "v1", "f2": 42}'
- '{"index": {"_index": "test_index", "_id": "test_id2", "pipeline": "mypipeline"}}'
- '{"f1": "v2", "f2": 47}'
- match: { errors: true }
- match: { items.0.index.result: created }
- match: { items.1.index.status: 400 }
- match: { items.1.index.error.type: illegal_argument_exception }
- match: { items.1.index.error.reason: "pipeline with id [mypipeline] does not exist" }
- do:
count:
index: test_index
- match: {count: 1}

View File

@ -473,6 +473,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}
@ -495,6 +496,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
onCompletion.accept(originalThread, null);
}
assert counter.get() >= 0;
i++;
continue;
}

View File

@ -655,18 +655,26 @@ public class IngestServiceTests extends ESTestCase {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final SetOnce<Boolean> failure = new SetOnce<>();
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
BulkRequest bulkRequest = new BulkRequest();
final IndexRequest indexRequest1 =
new IndexRequest("_index", "_type", "_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id2").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause().getMessage(), equalTo("error"));
failure.set(true);
assertThat(slot, equalTo(1));
};
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, indexReq -> {});
assertTrue(failure.get());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@ -685,13 +693,15 @@ public class IngestServiceTests extends ESTestCase {
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
IndexRequest indexRequest1 =
new IndexRequest("_index", "_type", "_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.setPipeline("does_not_exist")
.setFinalPipeline("_none");
IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
IndexRequest indexRequest3 =
new IndexRequest("_index", "_type", "_id3").source(emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none");
bulkRequest.add(indexRequest3);
@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
@ -702,7 +712,7 @@ public class IngestServiceTests extends ESTestCase {
argThat(new CustomTypeSafeMatcher<Integer>("failure handler was not called with the expected arguments") {
@Override
protected boolean matchesSafely(Integer item) {
return item == 1;
return item == 2;
}
}),
@ -1201,18 +1211,27 @@ public class IngestServiceTests extends ESTestCase {
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest =
new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
BulkRequest bulkRequest = new BulkRequest();
final IndexRequest indexRequest1 =
new IndexRequest("_index", "_type", "_id1").source(Collections.emptyMap()).setPipeline("_none").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id2").source(Collections.emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final IntConsumer dropHandler = mock(IntConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
completionHandler, dropHandler);
verify(failureHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
verify(dropHandler, times(1)).accept(0);
verify(dropHandler, times(1)).accept(1);
}
public void testIngestClusterStateListeners_orderOfExecution() {