Introduce dedicated ingest processor exception (#48810)
Today we wrap exceptions that occur while executing an ingest processor in an ElasticsearchException. Today, in ExceptionsHelper#unwrapCause we only unwrap causes for exceptions that implement ElasticsearchWrapperException, which the top-level ElasticsearchException does not. Ultimately, this means that any exception that occurs during processor execution does not have its cause unwrapped, and so its status is blanket treated as a 500. This means that while executing a bulk request with an ingest pipeline, document-level failures that occur during a processor will cause the status for that document to be treated as 500. Since that does not give the client any indication that they made a mistake, it means some clients will enter infinite retries, thinking that there is some server-side problem that merely needs to clear. This commit addresses this by introducing a dedicated ingest processor exception, so that its causes can be unwrapped. While we could consider a broader change to unwrap causes for more than just ElasticsearchWrapperExceptions, that is a broad change with unclear implications. Since the problem of reporting 500s on client errors is a user-facing bug, we take the conservative approach for now, and we can revisit the unwrapping in a future change.
This commit is contained in:
parent
cac9fe4d86
commit
2bcdcb17cd
|
@ -18,12 +18,10 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.ingest.common;
|
package org.elasticsearch.ingest.common;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.script.MockScriptEngine;
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
|
@ -99,7 +97,7 @@ public class IngestRestartIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
checkPipelineExists.accept(pipelineIdWithoutScript);
|
checkPipelineExists.accept(pipelineIdWithoutScript);
|
||||||
checkPipelineExists.accept(pipelineIdWithScript);
|
checkPipelineExists.accept(pipelineIdWithScript);
|
||||||
|
|
||||||
|
@ -109,15 +107,13 @@ public class IngestRestartIT extends ESIntegTestCase {
|
||||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||||
.get();
|
.get();
|
||||||
|
|
||||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
||||||
() -> client().prepareIndex("index", "doc", "2")
|
() -> client().prepareIndex("index", "doc", "2")
|
||||||
.setSource("x", 0)
|
.setSource("x", 0)
|
||||||
.setPipeline(pipelineIdWithScript)
|
.setPipeline(pipelineIdWithScript)
|
||||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||||
.get());
|
.get());
|
||||||
assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type")));
|
assertThat(exception.getMessage(),
|
||||||
assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown")));
|
|
||||||
assertThat(exception.getRootCause().getMessage(),
|
|
||||||
equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
|
equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
|
||||||
"[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " +
|
"[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " +
|
||||||
"nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +
|
"nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +
|
||||||
|
|
|
@ -106,5 +106,5 @@ teardown:
|
||||||
id: 1
|
id: 1
|
||||||
pipeline: "outer"
|
pipeline: "outer"
|
||||||
body: {}
|
body: {}
|
||||||
- match: { error.root_cause.0.type: "exception" }
|
- match: { error.root_cause.0.type: "ingest_processor_exception" }
|
||||||
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
|
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
|
||||||
|
|
|
@ -348,7 +348,7 @@ teardown:
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
- length: { docs: 2 }
|
- length: { docs: 2 }
|
||||||
- match: { docs.0.error.type: "exception" }
|
- match: { docs.0.error.type: "illegal_argument_exception" }
|
||||||
- match: { docs.1.doc._source.foo: "BAR" }
|
- match: { docs.1.doc._source.foo: "BAR" }
|
||||||
- length: { docs.1.doc._ingest: 1 }
|
- length: { docs.1.doc._ingest: 1 }
|
||||||
- is_true: docs.1.doc._ingest.timestamp
|
- is_true: docs.1.doc._ingest.timestamp
|
||||||
|
@ -668,8 +668,7 @@ teardown:
|
||||||
}
|
}
|
||||||
- length: { docs: 1 }
|
- length: { docs: 1 }
|
||||||
- length: { docs.0.processor_results: 1 }
|
- length: { docs.0.processor_results: 1 }
|
||||||
- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
|
- match: { docs.0.processor_results.0.error.reason: "Cycle detected for pipeline: outer" }
|
||||||
- match: { docs.0.processor_results.0.error.caused_by.reason: "Cycle detected for pipeline: outer" }
|
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
|
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
|
||||||
|
|
|
@ -1036,7 +1036,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
||||||
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class,
|
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class,
|
||||||
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new,
|
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new,
|
||||||
156,
|
156,
|
||||||
Version.V_7_5_0);
|
Version.V_7_5_0),
|
||||||
|
INGEST_PROCESSOR_EXCEPTION(
|
||||||
|
org.elasticsearch.ingest.IngestProcessorException.class,
|
||||||
|
org.elasticsearch.ingest.IngestProcessorException::new,
|
||||||
|
157,
|
||||||
|
Version.V_7_6_0);
|
||||||
|
|
||||||
final Class<? extends ElasticsearchException> exceptionClass;
|
final Class<? extends ElasticsearchException> exceptionClass;
|
||||||
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
|
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
|
||||||
|
|
|
@ -143,7 +143,7 @@ public class CompoundProcessor implements Processor {
|
||||||
if (ignoreFailure) {
|
if (ignoreFailure) {
|
||||||
innerExecute(currentProcessor + 1, ingestDocument, handler);
|
innerExecute(currentProcessor + 1, ingestDocument, handler);
|
||||||
} else {
|
} else {
|
||||||
ElasticsearchException compoundProcessorException =
|
IngestProcessorException compoundProcessorException =
|
||||||
newCompoundProcessorException(e, processor.getType(), processor.getTag());
|
newCompoundProcessorException(e, processor.getType(), processor.getTag());
|
||||||
if (onFailureProcessors.isEmpty()) {
|
if (onFailureProcessors.isEmpty()) {
|
||||||
handler.accept(null, compoundProcessorException);
|
handler.accept(null, compoundProcessorException);
|
||||||
|
@ -207,12 +207,12 @@ public class CompoundProcessor implements Processor {
|
||||||
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
|
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
|
private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
|
||||||
if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) {
|
if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
|
||||||
return (ElasticsearchException) e;
|
return (IngestProcessorException) e;
|
||||||
}
|
}
|
||||||
|
|
||||||
ElasticsearchException exception = new ElasticsearchException(e);
|
IngestProcessorException exception = new IngestProcessorException(e);
|
||||||
|
|
||||||
if (processorType != null) {
|
if (processorType != null) {
|
||||||
exception.addHeader("processor_type", processorType);
|
exception.addHeader("processor_type", processorType);
|
||||||
|
@ -223,4 +223,5 @@ public class CompoundProcessor implements Processor {
|
||||||
|
|
||||||
return exception;
|
return exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.ElasticsearchWrapperException;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A dedicated wrapper for exceptions encountered executing an ingest processor. The wrapper is needed as we currently only unwrap causes
|
||||||
|
* for instances of {@link ElasticsearchWrapperException}.
|
||||||
|
*/
|
||||||
|
public class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException {
|
||||||
|
|
||||||
|
IngestProcessorException(final Exception cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IngestProcessorException(final StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -71,6 +71,7 @@ import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
|
||||||
import org.elasticsearch.indices.IndexTemplateMissingException;
|
import org.elasticsearch.indices.IndexTemplateMissingException;
|
||||||
import org.elasticsearch.indices.InvalidIndexTemplateException;
|
import org.elasticsearch.indices.InvalidIndexTemplateException;
|
||||||
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
|
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
|
||||||
|
import org.elasticsearch.ingest.IngestProcessorException;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
|
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
|
||||||
|
@ -817,6 +818,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
||||||
ids.put(154, RetentionLeaseNotFoundException.class);
|
ids.put(154, RetentionLeaseNotFoundException.class);
|
||||||
ids.put(155, ShardNotInPrimaryModeException.class);
|
ids.put(155, ShardNotInPrimaryModeException.class);
|
||||||
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
|
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
|
||||||
|
ids.put(157, IngestProcessorException.class);
|
||||||
|
|
||||||
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
|
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
|
||||||
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
|
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
|
||||||
|
|
|
@ -19,14 +19,14 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.ingest;
|
package org.elasticsearch.action.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ingest.CompoundProcessor;
|
||||||
import org.elasticsearch.ingest.DropProcessor;
|
import org.elasticsearch.ingest.DropProcessor;
|
||||||
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.IngestProcessorException;
|
||||||
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.TestProcessor;
|
import org.elasticsearch.ingest.TestProcessor;
|
||||||
import org.elasticsearch.ingest.CompoundProcessor;
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
|
@ -259,7 +259,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
||||||
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
|
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
|
||||||
assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
|
assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
|
||||||
Exception exception = simulateDocumentBaseResult.getFailure();
|
Exception exception = simulateDocumentBaseResult.getFailure();
|
||||||
assertThat(exception, instanceOf(ElasticsearchException.class));
|
assertThat(exception, instanceOf(IngestProcessorException.class));
|
||||||
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: processor failed"));
|
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: processor failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.logging.log4j.Level;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
import org.elasticsearch.ResourceNotFoundException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
@ -843,7 +842,7 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
||||||
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
||||||
verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
|
verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
|
||||||
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
|
||||||
import org.elasticsearch.action.ingest.SimulateProcessorResult;
|
import org.elasticsearch.action.ingest.SimulateProcessorResult;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.script.MockScriptEngine;
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
|
@ -86,7 +85,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
|
||||||
|
|
||||||
Exception[] holder = new Exception[1];
|
Exception[] holder = new Exception[1];
|
||||||
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
|
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
|
||||||
assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
|
assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
|
||||||
|
|
||||||
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
|
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
|
||||||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||||
|
@ -456,7 +455,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
|
||||||
|
|
||||||
Exception[] holder = new Exception[1];
|
Exception[] holder = new Exception[1];
|
||||||
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
|
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
|
||||||
ElasticsearchException exception = (ElasticsearchException) holder[0];
|
IngestProcessorException exception = (IngestProcessorException) holder[0];
|
||||||
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
|
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
|
||||||
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
|
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue