From fca442f4d1812cb770605ee5e5e66aedfb02d0bb Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 25 Jan 2016 16:08:27 -0800 Subject: [PATCH] Introduce Pipeline Factory Error Responses in Node Ingest When there is an exception thrown during pipeline creation within Rest calls (in put pipeline, and simulate) We now return a structured error response to the user with details around which processor's configuration is the cause of the issue, or which configuration property is misconfigured, etc. --- .../ingest/SimulatePipelineRequest.java | 20 ++-- .../ingest/SimulatePipelineResponse.java | 78 ++++++++++----- .../SimulatePipelineTransportAction.java | 5 + .../action/ingest/WritePipelineResponse.java | 26 ++++- .../WritePipelineResponseRestListener.java | 41 ++++++++ .../elasticsearch/ingest/PipelineStore.java | 31 +++--- .../ingest/core/AbstractProcessorFactory.java | 2 +- .../ingest/core/ConfigurationUtils.java | 74 +++++++------- .../elasticsearch/ingest/core/Pipeline.java | 37 ++++--- .../ingest/core/PipelineFactoryError.java | 96 +++++++++++++++++++ .../ingest/core/PipelineFactoryResult.java | 43 +++++++++ .../elasticsearch/ingest/core/Processor.java | 2 + .../processor/AbstractStringProcessor.java | 7 +- .../ingest/processor/AppendProcessor.java | 4 +- .../ConfigurationPropertyException.java | 53 ++++++++++ .../ingest/processor/ConvertProcessor.java | 4 +- .../ingest/processor/DateProcessor.java | 10 +- .../ingest/processor/DeDotProcessor.java | 2 +- .../ingest/processor/FailProcessor.java | 2 +- .../ingest/processor/GsubProcessor.java | 6 +- .../ingest/processor/JoinProcessor.java | 4 +- .../ingest/processor/LowercaseProcessor.java | 5 + .../ingest/processor/RemoveProcessor.java | 2 +- .../ingest/processor/RenameProcessor.java | 4 +- .../ingest/processor/SetProcessor.java | 4 +- .../ingest/processor/SplitProcessor.java | 4 +- .../ingest/processor/TrimProcessor.java | 5 + .../ingest/processor/UppercaseProcessor.java | 5 + .../action/ingest/RestPutPipelineAction.java | 9 +- .../ingest/RestSimulatePipelineAction.java | 3 +- .../ingest/WritePipelineResponseTests.java | 61 ++++++++++++ .../elasticsearch/ingest/IngestClientIT.java | 34 +++++++ .../ingest/PipelineStoreTests.java | 41 +++++++- .../ingest/core/ConfigurationUtilsTests.java | 14 +-- .../ingest/core/PipelineFactoryTests.java | 15 ++- .../AppendProcessorFactoryTests.java | 13 +-- .../ConvertProcessorFactoryTests.java | 9 +- .../processor/DateProcessorFactoryTests.java | 13 +-- .../processor/FailProcessorFactoryTests.java | 5 +- .../processor/GsubProcessorFactoryTests.java | 13 +-- .../processor/JoinProcessorFactoryTests.java | 9 +- .../LowercaseProcessorFactoryTests.java | 5 +- .../RemoveProcessorFactoryTests.java | 5 +- .../RenameProcessorFactoryTests.java | 9 +- .../processor/SetProcessorFactoryTests.java | 13 +-- .../processor/SplitProcessorFactoryTests.java | 9 +- .../processor/TrimProcessorFactoryTests.java | 5 +- .../UppercaseProcessorFactoryTests.java | 5 +- .../ingest/grok/GrokProcessor.java | 6 +- .../grok/GrokProcessorFactoryTests.java | 28 ++++++ .../ingest/geoip/GeoIpProcessor.java | 14 +-- .../geoip/GeoIpProcessorFactoryTests.java | 14 +-- .../rest-api-spec/test/ingest/10_crud.yaml | 23 +++++ .../test/ingest/40_simulate.yaml | 53 ++++++++-- 54 files changed, 801 insertions(+), 203 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponseRestListener.java create mode 100644 core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryError.java create mode 100644 core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryResult.java create mode 100644 core/src/main/java/org/elasticsearch/ingest/processor/ConfigurationPropertyException.java create mode 100644 core/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index af18ac5db46..847de99f372 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -139,24 +139,24 @@ public class SimulatePipelineRequest extends ActionRequest config, boolean verbose, PipelineStore pipelineStore) throws Exception { - Map pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE); + Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } private static List parseDocs(Map config) { - List> docs = ConfigurationUtils.readList(config, Fields.DOCS); + List> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS); List ingestDocumentList = new ArrayList<>(); for (Map dataMap : docs) { - Map document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE); - IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, MetaData.INDEX.getFieldName(), "_index"), - ConfigurationUtils.readStringProperty(dataMap, MetaData.TYPE.getFieldName(), "_type"), - ConfigurationUtils.readStringProperty(dataMap, MetaData.ID.getFieldName(), "_id"), - ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.ROUTING.getFieldName()), - ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.PARENT.getFieldName()), - ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TIMESTAMP.getFieldName()), - ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TTL.getFieldName()), + Map document = ConfigurationUtils.readMap(null, null, dataMap, Fields.SOURCE); + IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.INDEX.getFieldName(), "_index"), + ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.TYPE.getFieldName(), "_type"), + ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"), + ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.TIMESTAMP.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.TTL.getFieldName()), document); ingestDocumentList.add(ingestDocument); } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java index c7c0822f04a..4337d0ee165 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java @@ -22,24 +22,31 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.StatusToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.ingest.core.PipelineFactoryError; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -public class SimulatePipelineResponse extends ActionResponse implements ToXContent { +public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent { private String pipelineId; private boolean verbose; private List results; + private PipelineFactoryError error; public SimulatePipelineResponse() { } + public SimulatePipelineResponse(PipelineFactoryError error) { + this.error = error; + } + public SimulatePipelineResponse(String pipelineId, boolean verbose, List responses) { this.pipelineId = pipelineId; this.verbose = verbose; @@ -58,42 +65,69 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte return verbose; } + public boolean isError() { + return error != null; + } + + @Override + public RestStatus status() { + if (isError()) { + return RestStatus.BAD_REQUEST; + } + return RestStatus.OK; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(pipelineId); - out.writeBoolean(verbose); - out.writeVInt(results.size()); - for (SimulateDocumentResult response : results) { - response.writeTo(out); + out.writeBoolean(isError()); + if (isError()) { + error.writeTo(out); + } else { + out.writeString(pipelineId); + out.writeBoolean(verbose); + out.writeVInt(results.size()); + for (SimulateDocumentResult response : results) { + response.writeTo(out); + } } } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - this.pipelineId = in.readString(); - boolean verbose = in.readBoolean(); - int responsesLength = in.readVInt(); - results = new ArrayList<>(); - for (int i = 0; i < responsesLength; i++) { - SimulateDocumentResult simulateDocumentResult; - if (verbose) { - simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in); - } else { - simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in); + boolean isError = in.readBoolean(); + if (isError) { + error = new PipelineFactoryError(); + error.readFrom(in); + } else { + this.pipelineId = in.readString(); + boolean verbose = in.readBoolean(); + int responsesLength = in.readVInt(); + results = new ArrayList<>(); + for (int i = 0; i < responsesLength; i++) { + SimulateDocumentResult simulateDocumentResult; + if (verbose) { + simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in); + } else { + simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in); + } + results.add(simulateDocumentResult); } - results.add(simulateDocumentResult); } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startArray(Fields.DOCUMENTS); - for (SimulateDocumentResult response : results) { - response.toXContent(builder, params); + if (isError()) { + error.toXContent(builder, params); + } else { + builder.startArray(Fields.DOCUMENTS); + for (SimulateDocumentResult response : results) { + response.toXContent(builder, params); + } + builder.endArray(); } - builder.endArray(); return builder; } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 5640d7c1c8c..3d6586315ad 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.core.PipelineFactoryError; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -56,6 +58,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction { + + public WritePipelineResponseRestListener(RestChannel channel) { + super(channel); + } + + @Override + protected void addCustomFields(XContentBuilder builder, WritePipelineResponse response) throws IOException { + if (!response.isAcknowledged()) { + response.getError().toXContent(builder, null); + } + } +} + diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 805f1e417ec..21128a94b65 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -36,8 +36,10 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.ingest.core.Pipeline; +import org.elasticsearch.ingest.core.PipelineFactoryError; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import org.elasticsearch.script.ScriptService; import java.io.Closeable; @@ -101,7 +103,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust Map pipelines = new HashMap<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { - pipelines.put(pipeline.getId(), constructPipeline(pipeline.getId(), pipeline.getConfigAsMap())); + pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactoryRegistry)); } catch (Exception e) { throw new RuntimeException(e); } @@ -148,16 +150,14 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust /** * Stores the specified pipeline definition in the request. - * - * @throws IllegalArgumentException If the pipeline holds incorrect configuration */ - public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener listener) throws IllegalArgumentException { - try { - // validates the pipeline and processor configuration before submitting a cluster update task: - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2(); - constructPipeline(request.getId(), pipelineConfig); - } catch (Exception e) { - throw new IllegalArgumentException("Invalid pipeline configuration", e); + public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener listener) { + // validates the pipeline and processor configuration before submitting a cluster update task: + Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2(); + WritePipelineResponse response = validatePipelineResponse(request.getId(), pipelineConfig); + if (response != null) { + listener.onResponse(response); + return; } clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask(request, listener) { @@ -235,8 +235,15 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust return result; } - private Pipeline constructPipeline(String id, Map config) throws Exception { - return factory.create(id, config, processorFactoryRegistry); + WritePipelineResponse validatePipelineResponse(String id, Map config) { + try { + factory.create(id, config, processorFactoryRegistry); + return null; + } catch (ConfigurationPropertyException e) { + return new WritePipelineResponse(new PipelineFactoryError(e)); + } catch (Exception e) { + return new WritePipelineResponse(new PipelineFactoryError(e.getMessage())); + } } } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/AbstractProcessorFactory.java b/core/src/main/java/org/elasticsearch/ingest/core/AbstractProcessorFactory.java index 1082461845e..323344f8f41 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/AbstractProcessorFactory.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/AbstractProcessorFactory.java @@ -31,7 +31,7 @@ public abstract class AbstractProcessorFactory

