diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index b4871e213f1..1b5cc65a5bc 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -18,11 +18,13 @@ */ package org.elasticsearch.ingest.common; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.MockScriptPlugin; @@ -31,12 +33,14 @@ import org.elasticsearch.test.InternalTestCluster; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; // Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor // ends up being copied into this test. @@ -56,10 +60,52 @@ public class IngestRestartIT extends ESIntegTestCase { public static class CustomScriptPlugin extends MockScriptPlugin { @Override protected Map, Object>> pluginScripts() { - return Collections.singletonMap("my_script", ctx -> { + Map, Object>> pluginScripts = new HashMap<>(); + pluginScripts.put("my_script", ctx -> { ctx.put("z", 0); return null; }); + pluginScripts.put("throwing_script", ctx -> { + throw new RuntimeException("this script always fails"); + }); + return pluginScripts; + } + } + + public void testFailureInConditionalProcessor() { + internalCluster().ensureAtLeastNumDataNodes(1); + internalCluster().startMasterOnlyNode(); + final String pipelineId = "foo"; + client().admin().cluster().preparePutPipeline(pipelineId, + new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"set\" : {\"field\": \"any_field\", \"value\": \"any_value\"}},\n" + + " {\"set\" : {" + "" + + " \"if\" : " + "{\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"throwing_script\"}," + + " \"field\": \"any_field2\"," + + " \"value\": \"any_value2\"}" + + " }\n" + + " ]\n" + + "}"), XContentType.JSON).get(); + + Exception e = expectThrows( + Exception.class, + () -> + client().prepareIndex("index", "doc").setId("1") + .setSource("x", 0) + .setPipeline(pipelineId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get() + ); + assertTrue(e.getMessage().contains("this script always fails")); + + NodesStatsResponse r = client().admin().cluster().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get(); + int nodeCount = r.getNodes().size(); + for (int k = 0; k < nodeCount; k++) { + List stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId); + for (IngestStats.ProcessorStat st : stats) { + assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L)); + } } } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index e6f92a87178..72d50ee663b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -76,7 +76,15 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { - if (evaluate(ingestDocument)) { + final boolean matches; + try { + matches = evaluate(ingestDocument); + } catch (Exception e) { + handler.accept(null, e); + return; + } + + if (matches) { final long startTimeInNanos = relativeTimeProvider.getAsLong(); metric.preIngest(); processor.execute(ingestDocument, (result, e) -> { diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 671670212ab..d0b96c99747 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -47,12 +47,14 @@ public interface Processor { * otherwise just overwrite {@link #execute(IngestDocument)}. */ default void execute(IngestDocument ingestDocument, BiConsumer handler) { + final IngestDocument result; try { - IngestDocument result = execute(ingestDocument); - handler.accept(result, null); + result = execute(ingestDocument); } catch (Exception e) { handler.accept(null, e); + return; } + handler.accept(result, null); } /**