Backport: Introduce on_failure_pipeline ingest metadata inside on_failure block (#49596)
Backport of #49076. If an exception occurs inside a pipeline processor, the pipeline stack is kept as a header on the exception. In an `on_failure` block, the id of the pipeline in which the exception occurred is then accessible via the `on_failure_pipeline` ingest metadata field. Closes #44920
parent 901c64ebbf
commit 90850f4ea0
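As a rough sketch of the mechanism described above (not code from this commit): the failing pipeline stack travels on the exception as the `pipeline_origin` header, and only its first entry, the innermost pipeline, is surfaced as the `on_failure_pipeline` metadata value. The literal ids "3", "2", "1" below simply mirror the integration test added further down.

import java.util.Arrays;
import java.util.List;

import org.elasticsearch.ElasticsearchException;

public final class PipelineOriginSketch {
    public static void main(String[] args) {
        // Sketch only: the pipeline stack (innermost first) is attached to the failure as a header.
        ElasticsearchException failure = new ElasticsearchException("processor failed");
        failure.addHeader("pipeline_origin", Arrays.asList("3", "2", "1"));

        // During on_failure handling, only the innermost pipeline id is exposed
        // to the document as the on_failure_pipeline ingest metadata value.
        List<String> origin = failure.getHeader("pipeline_origin");
        String onFailurePipeline = origin != null ? origin.get(0) : null;
        System.out.println(onFailurePipeline); // prints: 3
    }
}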
@@ -378,7 +378,7 @@ The `if` condition can be more then a simple equality check.
 The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
 running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

-IMPORTANT: The value of ctx is read-only in `if` conditions.
+IMPORTANT: The value of ctx is read-only in `if` conditions.

 A more complex `if` condition that drops the document (i.e. not index it)
 unless it has a multi-valued tag field with at least one value that contains the characters
@@ -722,8 +722,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.

 You may want to retrieve the actual error message that was thrown
 by a failed processor. To do so you can access metadata fields called
-`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
-from within the context of an `on_failure` block.
+`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
+`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
+These fields are only accessible from within the context of an `on_failure` block.

 Here is an updated version of the example that you
 saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`
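To make the new field concrete, here is a minimal, hypothetical sketch (not part of this commit) of a processor body that could run inside an `on_failure` block; it reads the metadata through the `IngestDocument` accessors, following the same pattern as the `onfailure_processor` test factory added to IngestClientIT further down.

import org.elasticsearch.ingest.IngestDocument;

final class OnFailureMetadataSketch {

    // Hypothetical helper, for illustration only: record which pipeline and
    // processor type caused the failure on the document being indexed.
    static void recordFailureOrigin(IngestDocument document) {
        String failedType = document.getFieldValue("_ingest.on_failure_processor_type", String.class);
        // on_failure_pipeline is only present when the failure happened inside a pipeline processor.
        String failedPipeline = document.hasField("_ingest.on_failure_pipeline")
            ? document.getFieldValue("_ingest.on_failure_pipeline", String.class)
            : "<current pipeline>";
        document.setFieldValue("failure_origin",
            "processor [" + failedType + "] in pipeline [" + failedPipeline + "]");
    }
}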
@@ -107,4 +107,4 @@ teardown:
 pipeline: "outer"
 body: {}
 - match: { error.root_cause.0.type: "ingest_processor_exception" }
-- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
+- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
@@ -54,7 +54,7 @@ class SimulateExecutionService {
 handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
 });
 } else {
-pipeline.execute(ingestDocument, (result, e) -> {
+ingestDocument.executePipeline(pipeline, (result, e) -> {
 if (e == null) {
 handler.accept(new SimulateDocumentBaseResult(result), null);
 } else {
@@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
 public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
 public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
 public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
+public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";

 private final boolean ignoreFailure;
 private final List<Processor> processors;
@@ -144,7 +145,7 @@ public class CompoundProcessor implements Processor {
 innerExecute(currentProcessor + 1, ingestDocument, handler);
 } else {
 IngestProcessorException compoundProcessorException =
-newCompoundProcessorException(e, processor.getType(), processor.getTag());
+newCompoundProcessorException(e, processor, ingestDocument);
 if (onFailureProcessors.isEmpty()) {
 handler.accept(null, compoundProcessorException);
 } else {
@@ -177,7 +178,7 @@ public class CompoundProcessor implements Processor {
 onFailureProcessor.execute(ingestDocument, (result, e) -> {
 if (e != null) {
 removeFailureMetadata(ingestDocument);
-handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
+handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
 return;
 }
 if (result == null) {
@@ -192,12 +193,17 @@ public class CompoundProcessor implements Processor {
 private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
 List<String> processorTypeHeader = cause.getHeader("processor_type");
 List<String> processorTagHeader = cause.getHeader("processor_tag");
+List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
 String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
 String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
+String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
 Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
 ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
 ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
 ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
+if (failedPipelineId != null) {
+ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
+}
 }

 private void removeFailureMetadata(IngestDocument ingestDocument) {
@@ -205,21 +211,28 @@ public class CompoundProcessor implements Processor {
 ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
 ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
 ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
+ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
 }

-private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
+static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
 if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
 return (IngestProcessorException) e;
 }

 IngestProcessorException exception = new IngestProcessorException(e);

+String processorType = processor.getType();
 if (processorType != null) {
 exception.addHeader("processor_type", processorType);
 }
+String processorTag = processor.getTag();
 if (processorTag != null) {
 exception.addHeader("processor_tag", processorTag);
 }
+List<String> pipelineStack = document.getPipelineStack();
+if (pipelineStack.size() > 1) {
+exception.addHeader("pipeline_origin", pipelineStack);
+}

 return exception;
 }
@@ -38,7 +38,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.EnumMap;
 import java.util.HashMap;
-import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -60,7 +60,7 @@ public final class IngestDocument {
 private final Map<String, Object> ingestMetadata;

 // Contains all pipelines that have been executed for this document
-private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
+private final Set<String> executedPipelines = new LinkedHashSet<>();

 public IngestDocument(String index, String type, String id, String routing,
 Long version, VersionType versionType, Map<String, Object> source) {
@@ -647,9 +647,9 @@ public final class IngestDocument {
 * @param handler handles the result or failure
 */
 public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
-if (executedPipelines.add(pipeline)) {
+if (executedPipelines.add(pipeline.getId())) {
 pipeline.execute(this, (result, e) -> {
-executedPipelines.remove(pipeline);
+executedPipelines.remove(pipeline.getId());
 handler.accept(result, e);
 });
 } else {
@@ -657,6 +657,15 @@ public final class IngestDocument {
 }
 }

+/**
+* @return a pipeline stack; all pipelines that are in execution by this document in reverse order
+*/
+List<String> getPipelineStack() {
+List<String> pipelineStack = new ArrayList<>(executedPipelines);
+Collections.reverse(pipelineStack);
+return pipelineStack;
+}
+
 @Override
 public boolean equals(Object obj) {
 if (obj == this) { return true; }
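A small, self-contained sketch (not from this commit) of why the LinkedHashSet plus Collections.reverse combination yields the stack order the tests below expect, with the innermost pipeline first:

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public final class PipelineStackOrderDemo {
    public static void main(String[] args) {
        // Pipelines are added in the order they start executing: 1 calls 2, 2 calls 3.
        Set<String> executedPipelines = new LinkedHashSet<>();
        executedPipelines.add("1");
        executedPipelines.add("2");
        executedPipelines.add("3");

        // Reversing the insertion order puts the innermost, still-running pipeline first,
        // which is why the integration test expects pipeline_origin == ["3", "2", "1"].
        List<String> pipelineStack = new ArrayList<>(executedPipelines);
        Collections.reverse(pipelineStack);
        System.out.println(pipelineStack); // prints: [3, 2, 1]
    }
}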
@@ -501,7 +501,7 @@ public class IngestService implements ClusterStateApplier {
 VersionType versionType = indexRequest.versionType();
 Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
 IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
-pipeline.execute(ingestDocument, (result, e) -> {
+ingestDocument.executePipeline(pipeline, (result, e) -> {
 long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
 totalMetrics.postIngest(ingestTimeInMillis);
 if (e != null) {
@@ -28,12 +28,15 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
@@ -118,8 +121,8 @@ public class CompoundProcessorTests extends ESTestCase {

 LongSupplier relativeTimeProvider = mock(LongSupplier.class);
 when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1));
-CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
-Collections.singletonList(processor2), relativeTimeProvider);
+CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(processor1),
+singletonList(processor2), relativeTimeProvider);
 compoundProcessor.execute(ingestDocument, (result, e) -> {});
 verify(relativeTimeProvider, times(2)).getAsLong();

@@ -150,8 +153,8 @@ public class CompoundProcessorTests extends ESTestCase {

 LongSupplier relativeTimeProvider = mock(LongSupplier.class);
 when(relativeTimeProvider.getAsLong()).thenReturn(0L);
-CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
-Collections.singletonList(processor2), relativeTimeProvider);
+CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(processor1),
+singletonList(processor2), relativeTimeProvider);
 IngestDocument[] result = new IngestDocument[1];
 compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r);
 assertThat(result[0], nullValue());
@@ -178,10 +181,10 @@ public class CompoundProcessorTests extends ESTestCase {
 });
 LongSupplier relativeTimeProvider = mock(LongSupplier.class);
 when(relativeTimeProvider.getAsLong()).thenReturn(0L);
-CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail),
-Collections.singletonList(lastProcessor), relativeTimeProvider);
-CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
-Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider);
+CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, singletonList(processorToFail),
+singletonList(lastProcessor), relativeTimeProvider);
+CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(processor),
+singletonList(compoundOnFailProcessor), relativeTimeProvider);
 compoundProcessor.execute(ingestDocument, (result, e) -> {});

 assertThat(processorToFail.getInvokedCounter(), equalTo(1));
@@ -203,8 +206,8 @@ public class CompoundProcessorTests extends ESTestCase {

 CompoundProcessor failCompoundProcessor = new CompoundProcessor(relativeTimeProvider, firstProcessor);

-CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
-Collections.singletonList(secondProcessor), relativeTimeProvider);
+CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(failCompoundProcessor),
+singletonList(secondProcessor), relativeTimeProvider);
 compoundProcessor.execute(ingestDocument, (result, e) -> {});

 assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
@@ -226,11 +229,11 @@ public class CompoundProcessorTests extends ESTestCase {

 LongSupplier relativeTimeProvider = mock(LongSupplier.class);
 when(relativeTimeProvider.getAsLong()).thenReturn(0L);
-CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
-Collections.singletonList(failProcessor), relativeTimeProvider);
+CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, singletonList(firstProcessor),
+singletonList(failProcessor), relativeTimeProvider);

-CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
-Collections.singletonList(secondProcessor), relativeTimeProvider);
+CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(failCompoundProcessor),
+singletonList(secondProcessor), relativeTimeProvider);
 compoundProcessor.execute(ingestDocument, (result, e) -> {});

 assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
@@ -252,11 +255,11 @@ public class CompoundProcessorTests extends ESTestCase {

 LongSupplier relativeTimeProvider = mock(LongSupplier.class);
 when(relativeTimeProvider.getAsLong()).thenReturn(0L);
-CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
-Collections.singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor)));
+CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, singletonList(firstProcessor),
+singletonList(new CompoundProcessor(relativeTimeProvider, failProcessor)));

-CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
-Collections.singletonList(secondProcessor), relativeTimeProvider);
+CompoundProcessor compoundProcessor = new CompoundProcessor(false, singletonList(failCompoundProcessor),
+singletonList(secondProcessor), relativeTimeProvider);
 compoundProcessor.execute(ingestDocument, (result, e) -> {});

 assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
@@ -271,7 +274,7 @@ public class CompoundProcessorTests extends ESTestCase {
 LongSupplier relativeTimeProvider = mock(LongSupplier.class);
 when(relativeTimeProvider.getAsLong()).thenReturn(0L);
 CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor),
-Collections.singletonList(onFailureProcessor), relativeTimeProvider);
+singletonList(onFailureProcessor), relativeTimeProvider);
 pipeline.execute(ingestDocument, (result, e) -> {});
 assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
 assertThat(secondProcessor.getInvokedCounter(), equalTo(0));
@@ -279,6 +282,82 @@ public class CompoundProcessorTests extends ESTestCase {
 assertStats(pipeline, 1, 1, 0);
 }

+public void testFailureProcessorIsInvokedOnFailure() {
+TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
+Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
+assertThat(ingestMetadata.entrySet(), hasSize(4));
+assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
+assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
+assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
+assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
+});
+
+Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
+Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, singletonList(new AbstractProcessor(null) {
+@Override
+public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+ingestDocument.executePipeline(pipeline2, handler);
+}
+
+@Override
+public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+throw new AssertionError();
+}
+
+@Override
+public String getType() {
+return "pipeline";
+}
+}), singletonList(onFailureProcessor)));
+
+ingestDocument.executePipeline(pipeline1, (document, e) -> {
+assertThat(document, notNullValue());
+assertThat(e, nullValue());
+});
+assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
+}
+
+public void testNewCompoundProcessorException() {
+TestProcessor processor = new TestProcessor("my_tag", "my_type", new RuntimeException());
+IngestProcessorException ingestProcessorException1 =
+CompoundProcessor.newCompoundProcessorException(new RuntimeException(), processor, ingestDocument);
+assertThat(ingestProcessorException1.getHeader("processor_tag"), equalTo(singletonList("my_tag")));
+assertThat(ingestProcessorException1.getHeader("processor_type"), equalTo(singletonList("my_type")));
+assertThat(ingestProcessorException1.getHeader("pipeline_origin"), nullValue());
+
+IngestProcessorException ingestProcessorException2 =
+CompoundProcessor.newCompoundProcessorException(ingestProcessorException1, processor, ingestDocument);
+assertThat(ingestProcessorException2, sameInstance(ingestProcessorException1));
+}
+
+public void testNewCompoundProcessorExceptionPipelineOrigin() {
+Pipeline pipeline2 = new Pipeline("2", null, null,
+new CompoundProcessor(new TestProcessor("my_tag", "my_type", new RuntimeException())));
+Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null) {
+@Override
+public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+ingestDocument.executePipeline(pipeline2, handler);
+}
+
+@Override
+public String getType() {
+return "my_type2";
+}
+}));
+
+Exception[] holder = new Exception[1];
+ingestDocument.executePipeline(pipeline1, (document, e) -> holder[0] = e);
+IngestProcessorException ingestProcessorException = (IngestProcessorException) holder[0];
+assertThat(ingestProcessorException.getHeader("processor_tag"), equalTo(singletonList("my_tag")));
+assertThat(ingestProcessorException.getHeader("processor_type"), equalTo(singletonList("my_type")));
+assertThat(ingestProcessorException.getHeader("pipeline_origin"), equalTo(Arrays.asList("2", "1")));
+}
+
 private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
 assertStats(0, compoundProcessor, 0L, count, failed, time);
 }
@@ -21,6 +21,7 @@ package org.elasticsearch.ingest;

 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -39,12 +40,14 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -68,7 +71,7 @@ public class IngestClientIT extends ESIntegTestCase {

 @Override
 protected Collection<Class<? extends Plugin>> nodePlugins() {
-return Arrays.asList(IngestTestPlugin.class);
+return Collections.singleton(ExtendedIngestTestPlugin.class);
 }

 public void testSimulate() throws Exception {
@@ -293,4 +296,157 @@ public class IngestClientIT extends ESIntegTestCase {
 assertFalse(item.isFailed());
 assertEquals("auto-generated", item.getResponse().getId());
 }
+
+public void testPipelineOriginHeader() throws Exception {
+{
+XContentBuilder source = jsonBuilder().startObject();
+{
+source.startArray("processors");
+source.startObject();
+{
+source.startObject("pipeline");
+source.field("name", "2");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+source.endObject();
+PutPipelineRequest putPipelineRequest =
+new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON);
+client().admin().cluster().putPipeline(putPipelineRequest).get();
+}
+{
+XContentBuilder source = jsonBuilder().startObject();
+{
+source.startArray("processors");
+source.startObject();
+{
+source.startObject("pipeline");
+source.field("name", "3");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+source.endObject();
+PutPipelineRequest putPipelineRequest =
+new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON);
+client().admin().cluster().putPipeline(putPipelineRequest).get();
+}
+{
+XContentBuilder source = jsonBuilder().startObject();
+{
+source.startArray("processors");
+source.startObject();
+{
+source.startObject("fail");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+source.endObject();
+PutPipelineRequest putPipelineRequest =
+new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON);
+client().admin().cluster().putPipeline(putPipelineRequest).get();
+}
+
+Exception e = expectThrows(Exception.class, () -> {
+IndexRequest indexRequest = new IndexRequest("test");
+indexRequest.source("{}", XContentType.JSON);
+indexRequest.setPipeline("1");
+client().index(indexRequest).get();
+});
+IngestProcessorException ingestException = (IngestProcessorException) ExceptionsHelper.unwrap(e, IngestProcessorException.class);
+assertThat(ingestException.getHeader("processor_type"), equalTo(Collections.singletonList("fail")));
+assertThat(ingestException.getHeader("pipeline_origin"), equalTo(Arrays.asList("3", "2", "1")));
+}
+
+public void testPipelineProcessorOnFailure() throws Exception {
+{
+XContentBuilder source = jsonBuilder().startObject();
+{
+source.startArray("processors");
+source.startObject();
+{
+source.startObject("pipeline");
+source.field("name", "2");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+{
+source.startArray("on_failure");
+source.startObject();
+{
+source.startObject("onfailure_processor");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+source.endObject();
+PutPipelineRequest putPipelineRequest =
+new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON);
+client().admin().cluster().putPipeline(putPipelineRequest).get();
+}
+{
+XContentBuilder source = jsonBuilder().startObject();
+{
+source.startArray("processors");
+source.startObject();
+{
+source.startObject("pipeline");
+source.field("name", "3");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+source.endObject();
+PutPipelineRequest putPipelineRequest =
+new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON);
+client().admin().cluster().putPipeline(putPipelineRequest).get();
+}
+{
+XContentBuilder source = jsonBuilder().startObject();
+{
+source.startArray("processors");
+source.startObject();
+{
+source.startObject("fail");
+source.endObject();
+}
+source.endObject();
+source.endArray();
+}
+source.endObject();
+PutPipelineRequest putPipelineRequest =
+new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON);
+client().admin().cluster().putPipeline(putPipelineRequest).get();
+}
+
+client().prepareIndex("test", "_doc").setId("1").setSource("{}", XContentType.JSON).setPipeline("1").get();
+Map<String, Object> inserted = client().prepareGet("test", "_doc", "1")
+.get().getSourceAsMap();
+assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
+}
+
+public static class ExtendedIngestTestPlugin extends IngestTestPlugin {
+
+@Override
+public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+Map<String, Processor.Factory> factories = new HashMap<>(super.getProcessors(parameters));
+factories.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
+factories.put("fail", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", new RuntimeException()));
+factories.put("onfailure_processor", (processorFactories, tag, config) -> new TestProcessor(tag, "fail", document -> {
+String onFailurePipeline = document.getFieldValue("_ingest.on_failure_pipeline", String.class);
+document.setFieldValue("readme", "pipeline with id [" + onFailurePipeline + "] is a bad pipeline");
+}));
+return factories;
+}
+}
+
 }