[7.x] Handle errors when evaluating if conditions in processors (#52892)
This commit is contained in:
parent
8785f57dfe
commit
3c8b46a8c1
|
@ -18,11 +18,13 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.ingest.common;
|
package org.elasticsearch.ingest.common;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.ingest.IngestStats;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.script.MockScriptEngine;
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
import org.elasticsearch.script.MockScriptPlugin;
|
import org.elasticsearch.script.MockScriptPlugin;
|
||||||
|
@ -31,12 +33,14 @@ import org.elasticsearch.test.InternalTestCluster;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
|
|
||||||
// Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
|
// Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
|
||||||
// ends up being copied into this test.
|
// ends up being copied into this test.
|
||||||
|
@ -56,10 +60,52 @@ public class IngestRestartIT extends ESIntegTestCase {
|
||||||
public static class CustomScriptPlugin extends MockScriptPlugin {
|
public static class CustomScriptPlugin extends MockScriptPlugin {
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||||
return Collections.singletonMap("my_script", ctx -> {
|
Map<String, Function<Map<String, Object>, Object>> pluginScripts = new HashMap<>();
|
||||||
|
pluginScripts.put("my_script", ctx -> {
|
||||||
ctx.put("z", 0);
|
ctx.put("z", 0);
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
pluginScripts.put("throwing_script", ctx -> {
|
||||||
|
throw new RuntimeException("this script always fails");
|
||||||
|
});
|
||||||
|
return pluginScripts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFailureInConditionalProcessor() {
|
||||||
|
internalCluster().ensureAtLeastNumDataNodes(1);
|
||||||
|
internalCluster().startMasterOnlyNode();
|
||||||
|
final String pipelineId = "foo";
|
||||||
|
client().admin().cluster().preparePutPipeline(pipelineId,
|
||||||
|
new BytesArray("{\n" +
|
||||||
|
" \"processors\" : [\n" +
|
||||||
|
" {\"set\" : {\"field\": \"any_field\", \"value\": \"any_value\"}},\n" +
|
||||||
|
" {\"set\" : {" + "" +
|
||||||
|
" \"if\" : " + "{\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"throwing_script\"}," +
|
||||||
|
" \"field\": \"any_field2\"," +
|
||||||
|
" \"value\": \"any_value2\"}" +
|
||||||
|
" }\n" +
|
||||||
|
" ]\n" +
|
||||||
|
"}"), XContentType.JSON).get();
|
||||||
|
|
||||||
|
Exception e = expectThrows(
|
||||||
|
Exception.class,
|
||||||
|
() ->
|
||||||
|
client().prepareIndex("index", "doc").setId("1")
|
||||||
|
.setSource("x", 0)
|
||||||
|
.setPipeline(pipelineId)
|
||||||
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||||
|
.get()
|
||||||
|
);
|
||||||
|
assertTrue(e.getMessage().contains("this script always fails"));
|
||||||
|
|
||||||
|
NodesStatsResponse r = client().admin().cluster().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get();
|
||||||
|
int nodeCount = r.getNodes().size();
|
||||||
|
for (int k = 0; k < nodeCount; k++) {
|
||||||
|
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
|
||||||
|
for (IngestStats.ProcessorStat st : stats) {
|
||||||
|
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,15 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
|
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
|
||||||
if (evaluate(ingestDocument)) {
|
final boolean matches;
|
||||||
|
try {
|
||||||
|
matches = evaluate(ingestDocument);
|
||||||
|
} catch (Exception e) {
|
||||||
|
handler.accept(null, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (matches) {
|
||||||
final long startTimeInNanos = relativeTimeProvider.getAsLong();
|
final long startTimeInNanos = relativeTimeProvider.getAsLong();
|
||||||
metric.preIngest();
|
metric.preIngest();
|
||||||
processor.execute(ingestDocument, (result, e) -> {
|
processor.execute(ingestDocument, (result, e) -> {
|
||||||
|
|
|
@ -47,12 +47,14 @@ public interface Processor {
|
||||||
* otherwise just overwrite {@link #execute(IngestDocument)}.
|
* otherwise just overwrite {@link #execute(IngestDocument)}.
|
||||||
*/
|
*/
|
||||||
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
|
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
|
||||||
|
final IngestDocument result;
|
||||||
try {
|
try {
|
||||||
IngestDocument result = execute(ingestDocument);
|
result = execute(ingestDocument);
|
||||||
handler.accept(result, null);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
handler.accept(null, e);
|
handler.accept(null, e);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
handler.accept(result, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue