ingest: support simulate with verbose for pipeline processor (#33839)
* ingest: support simulate with verbose for pipeline processor This change better supports the use of simulate?verbose with the pipeline processor. Prior to this change any pipeline processors executed with simulate?verbose would not show all intermediate processors for the inner pipelines. This changes also moves the PipelineProcess and TrackingResultProcessor classes to enable instance checks and to avoid overly public classes. As well this updates the error message for when cycles are detected in pipelines calling other pipelines.
This commit is contained in:
parent
019aeadb7d
commit
e37e5dfc04
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.SettingsFilter;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.grok.Grok;
|
||||
import org.elasticsearch.grok.ThreadWatchdog;
|
||||
import org.elasticsearch.ingest.PipelineProcessor;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.IngestPlugin;
|
||||
|
|
|
@ -110,4 +110,4 @@ teardown:
|
|||
pipeline: "outer"
|
||||
body: {}
|
||||
- match: { error.root_cause.0.type: "exception" }
|
||||
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." }
|
||||
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
|
||||
|
|
|
@ -605,3 +605,150 @@ teardown:
|
|||
- length: { docs.0.processor_results.1: 2 }
|
||||
- match: { docs.0.processor_results.1.tag: "rename-1" }
|
||||
- match: { docs.0.processor_results.1.doc._source.new_status: 200 }
|
||||
|
||||
---
|
||||
"Test verbose simulate with Pipeline Processor with Circular Pipelines":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "outer"
|
||||
body: >
|
||||
{
|
||||
"description" : "outer pipeline",
|
||||
"processors" : [
|
||||
{
|
||||
"pipeline" : {
|
||||
"pipeline": "inner"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "inner"
|
||||
body: >
|
||||
{
|
||||
"description" : "inner pipeline",
|
||||
"processors" : [
|
||||
{
|
||||
"pipeline" : {
|
||||
"pipeline": "outer"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
catch: /illegal_state_exception/
|
||||
ingest.simulate:
|
||||
verbose: true
|
||||
body: >
|
||||
{
|
||||
"pipeline": {
|
||||
"processors" : [
|
||||
{
|
||||
"pipeline" : {
|
||||
"pipeline": "outer"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
,
|
||||
"docs": [
|
||||
{
|
||||
"_index": "index",
|
||||
"_type": "type",
|
||||
"_id": "id",
|
||||
"_source": {
|
||||
"field1": "123.42 400 <foo>"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { error.root_cause.0.type: "illegal_state_exception" }
|
||||
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" }
|
||||
|
||||
---
|
||||
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "pipeline1"
|
||||
body: >
|
||||
{
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "pipeline1",
|
||||
"value": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"pipeline": {
|
||||
"pipeline": "pipeline2"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "pipeline2"
|
||||
body: >
|
||||
{
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "pipeline2",
|
||||
"value": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
ingest.simulate:
|
||||
verbose: true
|
||||
body: >
|
||||
{
|
||||
"pipeline": {
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "pipeline0",
|
||||
"value": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"pipeline": {
|
||||
"pipeline": "pipeline1"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"docs": [
|
||||
{
|
||||
"_index": "index",
|
||||
"_type": "type",
|
||||
"_id": "id",
|
||||
"_source": {
|
||||
"field1": "123.42 400 <foo>"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- length: { docs.0.processor_results: 3 }
|
||||
- match: { docs.0.processor_results.0.doc._source.pipeline0: true }
|
||||
- is_false: docs.0.processor_results.0.doc._source.pipeline1
|
||||
- is_false: docs.0.processor_results.0.doc._source.pipeline2
|
||||
- match: { docs.0.processor_results.1.doc._source.pipeline0: true }
|
||||
- match: { docs.0.processor_results.1.doc._source.pipeline1: true }
|
||||
- is_false: docs.0.processor_results.1.doc._source.pipeline2
|
||||
- match: { docs.0.processor_results.2.doc._source.pipeline0: true }
|
||||
- match: { docs.0.processor_results.2.doc._source.pipeline1: true }
|
||||
- match: { docs.0.processor_results.2.doc._source.pipeline2: true }
|
||||
|
||||
|
|
|
@ -24,12 +24,16 @@ import org.elasticsearch.action.ActionRunnable;
|
|||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.CompoundProcessor;
|
||||
import org.elasticsearch.ingest.PipelineProcessor;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
|
||||
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
|
||||
|
||||
class SimulateExecutionService {
|
||||
|
||||
|
@ -42,11 +46,15 @@ class SimulateExecutionService {
|
|||
}
|
||||
|
||||
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
|
||||
// Prevent cycles in pipeline decoration
|
||||
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
|
||||
if (verbose) {
|
||||
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
|
||||
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
|
||||
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen);
|
||||
try {
|
||||
verbosePipelineProcessor.execute(ingestDocument);
|
||||
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
|
||||
verbosePipelineProcessor);
|
||||
ingestDocument.executePipeline(verbosePipeline);
|
||||
return new SimulateDocumentVerboseResult(processorResultList);
|
||||
} catch (Exception e) {
|
||||
return new SimulateDocumentVerboseResult(processorResultList);
|
||||
|
|
|
@ -647,7 +647,7 @@ public final class IngestDocument {
|
|||
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
|
||||
try {
|
||||
if (this.executedPipelines.add(pipeline) == false) {
|
||||
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
|
||||
throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId());
|
||||
}
|
||||
return pipeline.execute(this);
|
||||
} finally {
|
||||
|
|
|
@ -17,15 +17,9 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.common;
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.Map;
|
||||
import org.elasticsearch.ingest.AbstractProcessor;
|
||||
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
|
||||
public class PipelineProcessor extends AbstractProcessor {
|
||||
|
||||
|
@ -50,6 +44,10 @@ public class PipelineProcessor extends AbstractProcessor {
|
|||
return ingestDocument.executePipeline(pipeline);
|
||||
}
|
||||
|
||||
Pipeline getPipeline(){
|
||||
return ingestService.getPipeline(pipelineName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
|
@ -17,14 +17,13 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.ingest;
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.ingest.CompoundProcessor;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.action.ingest.SimulateProcessorResult;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
|
||||
|
@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor {
|
|||
private final List<SimulateProcessorResult> processorResultList;
|
||||
private final boolean ignoreFailure;
|
||||
|
||||
public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
|
||||
TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
|
||||
this.ignoreFailure = ignoreFailure;
|
||||
this.processorResultList = processorResultList;
|
||||
this.actualProcessor = actualProcessor;
|
||||
|
@ -67,19 +66,35 @@ public final class TrackingResultProcessor implements Processor {
|
|||
return actualProcessor.getTag();
|
||||
}
|
||||
|
||||
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
|
||||
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList,
|
||||
Set<PipelineProcessor> pipelinesSeen) {
|
||||
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
|
||||
for (Processor processor : compoundProcessor.getProcessors()) {
|
||||
if (processor instanceof CompoundProcessor) {
|
||||
processors.add(decorate((CompoundProcessor) processor, processorResultList));
|
||||
if (processor instanceof PipelineProcessor) {
|
||||
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
|
||||
if (pipelinesSeen.add(pipelineProcessor) == false) {
|
||||
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
|
||||
}
|
||||
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
|
||||
pipelinesSeen.remove(pipelineProcessor);
|
||||
} else if (processor instanceof CompoundProcessor) {
|
||||
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
|
||||
} else {
|
||||
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
|
||||
}
|
||||
}
|
||||
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
|
||||
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
|
||||
if (processor instanceof CompoundProcessor) {
|
||||
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
|
||||
if (processor instanceof PipelineProcessor) {
|
||||
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
|
||||
if (pipelinesSeen.add(pipelineProcessor) == false) {
|
||||
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
|
||||
}
|
||||
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
|
||||
pipelinesSeen));
|
||||
pipelinesSeen.remove(pipelineProcessor);
|
||||
} else if (processor instanceof CompoundProcessor) {
|
||||
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
|
||||
} else {
|
||||
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
|
||||
}
|
|
@ -1,149 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ingest.TestProcessor;
|
||||
import org.elasticsearch.ingest.CompoundProcessor;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
|
||||
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
|
||||
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
|
||||
import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class TrackingResultProcessorTests extends ESTestCase {
|
||||
|
||||
private IngestDocument ingestDocument;
|
||||
private List<SimulateProcessorResult> resultList;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||
resultList = new ArrayList<>();
|
||||
}
|
||||
|
||||
public void testActualProcessor() throws Exception {
|
||||
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
|
||||
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
|
||||
|
||||
assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
|
||||
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(0).getFailure(), nullValue());
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("processor failed");
|
||||
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
|
||||
|
||||
try {
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
fail("processor should throw exception");
|
||||
} catch (ElasticsearchException e) {
|
||||
assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage()));
|
||||
}
|
||||
|
||||
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
assertThat(resultList.get(0).getIngestDocument(), nullValue());
|
||||
assertThat(resultList.get(0).getFailure(), equalTo(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualCompoundProcessorWithOnFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("fail");
|
||||
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
|
||||
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(false,
|
||||
Arrays.asList(new CompoundProcessor(false,
|
||||
Arrays.asList(failProcessor, onFailureProcessor),
|
||||
Arrays.asList(onFailureProcessor, failProcessor))),
|
||||
Arrays.asList(onFailureProcessor));
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
|
||||
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
|
||||
|
||||
assertThat(failProcessor.getInvokedCounter(), equalTo(2));
|
||||
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
|
||||
assertThat(resultList.size(), equalTo(4));
|
||||
|
||||
assertThat(resultList.get(0).getIngestDocument(), nullValue());
|
||||
assertThat(resultList.get(0).getFailure(), equalTo(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
|
||||
|
||||
Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
|
||||
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
|
||||
assertThat(resultList.get(1).getFailure(), nullValue());
|
||||
assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
|
||||
|
||||
assertThat(resultList.get(2).getIngestDocument(), nullValue());
|
||||
assertThat(resultList.get(2).getFailure(), equalTo(exception));
|
||||
assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
|
||||
|
||||
metadata = resultList.get(3).getIngestDocument().getIngestMetadata();
|
||||
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
|
||||
assertThat(resultList.get(3).getFailure(), nullValue());
|
||||
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("processor failed");
|
||||
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
|
||||
Collections.emptyList());
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
|
||||
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(0).getFailure(), sameInstance(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.ingest.common;
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.ingest.CompoundProcessor;
|
|||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.PipelineProcessor;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -110,7 +111,7 @@ public class PipelineProcessorTests extends ESTestCase {
|
|||
() -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument)
|
||||
);
|
||||
assertEquals(
|
||||
"Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
|
||||
"Cycle detected for pipeline: inner", e.getRootCause().getMessage()
|
||||
);
|
||||
}
|
||||
|
|
@ -0,0 +1,315 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ingest.SimulateProcessorResult;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
|
||||
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
|
||||
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
|
||||
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TrackingResultProcessorTests extends ESTestCase {
|
||||
|
||||
private IngestDocument ingestDocument;
|
||||
private List<SimulateProcessorResult> resultList;
|
||||
private Set<PipelineProcessor> pipelinesSeen;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||
resultList = new ArrayList<>();
|
||||
pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
|
||||
}
|
||||
|
||||
public void testActualProcessor() throws Exception {
|
||||
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
|
||||
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
|
||||
|
||||
assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
|
||||
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(0).getFailure(), nullValue());
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("processor failed");
|
||||
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen);
|
||||
|
||||
try {
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
fail("processor should throw exception");
|
||||
} catch (ElasticsearchException e) {
|
||||
assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage()));
|
||||
}
|
||||
|
||||
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
assertThat(resultList.get(0).getIngestDocument(), nullValue());
|
||||
assertThat(resultList.get(0).getFailure(), equalTo(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualCompoundProcessorWithOnFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("fail");
|
||||
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
|
||||
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(false,
|
||||
Arrays.asList(new CompoundProcessor(false,
|
||||
Arrays.asList(failProcessor, onFailureProcessor),
|
||||
Arrays.asList(onFailureProcessor, failProcessor))),
|
||||
Arrays.asList(onFailureProcessor));
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen);
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
|
||||
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
|
||||
|
||||
assertThat(failProcessor.getInvokedCounter(), equalTo(2));
|
||||
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
|
||||
assertThat(resultList.size(), equalTo(4));
|
||||
|
||||
assertThat(resultList.get(0).getIngestDocument(), nullValue());
|
||||
assertThat(resultList.get(0).getFailure(), equalTo(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
|
||||
|
||||
Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
|
||||
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
|
||||
assertThat(resultList.get(1).getFailure(), nullValue());
|
||||
assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
|
||||
|
||||
assertThat(resultList.get(2).getIngestDocument(), nullValue());
|
||||
assertThat(resultList.get(2).getFailure(), equalTo(exception));
|
||||
assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
|
||||
|
||||
metadata = resultList.get(3).getIngestDocument().getIngestMetadata();
|
||||
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
|
||||
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
|
||||
assertThat(resultList.get(3).getFailure(), nullValue());
|
||||
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("processor failed");
|
||||
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
|
||||
Collections.emptyList());
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen);
|
||||
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(0).getFailure(), sameInstance(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
|
||||
}
|
||||
|
||||
public void testActualPipelineProcessor() throws Exception {
|
||||
String pipelineId = "pipeline1";
|
||||
IngestService ingestService = mock(IngestService.class);
|
||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||
pipelineConfig.put("pipeline", pipelineId);
|
||||
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
|
||||
|
||||
String key1 = randomAlphaOfLength(10);
|
||||
String key2 = randomAlphaOfLength(10);
|
||||
String key3 = randomAlphaOfLength(10);
|
||||
|
||||
Pipeline pipeline = new Pipeline(
|
||||
pipelineId, null, null, new CompoundProcessor(
|
||||
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
|
||||
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }),
|
||||
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }))
|
||||
);
|
||||
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
|
||||
|
||||
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
|
||||
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen);
|
||||
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
|
||||
|
||||
verify(ingestService).getPipeline(pipelineId);
|
||||
assertThat(resultList.size(), equalTo(3));
|
||||
|
||||
assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
|
||||
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
|
||||
assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
|
||||
|
||||
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
|
||||
assertTrue(resultList.get(1).getIngestDocument().hasField(key2));
|
||||
assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
|
||||
|
||||
assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(2).getFailure(), nullValue());
|
||||
assertThat(resultList.get(2).getProcessorTag(), nullValue());
|
||||
}
|
||||
|
||||
public void testActualPipelineProcessorWithHandledFailure() throws Exception {
|
||||
RuntimeException exception = new RuntimeException("processor failed");
|
||||
|
||||
String pipelineId = "pipeline1";
|
||||
IngestService ingestService = mock(IngestService.class);
|
||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||
pipelineConfig.put("pipeline", pipelineId);
|
||||
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
|
||||
|
||||
String key1 = randomAlphaOfLength(10);
|
||||
String key2 = randomAlphaOfLength(10);
|
||||
String key3 = randomAlphaOfLength(10);
|
||||
|
||||
Pipeline pipeline = new Pipeline(
|
||||
pipelineId, null, null, new CompoundProcessor(
|
||||
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }),
|
||||
new CompoundProcessor(
|
||||
false,
|
||||
Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })),
|
||||
Collections.singletonList(new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))
|
||||
),
|
||||
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }))
|
||||
);
|
||||
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
|
||||
|
||||
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
|
||||
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen);
|
||||
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
|
||||
|
||||
verify(ingestService).getPipeline(pipelineId);
|
||||
assertThat(resultList.size(), equalTo(4));
|
||||
|
||||
assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
|
||||
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
|
||||
assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
|
||||
|
||||
//failed processor
|
||||
assertNull(resultList.get(1).getIngestDocument());
|
||||
assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage()));
|
||||
|
||||
assertTrue(resultList.get(2).getIngestDocument().hasField(key1));
|
||||
assertTrue(resultList.get(2).getIngestDocument().hasField(key2));
|
||||
assertFalse(resultList.get(2).getIngestDocument().hasField(key3));
|
||||
|
||||
assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(3).getFailure(), nullValue());
|
||||
assertThat(resultList.get(3).getProcessorTag(), nullValue());
|
||||
}
|
||||
|
||||
public void testActualPipelineProcessorWithCycle() throws Exception {
|
||||
String pipelineId = "pipeline1";
|
||||
IngestService ingestService = mock(IngestService.class);
|
||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||
pipelineConfig.put("pipeline", pipelineId);
|
||||
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
|
||||
|
||||
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
|
||||
Pipeline pipeline = new Pipeline(
|
||||
pipelineId, null, null, new CompoundProcessor(pipelineProcessor)
|
||||
);
|
||||
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
|
||||
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
||||
() -> decorate(actualProcessor, resultList, pipelinesSeen));
|
||||
assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1"));
|
||||
}
|
||||
|
||||
|
||||
public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
|
||||
String pipelineId = "pipeline1";
|
||||
IngestService ingestService = mock(IngestService.class);
|
||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||
pipelineConfig.put("pipeline", pipelineId);
|
||||
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
|
||||
|
||||
String key1 = randomAlphaOfLength(10);
|
||||
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
|
||||
Pipeline pipeline = new Pipeline(
|
||||
pipelineId, null, null, new CompoundProcessor(
|
||||
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }))
|
||||
);
|
||||
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
|
||||
|
||||
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);
|
||||
|
||||
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen);
|
||||
|
||||
trackingProcessor.execute(ingestDocument);
|
||||
|
||||
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
|
||||
|
||||
verify(ingestService, times(2)).getPipeline(pipelineId);
|
||||
assertThat(resultList.size(), equalTo(2));
|
||||
|
||||
assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));
|
||||
assertThat(resultList.get(0).getFailure(), nullValue());
|
||||
assertThat(resultList.get(0).getProcessorTag(), nullValue());
|
||||
|
||||
assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(1).getFailure(), nullValue());
|
||||
assertThat(resultList.get(1).getProcessorTag(), nullValue());
|
||||
|
||||
//each invocation updates key1 with a random int
|
||||
assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1),
|
||||
resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue