diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc index 74f1d66b847..ca841b0d2b2 100644 --- a/docs/plugins/ingest.asciidoc +++ b/docs/plugins/ingest.asciidoc @@ -445,6 +445,20 @@ An example that adds the parsed date to the `timestamp` field based on the `init } -------------------------------------------------- +==== Fail processor +The Fail Processor is used to raise an exception. This is useful for when +a user expects a pipeline to fail and wishes to relay a specific message +to the requester. + +[source,js] +-------------------------------------------------- +{ + "fail": { + "message": "an error message" + } +} +-------------------------------------------------- + === Accessing data in pipelines Processors in pipelines have read and write access to documents that pass through the pipeline. diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 993f6e2fa91..c6356867bd9 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -165,8 +165,7 @@ public final class IngestDocument { * @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. */ public void removeField(TemplateService.Template fieldPathTemplate) { - Map model = createTemplateModel(); - removeField(fieldPathTemplate.execute(model)); + removeField(renderTemplate(fieldPathTemplate)); } /** @@ -422,6 +421,10 @@ public final class IngestDocument { throw new IllegalArgumentException("field [" + path + "] of type [" + object.getClass().getName() + "] cannot be cast to [" + clazz.getName() + "]"); } + public String renderTemplate(TemplateService.Template template) { + return template.execute(createTemplateModel()); + } + private Map createTemplateModel() { Map model = new HashMap<>(sourceAndMetadata); model.put(SOURCE_KEY, sourceAndMetadata); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/fail/FailProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/fail/FailProcessor.java new file mode 100644 index 00000000000..e4b8e23fd8f --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/fail/FailProcessor.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor.fail; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.ingest.processor.ConfigurationUtils; +import org.elasticsearch.ingest.processor.Processor; + +import java.util.Map; + +/** + * Processor that raises a runtime exception with a provided + * error message. + */ +public class FailProcessor implements Processor { + + public static final String TYPE = "fail"; + + private final TemplateService.Template message; + + FailProcessor(TemplateService.Template message) { + this.message = message; + } + + public TemplateService.Template getMessage() { + return message; + } + + @Override + public void execute(IngestDocument document) { + throw new FailProcessorException(document.renderTemplate(message)); + } + + @Override + public String getType() { + return TYPE; + } + + public static class Factory implements Processor.Factory { + + private final TemplateService templateService; + + public Factory(TemplateService templateService) { + this.templateService = templateService; + } + + @Override + public FailProcessor create(Map config) throws Exception { + String message = ConfigurationUtils.readStringProperty(config, "message"); + return new FailProcessor(templateService.compile(message)); + } + } +} + diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/fail/FailProcessorException.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/fail/FailProcessorException.java new file mode 100644 index 00000000000..8c451c63864 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/fail/FailProcessorException.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor.fail; + +/** + * Exception class thrown by {@link FailProcessor}. + */ +public class FailProcessorException extends RuntimeException { + + public FailProcessorException(String message) { + super(message); + } +} + diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index bc083938ec7..bb987e5fac1 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.ingest.processor.append.AppendProcessor; import org.elasticsearch.ingest.processor.convert.ConvertProcessor; import org.elasticsearch.ingest.processor.date.DateProcessor; +import org.elasticsearch.ingest.processor.fail.FailProcessor; import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor; import org.elasticsearch.ingest.processor.grok.GrokProcessor; import org.elasticsearch.ingest.processor.gsub.GsubProcessor; @@ -72,6 +73,7 @@ public class IngestModule extends AbstractModule { addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); + addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); for (Map.Entry entry : processorFactoryProviders.entrySet()) { diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/fail/FailProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/fail/FailProcessorFactoryTests.java new file mode 100644 index 00000000000..346409c72fb --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/fail/FailProcessorFactoryTests.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor.fail; + +import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class FailProcessorFactoryTests extends ESTestCase { + + private FailProcessor.Factory factory; + + @Before + public void init() { + factory = new FailProcessor.Factory(TestTemplateService.instance()); + } + + public void testCreate() throws Exception { + Map config = new HashMap<>(); + config.put("message", "error"); + FailProcessor failProcessor = factory.create(config); + assertThat(failProcessor.getMessage().execute(Collections.emptyMap()), equalTo("error")); + } + + public void testCreateMissingMessageField() throws Exception { + Map config = new HashMap<>(); + try { + factory.create(config); + fail("factory create should have failed"); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("required property [message] is missing")); + } + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/fail/FailProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/fail/FailProcessorTests.java new file mode 100644 index 00000000000..51e33034a4b --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/fail/FailProcessorTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor.fail; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class FailProcessorTests extends ESTestCase { + + public void test() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String message = randomAsciiOfLength(10); + Processor processor = new FailProcessor(new TestTemplateService.MockTemplate(message)); + try { + processor.execute(ingestDocument); + fail("fail processor should throw an exception"); + } catch (FailProcessorException e) { + assertThat(e.getMessage(), equalTo(message)); + } + } +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/90_fail.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/90_fail.yaml new file mode 100644 index 00000000000..d416e84100a --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/90_fail.yaml @@ -0,0 +1,68 @@ +--- +"Test Fail Processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "fail" : { + "message" : "error_message" + } + } + ] + } + - match: { _id: "my_pipeline" } + + - do: + catch: request + ingest.index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {} + +--- +"Test fail with on_failure": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "fail" : { + "message" : "error", + "on_failure" : [ + { + "set" : { + "field" : "error_message", + "value" : "fail_processor_ran" + } + } + ] + } + } + ] + } + - match: { _id: "my_pipeline" } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.error_message: "fail_processor_ran" } +