implements P @Override public P create(Map config) throws Exception { - String tag = ConfigurationUtils.readOptionalStringProperty(config, TAG_KEY); + String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); return doCreate(tag, config); } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java index c6204166908..69adc0f9492 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/ConfigurationUtils.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest.core; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; + import java.util.List; import java.util.Map; @@ -30,133 +32,133 @@ public final class ConfigurationUtils { /** * Returns and removes the specified optional property from the specified configuration map. * - * If the property value isn't of type string a {@link IllegalArgumentException} is thrown. + * If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown. */ - public static String readOptionalStringProperty(Map configuration, String propertyName) { + public static String readOptionalStringProperty(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); - return readString(propertyName, value); + return readString(processorType, processorTag, propertyName, value); } /** * Returns and removes the specified property from the specified configuration map. * - * If the property value isn't of type string an {@link IllegalArgumentException} is thrown. - * If the property is missing an {@link IllegalArgumentException} is thrown + * If the property value isn't of type string an {@link ConfigurationPropertyException} is thrown. + * If the property is missing an {@link ConfigurationPropertyException} is thrown */ - public static String readStringProperty(Map configuration, String propertyName) { - return readStringProperty(configuration, propertyName, null); + public static String readStringProperty(String processorType, String processorTag, Map configuration, String propertyName) { + return readStringProperty(processorType, processorTag, configuration, propertyName, null); } /** * Returns and removes the specified property from the specified configuration map. * - * If the property value isn't of type string a {@link IllegalArgumentException} is thrown. - * If the property is missing and no default value has been specified a {@link IllegalArgumentException} is thrown + * If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown. + * If the property is missing and no default value has been specified a {@link ConfigurationPropertyException} is thrown */ - public static String readStringProperty(Map configuration, String propertyName, String defaultValue) { + public static String readStringProperty(String processorType, String processorTag, Map configuration, String propertyName, String defaultValue) { Object value = configuration.remove(propertyName); if (value == null && defaultValue != null) { return defaultValue; } else if (value == null) { - throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing"); } - return readString(propertyName, value); + return readString(processorType, processorTag, propertyName, value); } - private static String readString(String propertyName, Object value) { + private static String readString(String processorType, String processorTag, String propertyName, Object value) { if (value == null) { return null; } if (value instanceof String) { return (String) value; } - throw new IllegalArgumentException("property [" + propertyName + "] isn't a string, but of type [" + value.getClass().getName() + "]"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]"); } /** * Returns and removes the specified property of type list from the specified configuration map. * - * If the property value isn't of type list an {@link IllegalArgumentException} is thrown. + * If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown. */ - public static List readOptionalList(Map configuration, String propertyName) { + public static List readOptionalList(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { return null; } - return readList(propertyName, value); + return readList(processorType, processorTag, propertyName, value); } /** * Returns and removes the specified property of type list from the specified configuration map. * - * If the property value isn't of type list an {@link IllegalArgumentException} is thrown. - * If the property is missing an {@link IllegalArgumentException} is thrown + * If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown. + * If the property is missing an {@link ConfigurationPropertyException} is thrown */ - public static List readList(Map configuration, String propertyName) { + public static List readList(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { - throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing"); } - return readList(propertyName, value); + return readList(processorType, processorTag, propertyName, value); } - private static List readList(String propertyName, Object value) { + private static List readList(String processorType, String processorTag, String propertyName, Object value) { if (value instanceof List) { @SuppressWarnings("unchecked") List stringList = (List) value; return stringList; } else { - throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a list, but of type [" + value.getClass().getName() + "]"); } } /** * Returns and removes the specified property of type map from the specified configuration map. * - * If the property value isn't of type map an {@link IllegalArgumentException} is thrown. - * If the property is missing an {@link IllegalArgumentException} is thrown + * If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown. + * If the property is missing an {@link ConfigurationPropertyException} is thrown */ - public static Map readMap(Map configuration, String propertyName) { + public static Map readMap(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { - throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing"); } - return readMap(propertyName, value); + return readMap(processorType, processorTag, propertyName, value); } /** * Returns and removes the specified property of type map from the specified configuration map. * - * If the property value isn't of type map an {@link IllegalArgumentException} is thrown. + * If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown. */ - public static Map readOptionalMap(Map configuration, String propertyName) { + public static Map readOptionalMap(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { return null; } - return readMap(propertyName, value); + return readMap(processorType, processorTag, propertyName, value); } - private static Map readMap(String propertyName, Object value) { + private static Map readMap(String processorType, String processorTag, String propertyName, Object value) { if (value instanceof Map) { @SuppressWarnings("unchecked") Map map = (Map) value; return map; } else { - throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a map, but of type [" + value.getClass().getName() + "]"); } } /** * Returns and removes the specified property as an {@link Object} from the specified configuration map. */ - public static Object readObject(Map configuration, String propertyName) { + public static Object readObject(String processorType, String processorTag, Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { - throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); + throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing"); } return value; } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java index 68ba8da4855..5c654fbce21 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest.core; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -82,19 +84,20 @@ public final class Pipeline { public final static class Factory { - public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(config, DESCRIPTION_KEY); - List processors = readProcessors(PROCESSORS_KEY, processorRegistry, config); - List onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config); + public Pipeline create(String id, Map config, Map processorRegistry) throws ConfigurationPropertyException { + String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); + List>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); + List processors = readProcessorConfigs(processorConfigs, processorRegistry); + List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); + List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); if (config.isEmpty() == false) { - throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + throw new ConfigurationPropertyException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); } CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); return new Pipeline(id, description, compoundProcessor); } - private List readProcessors(String fieldName, Map processorRegistry, Map config) throws Exception { - List>> processorConfigs = ConfigurationUtils.readOptionalList(config, fieldName); + private List readProcessorConfigs(List>> processorConfigs, Map processorRegistry) throws ConfigurationPropertyException { List processors = new ArrayList<>(); if (processorConfigs != null) { for (Map> processorConfigWithKey : processorConfigs) { @@ -107,20 +110,28 @@ public final class Pipeline { return processors; } - private Processor readProcessor(Map processorRegistry, String type, Map config) throws Exception { + private Processor readProcessor(Map processorRegistry, String type, Map config) throws ConfigurationPropertyException { Processor.Factory factory = processorRegistry.get(type); if (factory != null) { - List onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config); - Processor processor = factory.create(config); - if (config.isEmpty() == false) { - throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); + List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); + Processor processor; + try { + processor = factory.create(config); + } catch (ConfigurationPropertyException e) { + throw e; + } catch (Exception e) { + throw new ConfigurationPropertyException(e.getMessage()); + } + if (!config.isEmpty()) { + throw new ConfigurationPropertyException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); } if (onFailureProcessors.isEmpty()) { return processor; } return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); } - throw new IllegalArgumentException("No processor type exists with name [" + type + "]"); + throw new ConfigurationPropertyException("No processor type exists with name [" + type + "]"); } } } diff --git a/core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryError.java b/core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryError.java new file mode 100644 index 00000000000..b987e1ee266 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryError.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.core; + + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; + +import java.io.IOException; + +public class PipelineFactoryError implements Streamable, ToXContent { + private String reason; + private String processorType; + private String processorTag; + private String processorPropertyName; + + public PipelineFactoryError() { + + } + + public PipelineFactoryError(ConfigurationPropertyException e) { + this.reason = e.getMessage(); + this.processorType = e.getProcessorType(); + this.processorTag = e.getProcessorTag(); + this.processorPropertyName = e.getPropertyName(); + } + + public PipelineFactoryError(String reason) { + this.reason = "Constructing Pipeline failed:" + reason; + } + + public String getReason() { + return reason; + } + + public String getProcessorTag() { + return processorTag; + } + + public String getProcessorPropertyName() { + return processorPropertyName; + } + + public String getProcessorType() { + return processorType; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + reason = in.readString(); + processorType = in.readOptionalString(); + processorTag = in.readOptionalString(); + processorPropertyName = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(reason); + out.writeOptionalString(processorType); + out.writeOptionalString(processorTag); + out.writeOptionalString(processorPropertyName); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("error"); + builder.field("type", processorType); + builder.field("tag", processorTag); + builder.field("reason", reason); + builder.field("property_name", processorPropertyName); + builder.endObject(); + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryResult.java b/core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryResult.java new file mode 100644 index 00000000000..ab284981b33 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/core/PipelineFactoryResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.core; + +public class PipelineFactoryResult { + private final Pipeline pipeline; + private final PipelineFactoryError error; + + public PipelineFactoryResult(Pipeline pipeline) { + this.pipeline = pipeline; + this.error = null; + } + + public PipelineFactoryResult(PipelineFactoryError error) { + this.error = error; + this.pipeline = null; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public PipelineFactoryError getError() { + return error; + } +} diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Processor.java b/core/src/main/java/org/elasticsearch/ingest/core/Processor.java index f178051b751..28049983692 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/Processor.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/Processor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.core; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; + import java.util.Map; /** diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java index 32e54765b18..6ae9f2d3526 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java @@ -55,10 +55,15 @@ public abstract class AbstractStringProcessor extends AbstractProcessor { protected abstract String process(String value); public static abstract class Factory extends AbstractProcessorFactory { + protected final String processorType; + + protected Factory(String processorType) { + this.processorType = processorType; + } @Override public T doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); + String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field"); return newProcessor(processorTag, field); } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java index deff384cf92..84f979083b4 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java @@ -74,8 +74,8 @@ public class AppendProcessor extends AbstractProcessor { @Override public AppendProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - Object value = ConfigurationUtils.readObject(config, "value"); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService)); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/ConfigurationPropertyException.java b/core/src/main/java/org/elasticsearch/ingest/processor/ConfigurationPropertyException.java new file mode 100644 index 00000000000..dbc35c9334f --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/processor/ConfigurationPropertyException.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +/** + * Exception class thrown by processor factories. + */ +public class ConfigurationPropertyException extends RuntimeException { + private String processorType; + private String processorTag; + private String propertyName; + + public ConfigurationPropertyException(String processorType, String processorTag, String propertyName, String message) { + super("[" + propertyName + "] " + message); + this.processorTag = processorTag; + this.processorType = processorType; + this.propertyName = propertyName; + } + + public ConfigurationPropertyException(String errorMessage) { + super(errorMessage); + } + + public String getPropertyName() { + return propertyName; + } + + public String getProcessorType() { + return processorType; + } + + public String getProcessorTag() { + return processorTag; + } +} + diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java index 5b6bacf2ed1..213e3ec2c78 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java @@ -137,8 +137,8 @@ public class ConvertProcessor extends AbstractProcessor { public static class Factory extends AbstractProcessorFactory { @Override public ConvertProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(config, "type")); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type")); return new ConvertProcessor(processorTag, field, convertType); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java index 9fc0378d774..4b08b42a735 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java @@ -112,11 +112,11 @@ public final class DateProcessor extends AbstractProcessor { @SuppressWarnings("unchecked") public DateProcessor doCreate(String processorTag, Map config) throws Exception { - String matchField = ConfigurationUtils.readStringProperty(config, "match_field"); - String targetField = ConfigurationUtils.readStringProperty(config, "target_field", DEFAULT_TARGET_FIELD); - String timezoneString = ConfigurationUtils.readOptionalStringProperty(config, "timezone"); + String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "match_field"); + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD); + String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone"); DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString); - String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale"); + String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "locale"); Locale locale = Locale.ENGLISH; if (localeString != null) { try { @@ -125,7 +125,7 @@ public final class DateProcessor extends AbstractProcessor { throw new IllegalArgumentException("Invalid language tag specified: " + localeString); } } - List matchFormats = ConfigurationUtils.readList(config, "match_formats"); + List matchFormats = ConfigurationUtils.readList(TYPE, processorTag, config, "match_formats"); return new DateProcessor(processorTag, timezone, locale, matchField, matchFormats, targetField); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java index b8f86616ffc..62063a49fd0 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/DeDotProcessor.java @@ -95,7 +95,7 @@ public class DeDotProcessor extends AbstractProcessor { @Override public DeDotProcessor doCreate(String processorTag, Map config) throws Exception { - String separator = ConfigurationUtils.readOptionalStringProperty(config, "separator"); + String separator = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "separator"); if (separator == null) { separator = DEFAULT_SEPARATOR; } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java index 76c7b3c40ea..86758de862c 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java @@ -66,7 +66,7 @@ public class FailProcessor extends AbstractProcessor { @Override public FailProcessor doCreate(String processorTag, Map config) throws Exception { - String message = ConfigurationUtils.readStringProperty(config, "message"); + String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message"); return new FailProcessor(processorTag, templateService.compile(message)); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java index 0ec7fba84f2..1118ed6b956 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java @@ -79,9 +79,9 @@ public class GsubProcessor extends AbstractProcessor { public static class Factory extends AbstractProcessorFactory { @Override public GsubProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - String pattern = ConfigurationUtils.readStringProperty(config, "pattern"); - String replacement = ConfigurationUtils.readStringProperty(config, "replacement"); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + String pattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern"); + String replacement = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "replacement"); Pattern searchPattern = Pattern.compile(pattern); return new GsubProcessor(processorTag, field, searchPattern, replacement); } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java index dd729dd0afd..813c42a2965 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java @@ -73,8 +73,8 @@ public class JoinProcessor extends AbstractProcessor { public static class Factory extends AbstractProcessorFactory { @Override public JoinProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - String separator = ConfigurationUtils.readStringProperty(config, "separator"); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator"); return new JoinProcessor(processorTag, field, separator); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/LowercaseProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/LowercaseProcessor.java index 617efd9b480..0931e5d77d9 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/LowercaseProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/LowercaseProcessor.java @@ -45,6 +45,11 @@ public class LowercaseProcessor extends AbstractStringProcessor { } public static class Factory extends AbstractStringProcessor.Factory { + + public Factory() { + super(TYPE); + } + @Override protected LowercaseProcessor newProcessor(String tag, String field) { return new LowercaseProcessor(tag, field); diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java index a39ac8f5cf4..489822867c9 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java @@ -65,7 +65,7 @@ public class RemoveProcessor extends AbstractProcessor { @Override public RemoveProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); return new RemoveProcessor(processorTag, templateService.compile(field)); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java index 6088315884e..8e19c4ecc15 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java @@ -78,8 +78,8 @@ public class RenameProcessor extends AbstractProcessor { public static class Factory extends AbstractProcessorFactory { @Override public RenameProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - String newField = ConfigurationUtils.readStringProperty(config, "to"); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + String newField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "to"); return new RenameProcessor(processorTag, field, newField); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java index e046a5f3bdb..d150016cf91 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java @@ -73,8 +73,8 @@ public class SetProcessor extends AbstractProcessor { @Override public SetProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - Object value = ConfigurationUtils.readObject(config, "value"); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); return new SetProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService)); } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java index ad0bffb061a..2ecaad1a7d0 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java @@ -75,8 +75,8 @@ public class SplitProcessor extends AbstractProcessor { public static class Factory extends AbstractProcessorFactory { @Override public SplitProcessor doCreate(String processorTag, Map config) throws Exception { - String field = ConfigurationUtils.readStringProperty(config, "field"); - return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(config, "separator")); + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator")); } } } diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/TrimProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/TrimProcessor.java index c66cc848933..7de309b51c6 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/TrimProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/TrimProcessor.java @@ -42,6 +42,11 @@ public class TrimProcessor extends AbstractStringProcessor { } public static class Factory extends AbstractStringProcessor.Factory { + + public Factory() { + super(TYPE); + } + @Override protected TrimProcessor newProcessor(String tag, String field) { return new TrimProcessor(tag, field); diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/UppercaseProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/UppercaseProcessor.java index e6a1f77cb86..7b10d022798 100644 --- a/core/src/main/java/org/elasticsearch/ingest/processor/UppercaseProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/processor/UppercaseProcessor.java @@ -44,6 +44,11 @@ public class UppercaseProcessor extends AbstractStringProcessor { } public static class Factory extends AbstractStringProcessor.Factory { + + public Factory() { + super(TYPE); + } + @Override protected UppercaseProcessor newProcessor(String tag, String field) { return new UppercaseProcessor(tag, field); diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index f0ddc83acaa..badccbb9579 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -20,9 +20,13 @@ package org.elasticsearch.rest.action.ingest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.WritePipelineResponse; +import org.elasticsearch.action.ingest.WritePipelineResponseRestListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; @@ -30,6 +34,8 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; import org.elasticsearch.rest.action.support.RestActions; +import java.io.IOException; + public class RestPutPipelineAction extends BaseRestHandler { @Inject @@ -43,6 +49,7 @@ public class RestPutPipelineAction extends BaseRestHandler { PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), RestActions.getRestContent(restRequest)); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout())); - client.admin().cluster().putPipeline(request, new AcknowledgedRestListener<>(channel)); + client.admin().cluster().putPipeline(request, new WritePipelineResponseRestListener(channel)); } + } diff --git a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java index 82b504b0ea7..94f80a9b611 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import org.elasticsearch.rest.action.support.RestToXContentListener; public class RestSimulatePipelineAction extends BaseRestHandler { @@ -46,6 +47,6 @@ public class RestSimulatePipelineAction extends BaseRestHandler { SimulatePipelineRequest request = new SimulatePipelineRequest(RestActions.getRestContent(restRequest)); request.setId(restRequest.param("id")); request.setVerbose(restRequest.paramAsBoolean("verbose", false)); - client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel)); + client.admin().cluster().simulatePipeline(request, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java b/core/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java new file mode 100644 index 00000000000..8eb3f4ece75 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.ingest.core.PipelineFactoryError; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; + +public class WritePipelineResponseTests extends ESTestCase { + + public void testSerializationWithoutError() throws IOException { + boolean isAcknowledged = randomBoolean(); + WritePipelineResponse response; + response = new WritePipelineResponse(isAcknowledged); + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = StreamInput.wrap(out.bytes()); + WritePipelineResponse otherResponse = new WritePipelineResponse(); + otherResponse.readFrom(streamInput); + + assertThat(otherResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + } + + public void testSerializationWithError() throws IOException { + PipelineFactoryError error = new PipelineFactoryError("error"); + WritePipelineResponse response = new WritePipelineResponse(error); + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = StreamInput.wrap(out.bytes()); + WritePipelineResponse otherResponse = new WritePipelineResponse(); + otherResponse.readFrom(streamInput); + + assertThat(otherResponse.getError().getReason(), equalTo(response.getError().getReason())); + assertThat(otherResponse.getError().getProcessorType(), equalTo(response.getError().getProcessorType())); + assertThat(otherResponse.getError().getProcessorTag(), equalTo(response.getError().getProcessorTag())); + assertThat(otherResponse.getError().getProcessorPropertyName(), equalTo(response.getError().getProcessorPropertyName())); + } +} diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 1a6e0c46df5..bcbe41dd66f 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -47,6 +47,7 @@ import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.Is.is; @@ -200,6 +201,39 @@ public class IngestClientIT extends ESIntegTestCase { assertThat(getResponse.pipelines().size(), equalTo(0)); } + public void testPutWithPipelineError() throws Exception { + BytesReference source = jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("not_found") + .endObject() + .endObject() + .endArray() + .endObject().bytes(); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source); + WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get(); + assertThat(response.isAcknowledged(), equalTo(false)); + assertThat(response.getError().getReason(), equalTo("No processor type exists with name [not_found]")); + } + + public void testPutWithProcessorFactoryError() throws Exception { + BytesReference source = jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .field("unused", ":sad_face:") + .endObject() + .endObject() + .endArray() + .endObject().bytes(); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source); + WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get(); + assertThat(response.isAcknowledged(), equalTo(false)); + assertThat(response.getError().getReason(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); + } + @Override protected Collection> getMockPlugins() { return Collections.singletonList(TestSeedPlugin.class); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 117b95b2cd7..a75a84f0379 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; @@ -41,7 +42,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class PipelineStoreTests extends ESTestCase { @@ -102,6 +102,45 @@ public class PipelineStoreTests extends ESTestCase { assertThat(pipeline.getProcessors().size(), equalTo(0)); } + public void testPutWithErrorResponse() { + + } + + public void testConstructPipelineResponseSuccess() { + Map processorConfig = new HashMap<>(); + processorConfig.put("field", "foo"); + processorConfig.put("value", "bar"); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("description", "_description"); + pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig))); + WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig); + assertThat(response, nullValue()); + } + + public void testConstructPipelineResponseMissingProcessorsFieldException() { + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("description", "_description"); + WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig); + assertThat(response.getError().getProcessorType(), is(nullValue())); + assertThat(response.getError().getProcessorTag(), is(nullValue())); + assertThat(response.getError().getProcessorPropertyName(), equalTo("processors")); + assertThat(response.getError().getReason(), equalTo("[processors] required property is missing")); + } + + public void testConstructPipelineResponseConfigurationException() { + Map processorConfig = new HashMap<>(); + processorConfig.put("field", "foo"); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("description", "_description"); + pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig))); + WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig); + + assertThat(response.getError().getProcessorTag(), nullValue()); + assertThat(response.getError().getProcessorType(), equalTo("set")); + assertThat(response.getError().getProcessorPropertyName(), equalTo("value")); + assertThat(response.getError().getReason(), equalTo("[value] required property is missing")); + } + public void testDelete() { PipelineConfiguration config = new PipelineConfiguration( "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}") diff --git a/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java b/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java index 958378f355a..954a03c2172 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/ConfigurationUtilsTests.java @@ -19,11 +19,13 @@ package org.elasticsearch.ingest.core; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,21 +51,21 @@ public class ConfigurationUtilsTests extends ESTestCase { } public void testReadStringProperty() { - String val = ConfigurationUtils.readStringProperty(config, "foo"); + String val = ConfigurationUtils.readStringProperty(null, null, config, "foo"); assertThat(val, equalTo("bar")); } public void testReadStringPropertyInvalidType() { try { - ConfigurationUtils.readStringProperty(config, "arr"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("property [arr] isn't a string, but of type [java.util.Arrays$ArrayList]")); + ConfigurationUtils.readStringProperty(null, null, config, "arr"); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]")); } } // TODO(talevy): Issue with generics. This test should fail, "int" is of type List public void testOptional_InvalidType() { - List val = ConfigurationUtils.readList(config, "int"); - assertThat(val, equalTo(Arrays.asList(2))); + List val = ConfigurationUtils.readList(null, null, config, "int"); + assertThat(val, equalTo(Collections.singletonList(2))); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java index 229290372b6..746ac2f5617 100644 --- a/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/core/PipelineFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.core; import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -51,6 +52,18 @@ public class PipelineFactoryTests extends ESTestCase { assertThat(pipeline.getProcessors().get(1).getTag(), nullValue()); } + public void testCreateWithNoProcessorsField() throws Exception { + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + Pipeline.Factory factory = new Pipeline.Factory(); + try { + factory.create("_id", pipelineConfig, Collections.emptyMap()); + fail("should fail, missing required [processors] field"); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[processors] required property is missing")); + } + } + public void testCreateWithPipelineOnFailure() throws Exception { Map processorConfig = new HashMap<>(); Map pipelineConfig = new HashMap<>(); @@ -78,7 +91,7 @@ public class PipelineFactoryTests extends ESTestCase { Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); try { factory.create("_id", pipelineConfig, processorRegistry); - } catch (IllegalArgumentException e) { + } catch (ConfigurationPropertyException e) { assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorFactoryTests.java index b72c144605f..c4c13a6ab7d 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorFactoryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -64,8 +65,8 @@ public class AppendProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } @@ -75,8 +76,8 @@ public class AppendProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [value] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[value] required property is missing")); } } @@ -87,8 +88,8 @@ public class AppendProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [value] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[value] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorFactoryTests.java index 706433141d4..a07cec5c4e7 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -66,8 +67,8 @@ public class ConvertProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), Matchers.equalTo("required property [field] is missing")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing")); } } @@ -78,8 +79,8 @@ public class ConvertProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), Matchers.equalTo("required property [type] is missing")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), Matchers.equalTo("[type] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/DateProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/DateProcessorFactoryTests.java index a145a7c5149..1139f1968f7 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/DateProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/DateProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import org.joda.time.DateTimeZone; @@ -63,8 +64,8 @@ public class DateProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("processor creation should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("required property [match_field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), containsString("[match_field] required property is missing")); } } @@ -79,8 +80,8 @@ public class DateProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("processor creation should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("required property [match_formats] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), containsString("[match_formats] required property is missing")); } } @@ -169,8 +170,8 @@ public class DateProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("processor creation should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("property [match_formats] isn't a list, but of type [java.lang.String]")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), containsString("[match_formats] property isn't a list, but of type [java.lang.String]")); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/FailProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/FailProcessorFactoryTests.java index 993c7ccd904..661a6383dfd 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/FailProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/FailProcessorFactoryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -54,8 +55,8 @@ public class FailProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [message] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[message] required property is missing")); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorFactoryTests.java index fd62f6cdeac..bce033091ac 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -52,8 +53,8 @@ public class GsubProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } @@ -65,8 +66,8 @@ public class GsubProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [pattern] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[pattern] required property is missing")); } } @@ -78,8 +79,8 @@ public class GsubProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [replacement] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[replacement] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorFactoryTests.java index 2af2b096417..51eb989beda 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -49,8 +50,8 @@ public class JoinProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } @@ -61,8 +62,8 @@ public class JoinProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [separator] is missing")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[separator] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/LowercaseProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/LowercaseProcessorFactoryTests.java index 6a4a67e40cf..32eefa07896 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/LowercaseProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/LowercaseProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -46,8 +47,8 @@ public class LowercaseProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorFactoryTests.java index 0b03150adb6..5b03d288064 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorFactoryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -54,8 +55,8 @@ public class RemoveProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorFactoryTests.java index 21f5c663671..ea6284f305a 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -49,8 +50,8 @@ public class RenameProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } @@ -61,8 +62,8 @@ public class RenameProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [to] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[to] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/SetProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/SetProcessorFactoryTests.java index a58ee491a7c..1c3cf15e48f 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/SetProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/SetProcessorFactoryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -57,8 +58,8 @@ public class SetProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } @@ -68,8 +69,8 @@ public class SetProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [value] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[value] required property is missing")); } } @@ -80,8 +81,8 @@ public class SetProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [value] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[value] required property is missing")); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorFactoryTests.java index 7267544c1ff..3bd2f95e2bc 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -49,8 +50,8 @@ public class SplitProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } @@ -61,8 +62,8 @@ public class SplitProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [separator] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[separator] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/TrimProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/TrimProcessorFactoryTests.java index 350aaa66e6d..8012893bfcb 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/TrimProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/TrimProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -46,8 +47,8 @@ public class TrimProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } } diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/UppercaseProcessorFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/UppercaseProcessorFactoryTests.java index 2220438c75f..914909f9378 100644 --- a/core/src/test/java/org/elasticsearch/ingest/processor/UppercaseProcessorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/processor/UppercaseProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; @@ -46,8 +47,8 @@ public class UppercaseProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("factory create should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("required property [field] is missing")); + } catch(ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); } } } diff --git a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java index 4df8d673072..b4755d61c56 100644 --- a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java +++ b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java @@ -74,9 +74,9 @@ public final class GrokProcessor extends AbstractProcessor { @Override public GrokProcessor doCreate(String processorTag, Map config) throws Exception { - String matchField = ConfigurationUtils.readStringProperty(config, "field"); - String matchPattern = ConfigurationUtils.readStringProperty(config, "pattern"); - Map customPatternBank = ConfigurationUtils.readOptionalMap(config, "pattern_definitions"); + String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + String matchPattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern"); + Map customPatternBank = ConfigurationUtils.readOptionalMap(TYPE, processorTag, config, "pattern_definitions"); Map patternBank = new HashMap<>(builtinPatterns); if (customPatternBank != null) { patternBank.putAll(customPatternBank); diff --git a/modules/ingest-grok/src/test/java/org/elasticsearch/ingest/grok/GrokProcessorFactoryTests.java b/modules/ingest-grok/src/test/java/org/elasticsearch/ingest/grok/GrokProcessorFactoryTests.java index f6bed139552..1c36e26925d 100644 --- a/modules/ingest-grok/src/test/java/org/elasticsearch/ingest/grok/GrokProcessorFactoryTests.java +++ b/modules/ingest-grok/src/test/java/org/elasticsearch/ingest/grok/GrokProcessorFactoryTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.grok; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -45,6 +47,32 @@ public class GrokProcessorFactoryTests extends ESTestCase { assertThat(processor.getGrok(), notNullValue()); } + public void testBuildMissingField() throws Exception { + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + Map config = new HashMap<>(); + config.put("pattern", "(?\\w+)"); + try { + factory.create(config); + fail("should fail"); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[field] required property is missing")); + + } + } + + public void testBuildMissingPattern() throws Exception { + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + Map config = new HashMap<>(); + config.put("field", "foo"); + try { + factory.create(config); + fail("should fail"); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[pattern] required property is missing")); + } + + } + public void testCreateWithCustomPatterns() throws Exception { GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index b1c25f5a1ec..dbcdbbc1a7d 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -35,6 +35,8 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.core.AbstractProcessor; import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import java.io.Closeable; import java.io.IOException; @@ -226,10 +228,10 @@ public final class GeoIpProcessor extends AbstractProcessor { @Override public GeoIpProcessor doCreate(String processorTag, Map config) throws Exception { - String ipField = readStringProperty(config, "source_field"); - String targetField = readStringProperty(config, "target_field", "geoip"); - String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb"); - List fieldNames = readOptionalList(config, "fields"); + String ipField = readStringProperty(TYPE, processorTag, config, "source_field"); + String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip"); + String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb"); + List fieldNames = readOptionalList(TYPE, processorTag, config, "fields"); final Set fields; if (fieldNames != null) { @@ -238,7 +240,7 @@ public final class GeoIpProcessor extends AbstractProcessor { try { fields.add(Field.parse(fieldName)); } catch (Exception e) { - throw new IllegalArgumentException("illegal field option [" + fieldName +"]. valid values are [" + Arrays.toString(Field.values()) +"]", e); + throw new ConfigurationPropertyException(TYPE, processorTag, "fields", "illegal field option [" + fieldName + "]. valid values are [" + Arrays.toString(Field.values()) +"]"); } } } else { @@ -247,7 +249,7 @@ public final class GeoIpProcessor extends AbstractProcessor { DatabaseReader databaseReader = databaseReaders.get(databaseFile); if (databaseReader == null) { - throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist"); + throw new ConfigurationPropertyException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist"); } return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, fields); } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 271476cc2f6..410f6e343f7 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -21,6 +21,8 @@ package org.elasticsearch.ingest.geoip; import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.ingest.processor.ConfigurationPropertyException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; import org.junit.AfterClass; @@ -111,8 +113,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("Exception expected"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("database file [does-not-exist.mmdb] doesn't exist")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist")); } } @@ -144,8 +146,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("exception expected"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[fields] illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]")); } config = new HashMap<>(); @@ -154,8 +156,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { try { factory.create(config); fail("exception expected"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("property [fields] isn't a list, but of type [java.lang.String]")); + } catch (ConfigurationPropertyException e) { + assertThat(e.getMessage(), equalTo("[fields] property isn't a list, but of type [java.lang.String]")); } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml index bf0817f2da1..74808331446 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/10_crud.yaml @@ -50,6 +50,29 @@ ] } +--- +"Test invalid processor config": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "tag" : "fritag" + } + } + ] + } + - match: { "acknowledged": false } + - length: { "error": 4 } + - match: { "error.reason": "[field] required property is missing" } + - match: { "error.property_name": "field" } + - match: { "error.type": "set" } + - match: { "error.tag": "fritag" } + --- "Test basic pipeline with on_failure in processor": - do: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml index e61ad4e60a6..92fad242db9 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/40_simulate.yaml @@ -81,6 +81,7 @@ "processors": [ { "set" : { + "tag" : "fails", "value" : "_value" } } @@ -97,10 +98,11 @@ } ] } - - length: { error: 3 } - - match: { status: 400 } - - match: { error.type: "illegal_argument_exception" } - - match: { error.reason: "required property [field] is missing" } + - length: { error: 4 } + - match: { error.tag: "fails" } + - match: { error.type: "set" } + - match: { error.reason: "[field] required property is missing" } + - match: { error.property_name: "field" } --- "Test simulate without index type and id": @@ -189,10 +191,45 @@ } ] } - - length: { error: 3 } - - match: { status: 400 } - - match: { error.type: "illegal_argument_exception" } - - match: { error.reason: "required property [pipeline] is missing" } + - length: { error: 4 } + - is_false: error.processor_type + - is_false: error.processor_tag + - match: { error.property_name: "pipeline" } + - match: { error.reason: "[pipeline] required property is missing" } + +--- +"Test simulate with invalid processor config": + - do: + catch: request + ingest.simulate: + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - length: { error: 4 } + - match: { error.type: "set" } + - is_false: error.tag + - match: { error.reason: "[value] required property is missing" } + - match: { error.property_name: "value" } --- "Test simulate with verbose flag":