Backport: Add pipeline name to ingest metadata (#51050)

Backport: #50467

This commit adds the name of the current pipeline to ingest metadata.
This pipeline name is accessible under the following key: '_ingest.pipeline'.

Example usage in pipeline:
PUT /_ingest/pipeline/2
{
    "processors": [
        {
            "set": {
                "field": "pipeline_name",
                "value": "{{_ingest.pipeline}}"
            }
        }
    ]
}

Closes #42106
This commit is contained in:
Martijn van Groningen 2020-01-16 10:50:47 +01:00 committed by GitHub
parent 45d7bdcfd7
commit 02dfd71efa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 198 additions and 41 deletions

View File

@ -355,7 +355,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.674Z"
"timestamp": "2017-05-04T22:46:09.674Z",
"pipeline": "_simulate_pipeline"
}
}
},
@ -370,7 +371,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z"
"timestamp": "2017-05-04T22:46:09.675Z",
"pipeline": "_simulate_pipeline"
}
}
}
@ -388,7 +390,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z"
"timestamp": "2017-05-04T22:46:09.676Z",
"pipeline": "_simulate_pipeline"
}
}
},
@ -403,7 +406,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z"
"timestamp": "2017-05-04T22:46:09.677Z",
"pipeline": "_simulate_pipeline"
}
}
}

View File

@ -21,6 +21,8 @@ include::common-options.asciidoc[]
--------------------------------------------------
// NOTCONSOLE
The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.
An example of using this processor for nesting pipelines would be:
Define an inner pipeline:

View File

@ -202,3 +202,81 @@ teardown:
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [legal-department]" }
---
# Verifies that `_ingest.pipeline` resolves to the id of the pipeline that is
# currently executing, including when pipelines are nested through the
# `pipeline` processor: pipeline1 -> another_pipeline -> another_pipeline2.
"Test _ingest.pipeline metadata":
# Outer pipeline: appends its own name to `pipelines`, then calls another_pipeline.
- do:
ingest.put_pipeline:
id: "pipeline1"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline"
}
}
]
}
- match: { acknowledged: true }
# Middle pipeline: appends its own name to `pipelines`, then calls another_pipeline2.
- do:
ingest.put_pipeline:
id: "another_pipeline"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline2"
}
}
]
}
- match: { acknowledged: true }
# Innermost pipeline: only appends its own name to `pipelines`.
- do:
ingest.put_pipeline:
id: "another_pipeline2"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
}
]
}
- match: { acknowledged: true }
# Index an empty document through the outer pipeline.
- do:
index:
index: test
id: 1
pipeline: "pipeline1"
body: >
{
}
# Fetch the document and check that each pipeline recorded its own id, in execution order.
- do:
get:
index: test
id: 1
- length: { _source.pipelines: 3 }
- match: { _source.pipelines.0: "pipeline1" }
- match: { _source.pipelines.1: "another_pipeline" }
- match: { _source.pipelines.2: "another_pipeline2" }

View File

@ -291,26 +291,30 @@ teardown:
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
- length: { docs.0.processor_results.0.doc._ingest: 1 }
- length: { docs.0.processor_results.0.doc._ingest: 2 }
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
- is_true: docs.0.processor_results.0.doc._ingest.pipeline
- length: { docs.0.processor_results.1.doc._source: 3 }
- match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
- match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.1.doc._ingest: 1 }
- length: { docs.0.processor_results.1.doc._ingest: 2 }
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
- is_true: docs.0.processor_results.1.doc._ingest.pipeline
- length: { docs.0.processor_results.2.doc._source: 3 }
- match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.2.doc._ingest: 1 }
- length: { docs.0.processor_results.2.doc._ingest: 2 }
- is_true: docs.0.processor_results.2.doc._ingest.timestamp
- is_true: docs.0.processor_results.2.doc._ingest.pipeline
- length: { docs.0.processor_results.3.doc._source: 3 }
- match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
- match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.3.doc._ingest: 1 }
- length: { docs.0.processor_results.3.doc._ingest: 2 }
- is_true: docs.0.processor_results.3.doc._ingest.timestamp
- is_true: docs.0.processor_results.3.doc._ingest.pipeline
---
"Test simulate with exception thrown":
@ -404,12 +408,14 @@ teardown:
- match: { docs.1.processor_results.0.doc._index: "index" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- length: { docs.1.processor_results.0.doc._ingest: 1 }
- length: { docs.1.processor_results.0.doc._ingest: 2 }
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
- is_true: docs.1.processor_results.0.doc._ingest.pipeline
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
- length: { docs.1.processor_results.1.doc._ingest: 1 }
- length: { docs.1.processor_results.1.doc._ingest: 2 }
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline
---
"Test verbose simulate with on_failure":

View File

@ -648,8 +648,14 @@ public final class IngestDocument {
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
}
handler.accept(result, e);
});
} else {

View File

@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;
private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;

View File

@ -91,23 +91,17 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument();
assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument);
assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument();
assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument);
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(),
not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
@ -147,10 +141,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
@ -191,14 +182,12 @@ public class SimulateExecutionServiceTests extends ESTestCase {
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(),
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(),
ingestDocumentWithOnFailureMetadata);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument);
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
}
@ -221,10 +210,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}
public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
@ -245,10 +231,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}
public void testExecuteItemWithFailure() throws Exception {
@ -392,4 +375,19 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
}
/**
 * Checks a single verbose simulate result against the expected pipeline id and ingest document.
 *
 * <p>The {@code pipeline} ingest metadata entry is removed from the result's document before the
 * documents are compared, so the result's document is modified by this call.
 *
 * @param result                 the processor result taken from the verbose simulate response
 * @param expectedPipelineId     the id expected under the {@code pipeline} ingest metadata key
 * @param expectedIngestDocument the document the result's document is compared to once the
 *                               {@code pipeline} entry has been removed
 */
private static void assertVerboseResult(SimulateProcessorResult result,
                                        String expectedPipelineId,
                                        IngestDocument expectedIngestDocument) {
    final IngestDocument snapshot = result.getIngestDocument();

    // The verbose result captures the document while the pipeline is running, so it always carries
    // the pipeline key; the final ingest document does not, because the key is added and removed
    // again during pipeline execution. Pull the key out, check it, and compare the remainder.
    final String recordedPipelineId = (String) snapshot.getIngestMetadata().remove("pipeline");
    assertThat(recordedPipelineId, equalTo(expectedPipelineId));

    // Same content as the expected document, but a distinct instance ...
    assertThat(snapshot, not(sameInstance(expectedIngestDocument)));
    assertIngestDocument(snapshot, expectedIngestDocument);
    // ... backed by a distinct source-and-metadata object as well.
    assertThat(
        snapshot.getSourceAndMetadata(),
        not(sameInstance(expectedIngestDocument.getSourceAndMetadata()))
    );
}
}

View File

@ -285,11 +285,12 @@ public class CompoundProcessorTests extends ESTestCase {
public void testFailureProcessorIsInvokedOnFailure() {
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(4));
assertThat(ingestMetadata.entrySet(), hasSize(5));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
});
Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));

View File

@ -20,17 +20,22 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -205,6 +210,58 @@ public class PipelineProcessorTests extends ESTestCase {
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
}
/**
 * Builds a chain of 16 pipelines. Each one appends the current value of the {@code pipeline}
 * ingest metadata key to the {@code pipelines} field and, except for the last one, then delegates
 * to the next pipeline through a {@link PipelineProcessor}. After running the first pipeline the
 * {@code pipelines} field must list every pipeline id in execution order.
 */
public void testIngestPipelineMetadata() {
    IngestService ingestService = createIngestService();

    final int numPipelines = 16;
    Pipeline firstPipeline = null;
    for (int i = 0; i < numPipelines; i++) {
        final String pipelineId = Integer.toString(i);
        final boolean hasSuccessor = i < (numPipelines - 1);

        // Records whichever pipeline id is visible in the ingest metadata at execution time.
        Processor recorder = new AbstractProcessor(null) {
            @Override
            public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
                Object currentPipeline = ingestDocument.getIngestMetadata().get("pipeline");
                ingestDocument.appendFieldValue("pipelines", currentPipeline);
                return ingestDocument;
            }

            @Override
            public String getType() {
                return null;
            }
        };

        List<Processor> processors = new ArrayList<>();
        processors.add(recorder);
        if (hasSuccessor) {
            // Every pipeline but the last hands over to the pipeline with the next id.
            String successorId = Integer.toString(i + 1);
            TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(successorId);
            processors.add(new PipelineProcessor(null, pipelineName, ingestService));
        }

        CompoundProcessor compound = new CompoundProcessor(false, processors, Collections.emptyList());
        Pipeline pipeline = new Pipeline(pipelineId, null, null, compound);
        when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);

        // The pipeline built in the first iteration is the entry point of the chain.
        if (i == 0) {
            firstPipeline = pipeline;
        }
    }

    IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
    // Single-element arrays let the callback hand its arguments back to the test body.
    IngestDocument[] resultSlot = new IngestDocument[1];
    Exception[] failureSlot = new Exception[1];
    testIngestDocument.executePipeline(firstPipeline, (doc, e) -> {
        resultSlot[0] = doc;
        failureSlot[0] = e;
    });
    assertThat(resultSlot[0], notNullValue());
    assertThat(failureSlot[0], nullValue());

    IngestDocument ingestDocument = resultSlot[0];
    List<?> pipelines = ingestDocument.getFieldValue("pipelines", List.class);
    assertThat(pipelines.size(), equalTo(numPipelines));
    for (int position = 0; position < numPipelines; position++) {
        assertThat(pipelines.get(position), equalTo(Integer.toString(position)));
    }
}
static IngestService createIngestService() {
IngestService ingestService = mock(IngestService.class);
ScriptService scriptService = mock(ScriptService.class);

View File

@ -220,6 +220,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(3));
@ -287,6 +288,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2);
@ -355,6 +357,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
verify(ingestService, Mockito.never()).getPipeline(pipelineId2);
@ -406,6 +409,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(4));
@ -482,6 +486,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(2));