Backport: add templating support to pipeline processor (#49643)

Backport of #49030

This commit adds templating support to the pipeline processor's `name` option.

Closes #39955
This commit is contained in:
Martijn van Groningen 2019-11-27 15:53:40 +01:00 committed by GitHub
parent 15342c4dd2
commit 0a42395dfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 139 additions and 28 deletions

View File

@ -7,7 +7,7 @@ Executes another pipeline.
[options="header"]
|======
| Name | Required | Default | Description
| `name` | yes | - | The name of the pipeline to execute
| `name` | yes | - | The name of the pipeline to execute. Supports <<accessing-template-fields,template snippets>>.
include::common-options.asciidoc[]
|======

View File

@ -28,3 +28,9 @@ dependencies {
compile project(':libs:elasticsearch-grok')
compile project(':libs:elasticsearch-dissect')
}
testClusters.integTest {
// Needed in order to test ingest pipeline templating:
// (this is because the integTest node is not using default distribution, but only the minimal number of required modules)
module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile)
}

View File

@ -108,3 +108,97 @@ teardown:
body: {}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
---
"Test Pipeline Processor with templating":
- do:
ingest.put_pipeline:
id: "engineering-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "john"
}
}
]
}
- match: { acknowledged: true }
- do:
ingest.put_pipeline:
id: "sales-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "jan"
}
}
]
}
- match: { acknowledged: true }
- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"processors" : [
{
"pipeline" : {
"name": "{{org}}-department"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
id: 1
pipeline: "outer"
body: >
{
"org": "engineering"
}
- do:
get:
index: test
id: 1
- match: { _source.manager: "john" }
- do:
index:
index: test
id: 2
pipeline: "outer"
body: >
{
"org": "sales"
}
- do:
get:
index: test
id: 2
- match: { _source.manager: "jan" }
- do:
catch: /illegal_state_exception/
index:
index: test
id: 3
pipeline: "outer"
body: >
{
"org": "legal"
}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }

View File

@ -461,7 +461,7 @@ public class IngestService implements ClusterStateApplier {
}
//package private for testing
static String getProcessorName(Processor processor){
static String getProcessorName(Processor processor) {
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getInnerProcessor();
@ -470,7 +470,7 @@ public class IngestService implements ClusterStateApplier {
sb.append(processor.getType());
if(processor instanceof PipelineProcessor){
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
String pipelineName = ((PipelineProcessor) processor).getPipelineTemplate().newInstance(Collections.emptyMap()).execute();
sb.append(":");
sb.append(pipelineName);
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.ingest;
import org.elasticsearch.script.TemplateScript;
import java.util.Map;
import java.util.function.BiConsumer;
@ -26,18 +28,18 @@ public class PipelineProcessor extends AbstractProcessor {
public static final String TYPE = "pipeline";
private final String pipelineName;
private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;
private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineName = pipelineName;
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
}
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
@ -52,7 +54,8 @@ public class PipelineProcessor extends AbstractProcessor {
throw new UnsupportedOperationException("this method should not get executed");
}
Pipeline getPipeline(){
Pipeline getPipeline(IngestDocument ingestDocument) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
return ingestService.getPipeline(pipelineName);
}
@ -61,8 +64,8 @@ public class PipelineProcessor extends AbstractProcessor {
return TYPE;
}
String getPipelineName() {
return pipelineName;
TemplateScript.Factory getPipelineTemplate() {
return pipelineTemplate;
}
public static final class Factory implements Processor.Factory {
@ -76,9 +79,9 @@ public class PipelineProcessor extends AbstractProcessor {
@Override
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String pipeline =
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name");
return new PipelineProcessor(processorTag, pipeline, ingestService);
TemplateScript.Factory pipelineTemplate =
ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService());
return new PipelineProcessor(processorTag, pipelineTemplate, ingestService);
}
}
}

View File

@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline();
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;

View File

@ -1140,7 +1140,7 @@ public class IngestServiceTests extends ESTestCase {
PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
when(pipelineProcessor.getPipelineTemplate()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName));
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase {
public void testExecutesPipeline() throws Exception {
String pipelineId = "pipeline";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Pipeline pipeline = new Pipeline(
@ -69,7 +70,7 @@ public class PipelineProcessorTests extends ESTestCase {
}
public void testThrowsOnMissingPipeline() throws Exception {
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
@ -85,7 +86,7 @@ public class PipelineProcessorTests extends ESTestCase {
public void testThrowsOnRecursivePipelineInvocations() throws Exception {
String innerPipelineId = "inner";
String outerPipelineId = "outer";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
@ -113,7 +114,7 @@ public class PipelineProcessorTests extends ESTestCase {
public void testAllowsRepeatedPipelineInvocations() throws Exception {
String innerPipelineId = "inner";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
@ -131,7 +132,7 @@ public class PipelineProcessorTests extends ESTestCase {
String pipeline1Id = "pipeline1";
String pipeline2Id = "pipeline2";
String pipeline3Id = "pipeline3";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> pipeline1ProcessorConfig = new HashMap<>();
@ -203,4 +204,11 @@ public class PipelineProcessorTests extends ESTestCase {
assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
}
static IngestService createIngestService() {
IngestService ingestService = mock(IngestService.class);
ScriptService scriptService = mock(ScriptService.class);
when(ingestService.getScriptService()).thenReturn(scriptService);
return ingestService;
}
}

View File

@ -40,6 +40,7 @@ import java.util.Map;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
import static org.elasticsearch.ingest.PipelineProcessorTests.createIngestService;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
@ -47,7 +48,6 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -195,7 +195,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualPipelineProcessor() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
@ -240,7 +240,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualPipelineProcessorWithTrueConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
@ -308,7 +308,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualPipelineProcessorWithFalseConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
@ -377,7 +377,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
RuntimeException exception = new RuntimeException("processor failed");
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
@ -430,7 +430,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualPipelineProcessorWithCycle() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
@ -462,7 +462,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);