ingest: fix on_failure with Drop processor (#36686)

This commit allows a document to be dropped when a Drop processor
is used in the on_failure fork of the processor chain.

Fixes #36151
This commit is contained in:
Jake Landis 2018-12-17 14:10:13 -06:00 committed by GitHub
parent 4d0bb9dd0a
commit 7bf822bbbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 4 deletions

View File

@ -57,3 +57,44 @@ teardown:
type: test
id: 2
- match: { _source.foo: "blub" }
---
"Test Drop Processor On Failure":
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 3
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}
- do:
catch: missing
get:
index: test
type: test
id: 3
- match: { found: false }

View File

@ -134,7 +134,9 @@ public class CompoundProcessor implements Processor {
if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException;
} else {
executeOnFailure(ingestDocument, compoundProcessorException);
if (executeOnFailure(ingestDocument, compoundProcessorException) == false) {
return null;
}
break;
}
} finally {
@ -145,13 +147,17 @@ public class CompoundProcessor implements Processor {
return ingestDocument;
}
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
/**
* @return true if execution should continue, false if document is dropped.
*/
boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, exception);
for (Processor processor : onFailureProcessors) {
try {
processor.execute(ingestDocument);
if (processor.execute(ingestDocument) == null) {
return false;
}
} catch (Exception e) {
throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
}
@ -159,6 +165,7 @@ public class CompoundProcessor implements Processor {
} finally {
removeFailureMetadata(ingestDocument);
}
return true;
}
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {

View File

@ -129,6 +129,35 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(processor2.getInvokedCounter(), equalTo(1));
}
public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
Processor processor2 = new Processor() {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
//Simulates the drop processor
return null;
}
@Override
public String getType() {
return "drop";
}
@Override
public String getTag() {
return null;
}
};
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2), relativeTimeProvider);
assertNull(compoundProcessor.execute(ingestDocument));
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(compoundProcessor, 1, 1, 0);
}
public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {