ingest: better support for conditionals with simulate?verbose (#34155)

This commit introduces two corrections to the way simulate?verbose
handles conditionals on processors.

1) Prior to this change when executing simulate?verbose for
processors with conditionals that evaluate to false, that processor
would still be displayed in the result set. What was displayed was
correct, such that no changes to the document occurred. However, if the
conditional evaluates to false, the processor should not even be
displayed.

2) Prior to this change when executing simulate?verbose for
pipeline processors with conditionals, the individual steps would no
longer be displayed. Commit e37e5df addressed the issue, but
failed account for a conditional on the pipeline processor. Since
a pipeline processor can introduce cycles and is effectively a
single processor that encapsulates multiple other processors that
are potentially guarded by a single conditional, special handling is
needed to for pipeline and conditional pipeline processors.
This commit is contained in:
Jake Landis 2018-10-23 11:33:48 -05:00 committed by GitHub
parent 4dbf498721
commit 89dc07bdd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 275 additions and 71 deletions

View File

@ -641,7 +641,6 @@ teardown:
- match: { acknowledged: true } - match: { acknowledged: true }
- do: - do:
catch: /illegal_state_exception/
ingest.simulate: ingest.simulate:
verbose: true verbose: true
body: > body: >
@ -667,8 +666,10 @@ teardown:
} }
] ]
} }
- match: { error.root_cause.0.type: "illegal_state_exception" } - length: { docs: 1 }
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } - length: { docs.0.processor_results: 1 }
- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
- match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" }
--- ---
"Test verbose simulate with Pipeline Processor with Multiple Pipelines": "Test verbose simulate with Pipeline Processor with Multiple Pipelines":

View File

@ -21,17 +21,13 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List; import java.util.List;
import java.util.Set;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
@ -46,11 +42,9 @@ class SimulateExecutionService {
} }
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
// Prevent cycles in pipeline decoration
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
if (verbose) { if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>(); List<SimulateProcessorResult> processorResultList = new ArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
try { try {
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor); verbosePipelineProcessor);

View File

@ -62,10 +62,7 @@ public class ConditionalProcessor extends AbstractProcessor {
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
IngestConditionalScript script = if (evaluate(ingestDocument)) {
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
// Only record metric if the script evaluates to true
long startTimeInNanos = relativeTimeProvider.getAsLong(); long startTimeInNanos = relativeTimeProvider.getAsLong();
try { try {
metric.preIngest(); metric.preIngest();
@ -81,6 +78,12 @@ public class ConditionalProcessor extends AbstractProcessor {
return ingestDocument; return ingestDocument;
} }
boolean evaluate(IngestDocument ingestDocument) {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()));
}
Processor getProcessor() { Processor getProcessor() {
return processor; return processor;
} }

View File

@ -19,11 +19,11 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.action.ingest.SimulateProcessorResult;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Processor to be used within Simulate API to keep track of processors executed in pipeline. * Processor to be used within Simulate API to keep track of processors executed in pipeline.
@ -42,14 +42,46 @@ public final class TrackingResultProcessor implements Processor {
@Override @Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Processor processor = actualProcessor;
try { try {
actualProcessor.execute(ingestDocument); if (processor instanceof ConditionalProcessor) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
return ingestDocument;
}
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getProcessor();
}
}
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
Pipeline pipeline = pipelineProcessor.getPipeline();
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
try {
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline());
} catch (ElasticsearchException elasticsearchException) {
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
throw elasticsearchException;
}
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
} catch (Exception e) {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
}
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
} else {
processor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
}
} catch (Exception e) { } catch (Exception e) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
} else { } else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
} }
throw e; throw e;
} }
@ -66,35 +98,19 @@ public final class TrackingResultProcessor implements Processor {
return actualProcessor.getTag(); return actualProcessor.getTag();
} }
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList, public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
Set<PipelineProcessor> pipelinesSeen) {
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size()); List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getProcessors()) { for (Processor processor : compoundProcessor.getProcessors()) {
if (processor instanceof PipelineProcessor) { if (processor instanceof CompoundProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); processors.add(decorate((CompoundProcessor) processor, processorResultList));
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
}
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
pipelinesSeen.remove(pipelineProcessor);
} else if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
} else { } else {
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
} }
} }
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) { for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
if (processor instanceof PipelineProcessor) { if (processor instanceof CompoundProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
}
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
pipelinesSeen));
pipelinesSeen.remove(pipelineProcessor);
} else if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
} else { } else {
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
} }

View File

@ -21,17 +21,22 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
@ -39,10 +44,11 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TY
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -50,13 +56,11 @@ public class TrackingResultProcessorTests extends ESTestCase {
private IngestDocument ingestDocument; private IngestDocument ingestDocument;
private List<SimulateProcessorResult> resultList; private List<SimulateProcessorResult> resultList;
private Set<PipelineProcessor> pipelinesSeen;
@Before @Before
public void init() { public void init() {
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
resultList = new ArrayList<>(); resultList = new ArrayList<>();
pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
} }
public void testActualProcessor() throws Exception { public void testActualProcessor() throws Exception {
@ -76,9 +80,9 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualCompoundProcessorWithoutOnFailure() throws Exception { public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed"); RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
try { try {
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument);
@ -97,14 +101,14 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualCompoundProcessorWithOnFailure() throws Exception { public void testActualCompoundProcessorWithOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("fail"); RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor = new CompoundProcessor(false, CompoundProcessor actualProcessor = new CompoundProcessor(false,
Arrays.asList(new CompoundProcessor(false, Arrays.asList(new CompoundProcessor(false,
Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(failProcessor, onFailureProcessor),
Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor)); Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
@ -139,10 +143,10 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed"); RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
Collections.emptyList()); Collections.emptyList());
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument);
@ -154,6 +158,45 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
} }
public void testActualCompoundProcessorWithFalseConditional() throws Exception {
String key1 = randomAlphaOfLength(10);
String key2 = randomAlphaOfLength(10);
String key3 = randomAlphaOfLength(10);
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
CompoundProcessor compoundProcessor = new CompoundProcessor(
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })),
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); }));
CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument);
//the step for key 2 is never executed due to conditional and thus not part of the result set
assertThat(resultList.size(), equalTo(2));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
assertTrue(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(1).getFailure(), nullValue());
assertThat(resultList.get(1).getProcessorTag(), nullValue());
}
public void testActualPipelineProcessor() throws Exception { public void testActualPipelineProcessor() throws Exception {
String pipelineId = "pipeline1"; String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class); IngestService ingestService = mock(IngestService.class);
@ -176,13 +219,13 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
verify(ingestService).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(3)); assertThat(resultList.size(), equalTo(3));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
@ -198,6 +241,142 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(2).getProcessorTag(), nullValue()); assertThat(resultList.get(2).getProcessorTag(), nullValue());
} }
public void testActualPipelineProcessorWithTrueConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
pipelineConfig1.put("name", pipelineId1);
Map<String, Object> pipelineConfig2 = new HashMap<>();
pipelineConfig2.put("name", pipelineId2);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
String key1 = randomAlphaOfLength(10);
String key2 = randomAlphaOfLength(10);
String key3 = randomAlphaOfLength(10);
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
Pipeline pipeline1 = new Pipeline(
pipelineId1, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
factory.create(Collections.emptyMap(), null, pipelineConfig2)),
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); })
)
);
Pipeline pipeline2 = new Pipeline(
pipelineId2, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })));
when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2);
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2);
assertThat(resultList.size(), equalTo(3));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertTrue(resultList.get(1).getIngestDocument().hasField(key2));
assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(2).getFailure(), nullValue());
assertThat(resultList.get(2).getProcessorTag(), nullValue());
}
public void testActualPipelineProcessorWithFalseConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
pipelineConfig1.put("name", pipelineId1);
Map<String, Object> pipelineConfig2 = new HashMap<>();
pipelineConfig2.put("name", pipelineId2);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
String key1 = randomAlphaOfLength(10);
String key2 = randomAlphaOfLength(10);
String key3 = randomAlphaOfLength(10);
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG,
new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())),
new HashMap<>(ScriptModule.CORE_CONTEXTS)
);
Pipeline pipeline1 = new Pipeline(
pipelineId1, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }),
new ConditionalProcessor(
randomAlphaOfLength(10),
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
factory.create(Collections.emptyMap(), null, pipelineConfig2)),
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); })
)
);
Pipeline pipeline2 = new Pipeline(
pipelineId2, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })));
when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2);
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
verify(ingestService, Mockito.never()).getPipeline(pipelineId2);
assertThat(resultList.size(), equalTo(2));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
assertTrue(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(1).getFailure(), nullValue());
assertThat(resultList.get(1).getProcessorTag(), nullValue());
}
public void testActualPipelineProcessorWithHandledFailure() throws Exception { public void testActualPipelineProcessorWithHandledFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed"); RuntimeException exception = new RuntimeException("processor failed");
@ -226,13 +405,13 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
verify(ingestService).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(4)); assertThat(resultList.size(), equalTo(4));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
@ -253,25 +432,36 @@ public class TrackingResultProcessorTests extends ESTestCase {
} }
public void testActualPipelineProcessorWithCycle() throws Exception { public void testActualPipelineProcessorWithCycle() throws Exception {
String pipelineId = "pipeline1"; String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class); IngestService ingestService = mock(IngestService.class);
Map<String, Object> pipelineConfig = new HashMap<>(); Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig.put("name", pipelineId); pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
pipelineConfig1.put("name", pipelineId1);
Map<String, Object> pipelineConfig2 = new HashMap<>();
pipelineConfig2.put("name", pipelineId2);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); Pipeline pipeline1 = new Pipeline(
Pipeline pipeline = new Pipeline( pipelineId1, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig2)));
pipelineId, null, null, new CompoundProcessor(pipelineProcessor)
);
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
Pipeline pipeline2 = new Pipeline(
pipelineId2, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig1)));
when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2);
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
IllegalStateException exception = expectThrows(IllegalStateException.class, CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
() -> decorate(actualProcessor, resultList, pipelinesSeen));
assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1"));
}
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument));
assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class));
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
}
public void testActualPipelineProcessorRepeatedInvocation() throws Exception { public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
String pipelineId = "pipeline1"; String pipelineId = "pipeline1";
@ -284,19 +474,19 @@ public class TrackingResultProcessorTests extends ESTestCase {
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig);
Pipeline pipeline = new Pipeline( Pipeline pipeline = new Pipeline(
pipelineId, null, null, new CompoundProcessor( pipelineId, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }))
); );
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument); trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
verify(ingestService, times(2)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(2)); assertThat(resultList.size(), equalTo(2));
assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));