Introduce Pipeline Factory Error Responses in Node Ingest

When there is an exception thrown during pipeline creation within
Rest calls (in put pipeline, and simulate) We now return a structured
error response to the user with details around which processor's
configuration is the cause of the issue, or which configuration property
is misconfigured, etc.
This commit is contained in:
Tal Levy 2016-01-25 16:08:27 -08:00
parent 23f70296fb
commit fca442f4d1
54 changed files with 801 additions and 203 deletions

View File

@ -139,24 +139,24 @@ public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineReque
} }
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception { static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE); Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
List<IngestDocument> ingestDocumentList = parseDocs(config); List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose); return new Parsed(pipeline, ingestDocumentList, verbose);
} }
private static List<IngestDocument> parseDocs(Map<String, Object> config) { private static List<IngestDocument> parseDocs(Map<String, Object> config) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS); List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
List<IngestDocument> ingestDocumentList = new ArrayList<>(); List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) { for (Map<String, Object> dataMap : docs) {
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE); Map<String, Object> document = ConfigurationUtils.readMap(null, null, dataMap, Fields.SOURCE);
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, MetaData.INDEX.getFieldName(), "_index"), IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.INDEX.getFieldName(), "_index"),
ConfigurationUtils.readStringProperty(dataMap, MetaData.TYPE.getFieldName(), "_type"), ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.TYPE.getFieldName(), "_type"),
ConfigurationUtils.readStringProperty(dataMap, MetaData.ID.getFieldName(), "_id"), ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.ROUTING.getFieldName()), ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.PARENT.getFieldName()), ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TIMESTAMP.getFieldName()), ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.TIMESTAMP.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TTL.getFieldName()), ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.TTL.getFieldName()),
document); document);
ingestDocumentList.add(ingestDocument); ingestDocumentList.add(ingestDocument);
} }

View File

@ -22,24 +22,31 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
public class SimulatePipelineResponse extends ActionResponse implements ToXContent { public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent {
private String pipelineId; private String pipelineId;
private boolean verbose; private boolean verbose;
private List<SimulateDocumentResult> results; private List<SimulateDocumentResult> results;
private PipelineFactoryError error;
public SimulatePipelineResponse() { public SimulatePipelineResponse() {
} }
public SimulatePipelineResponse(PipelineFactoryError error) {
this.error = error;
}
public SimulatePipelineResponse(String pipelineId, boolean verbose, List<SimulateDocumentResult> responses) { public SimulatePipelineResponse(String pipelineId, boolean verbose, List<SimulateDocumentResult> responses) {
this.pipelineId = pipelineId; this.pipelineId = pipelineId;
this.verbose = verbose; this.verbose = verbose;
@ -58,42 +65,69 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
return verbose; return verbose;
} }
public boolean isError() {
return error != null;
}
@Override
public RestStatus status() {
if (isError()) {
return RestStatus.BAD_REQUEST;
}
return RestStatus.OK;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(pipelineId); out.writeBoolean(isError());
out.writeBoolean(verbose); if (isError()) {
out.writeVInt(results.size()); error.writeTo(out);
for (SimulateDocumentResult response : results) { } else {
response.writeTo(out); out.writeString(pipelineId);
out.writeBoolean(verbose);
out.writeVInt(results.size());
for (SimulateDocumentResult response : results) {
response.writeTo(out);
}
} }
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
this.pipelineId = in.readString(); boolean isError = in.readBoolean();
boolean verbose = in.readBoolean(); if (isError) {
int responsesLength = in.readVInt(); error = new PipelineFactoryError();
results = new ArrayList<>(); error.readFrom(in);
for (int i = 0; i < responsesLength; i++) { } else {
SimulateDocumentResult<?> simulateDocumentResult; this.pipelineId = in.readString();
if (verbose) { boolean verbose = in.readBoolean();
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in); int responsesLength = in.readVInt();
} else { results = new ArrayList<>();
simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in); for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult<?> simulateDocumentResult;
if (verbose) {
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in);
} else {
simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in);
}
results.add(simulateDocumentResult);
} }
results.add(simulateDocumentResult);
} }
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Fields.DOCUMENTS); if (isError()) {
for (SimulateDocumentResult response : results) { error.toXContent(builder, params);
response.toXContent(builder, params); } else {
builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : results) {
response.toXContent(builder, params);
}
builder.endArray();
} }
builder.endArray();
return builder; return builder;
} }

View File

@ -27,6 +27,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -56,6 +58,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
} else { } else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore); simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
} }
} catch (ConfigurationPropertyException e) {
listener.onResponse(new SimulatePipelineResponse(new PipelineFactoryError(e)));
return;
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
return; return;

View File

@ -22,27 +22,49 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import java.io.IOException; import java.io.IOException;
public class WritePipelineResponse extends AcknowledgedResponse { public class WritePipelineResponse extends AcknowledgedResponse {
private PipelineFactoryError error;
WritePipelineResponse() { WritePipelineResponse() {
} }
public WritePipelineResponse(boolean acknowledge) { public WritePipelineResponse(boolean acknowledged) {
super(acknowledge); super(acknowledged);
if (!isAcknowledged()) {
error = new PipelineFactoryError("pipeline write is not acknowledged");
}
}
public WritePipelineResponse(PipelineFactoryError error) {
super(false);
this.error = error;
}
public PipelineFactoryError getError() {
return error;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
readAcknowledged(in); readAcknowledged(in);
if (!isAcknowledged()) {
error = new PipelineFactoryError();
error.readFrom(in);
}
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
writeAcknowledged(out); writeAcknowledged(out);
if (!isAcknowledged()) {
error.writeTo(out);
}
} }
} }

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import java.io.IOException;
public class WritePipelineResponseRestListener extends AcknowledgedRestListener<WritePipelineResponse> {
public WritePipelineResponseRestListener(RestChannel channel) {
super(channel);
}
@Override
protected void addCustomFields(XContentBuilder builder, WritePipelineResponse response) throws IOException {
if (!response.isAcknowledged()) {
response.getError().toXContent(builder, null);
}
}
}

View File

@ -36,8 +36,10 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import java.io.Closeable; import java.io.Closeable;
@ -101,7 +103,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
Map<String, Pipeline> pipelines = new HashMap<>(); Map<String, Pipeline> pipelines = new HashMap<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try { try {
pipelines.put(pipeline.getId(), constructPipeline(pipeline.getId(), pipeline.getConfigAsMap())); pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactoryRegistry));
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -148,16 +150,14 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
/** /**
* Stores the specified pipeline definition in the request. * Stores the specified pipeline definition in the request.
*
* @throws IllegalArgumentException If the pipeline holds incorrect configuration
*/ */
public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) throws IllegalArgumentException { public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) {
try { // validates the pipeline and processor configuration before submitting a cluster update task:
// validates the pipeline and processor configuration before submitting a cluster update task: Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2(); WritePipelineResponse response = validatePipelineResponse(request.getId(), pipelineConfig);
constructPipeline(request.getId(), pipelineConfig); if (response != null) {
} catch (Exception e) { listener.onResponse(response);
throw new IllegalArgumentException("Invalid pipeline configuration", e); return;
} }
clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) { clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
@ -235,8 +235,15 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
return result; return result;
} }
private Pipeline constructPipeline(String id, Map<String, Object> config) throws Exception { WritePipelineResponse validatePipelineResponse(String id, Map<String, Object> config) {
return factory.create(id, config, processorFactoryRegistry); try {
factory.create(id, config, processorFactoryRegistry);
return null;
} catch (ConfigurationPropertyException e) {
return new WritePipelineResponse(new PipelineFactoryError(e));
} catch (Exception e) {
return new WritePipelineResponse(new PipelineFactoryError(e.getMessage()));
}
} }
} }

View File

@ -31,7 +31,7 @@ public abstract class AbstractProcessorFactory<P extends Processor> implements P
@Override @Override
public P create(Map<String, Object> config) throws Exception { public P create(Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(config, TAG_KEY); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
return doCreate(tag, config); return doCreate(tag, config);
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.ingest.core; package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -30,133 +32,133 @@ public final class ConfigurationUtils {
/** /**
* Returns and removes the specified optional property from the specified configuration map. * Returns and removes the specified optional property from the specified configuration map.
* *
* If the property value isn't of type string a {@link IllegalArgumentException} is thrown. * If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown.
*/ */
public static String readOptionalStringProperty(Map<String, Object> configuration, String propertyName) { public static String readOptionalStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
return readString(propertyName, value); return readString(processorType, processorTag, propertyName, value);
} }
/** /**
* Returns and removes the specified property from the specified configuration map. * Returns and removes the specified property from the specified configuration map.
* *
* If the property value isn't of type string an {@link IllegalArgumentException} is thrown. * If the property value isn't of type string an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown * If the property is missing an {@link ConfigurationPropertyException} is thrown
*/ */
public static String readStringProperty(Map<String, Object> configuration, String propertyName) { public static String readStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
return readStringProperty(configuration, propertyName, null); return readStringProperty(processorType, processorTag, configuration, propertyName, null);
} }
/** /**
* Returns and removes the specified property from the specified configuration map. * Returns and removes the specified property from the specified configuration map.
* *
* If the property value isn't of type string a {@link IllegalArgumentException} is thrown. * If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown.
* If the property is missing and no default value has been specified a {@link IllegalArgumentException} is thrown * If the property is missing and no default value has been specified a {@link ConfigurationPropertyException} is thrown
*/ */
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) { public static String readStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null && defaultValue != null) { if (value == null && defaultValue != null) {
return defaultValue; return defaultValue;
} else if (value == null) { } else if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
} }
return readString(propertyName, value); return readString(processorType, processorTag, propertyName, value);
} }
private static String readString(String propertyName, Object value) { private static String readString(String processorType, String processorTag, String propertyName, Object value) {
if (value == null) { if (value == null) {
return null; return null;
} }
if (value instanceof String) { if (value instanceof String) {
return (String) value; return (String) value;
} }
throw new IllegalArgumentException("property [" + propertyName + "] isn't a string, but of type [" + value.getClass().getName() + "]"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
} }
/** /**
* Returns and removes the specified property of type list from the specified configuration map. * Returns and removes the specified property of type list from the specified configuration map.
* *
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown. * If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown.
*/ */
public static <T> List<T> readOptionalList(Map<String, Object> configuration, String propertyName) { public static <T> List<T> readOptionalList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
return null; return null;
} }
return readList(propertyName, value); return readList(processorType, processorTag, propertyName, value);
} }
/** /**
* Returns and removes the specified property of type list from the specified configuration map. * Returns and removes the specified property of type list from the specified configuration map.
* *
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown. * If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown * If the property is missing an {@link ConfigurationPropertyException} is thrown
*/ */
public static <T> List<T> readList(Map<String, Object> configuration, String propertyName) { public static <T> List<T> readList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
} }
return readList(propertyName, value); return readList(processorType, processorTag, propertyName, value);
} }
private static <T> List<T> readList(String propertyName, Object value) { private static <T> List<T> readList(String processorType, String processorTag, String propertyName, Object value) {
if (value instanceof List) { if (value instanceof List) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<T> stringList = (List<T>) value; List<T> stringList = (List<T>) value;
return stringList; return stringList;
} else { } else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a list, but of type [" + value.getClass().getName() + "]");
} }
} }
/** /**
* Returns and removes the specified property of type map from the specified configuration map. * Returns and removes the specified property of type map from the specified configuration map.
* *
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown. * If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown * If the property is missing an {@link ConfigurationPropertyException} is thrown
*/ */
public static <T> Map<String, T> readMap(Map<String, Object> configuration, String propertyName) { public static <T> Map<String, T> readMap(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
} }
return readMap(propertyName, value); return readMap(processorType, processorTag, propertyName, value);
} }
/** /**
* Returns and removes the specified property of type map from the specified configuration map. * Returns and removes the specified property of type map from the specified configuration map.
* *
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown. * If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown.
*/ */
public static <T> Map<String, T> readOptionalMap(Map<String, Object> configuration, String propertyName) { public static <T> Map<String, T> readOptionalMap(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
return null; return null;
} }
return readMap(propertyName, value); return readMap(processorType, processorTag, propertyName, value);
} }
private static <T> Map<String, T> readMap(String propertyName, Object value) { private static <T> Map<String, T> readMap(String processorType, String processorTag, String propertyName, Object value) {
if (value instanceof Map) { if (value instanceof Map) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, T> map = (Map<String, T>) value; Map<String, T> map = (Map<String, T>) value;
return map; return map;
} else { } else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a map, but of type [" + value.getClass().getName() + "]");
} }
} }
/** /**
* Returns and removes the specified property as an {@link Object} from the specified configuration map. * Returns and removes the specified property as an {@link Object} from the specified configuration map.
*/ */
public static Object readObject(Map<String, Object> configuration, String propertyName) { public static Object readObject(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
} }
return value; return value;
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.ingest.core; package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -82,19 +84,20 @@ public final class Pipeline {
public final static class Factory { public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception { public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws ConfigurationPropertyException {
String description = ConfigurationUtils.readOptionalStringProperty(config, DESCRIPTION_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Processor> processors = readProcessors(PROCESSORS_KEY, processorRegistry, config); List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config); List<Processor> processors = readProcessorConfigs(processorConfigs, processorRegistry);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
if (config.isEmpty() == false) { if (config.isEmpty() == false) {
throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); throw new ConfigurationPropertyException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
} }
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor); return new Pipeline(id, description, compoundProcessor);
} }
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception { private List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs, Map<String, Processor.Factory> processorRegistry) throws ConfigurationPropertyException {
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readOptionalList(config, fieldName);
List<Processor> processors = new ArrayList<>(); List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) { if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) { for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
@ -107,20 +110,28 @@ public final class Pipeline {
return processors; return processors;
} }
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception { private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws ConfigurationPropertyException {
Processor.Factory factory = processorRegistry.get(type); Processor.Factory factory = processorRegistry.get(type);
if (factory != null) { if (factory != null) {
List<Processor> onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config); List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
Processor processor = factory.create(config); List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
if (config.isEmpty() == false) { Processor processor;
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); try {
processor = factory.create(config);
} catch (ConfigurationPropertyException e) {
throw e;
} catch (Exception e) {
throw new ConfigurationPropertyException(e.getMessage());
}
if (!config.isEmpty()) {
throw new ConfigurationPropertyException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
} }
if (onFailureProcessors.isEmpty()) { if (onFailureProcessors.isEmpty()) {
return processor; return processor;
} }
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
} }
throw new IllegalArgumentException("No processor type exists with name [" + type + "]"); throw new ConfigurationPropertyException("No processor type exists with name [" + type + "]");
} }
} }
} }

View File

@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.io.IOException;
public class PipelineFactoryError implements Streamable, ToXContent {
private String reason;
private String processorType;
private String processorTag;
private String processorPropertyName;
public PipelineFactoryError() {
}
public PipelineFactoryError(ConfigurationPropertyException e) {
this.reason = e.getMessage();
this.processorType = e.getProcessorType();
this.processorTag = e.getProcessorTag();
this.processorPropertyName = e.getPropertyName();
}
public PipelineFactoryError(String reason) {
this.reason = "Constructing Pipeline failed:" + reason;
}
public String getReason() {
return reason;
}
public String getProcessorTag() {
return processorTag;
}
public String getProcessorPropertyName() {
return processorPropertyName;
}
public String getProcessorType() {
return processorType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
reason = in.readString();
processorType = in.readOptionalString();
processorTag = in.readOptionalString();
processorPropertyName = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(reason);
out.writeOptionalString(processorType);
out.writeOptionalString(processorTag);
out.writeOptionalString(processorPropertyName);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("error");
builder.field("type", processorType);
builder.field("tag", processorTag);
builder.field("reason", reason);
builder.field("property_name", processorPropertyName);
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
public class PipelineFactoryResult {
private final Pipeline pipeline;
private final PipelineFactoryError error;
public PipelineFactoryResult(Pipeline pipeline) {
this.pipeline = pipeline;
this.error = null;
}
public PipelineFactoryResult(PipelineFactoryError error) {
this.error = error;
this.pipeline = null;
}
public Pipeline getPipeline() {
return pipeline;
}
public PipelineFactoryError getError() {
return error;
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest.core; package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.Map; import java.util.Map;
/** /**

View File

@ -55,10 +55,15 @@ public abstract class AbstractStringProcessor extends AbstractProcessor {
protected abstract String process(String value); protected abstract String process(String value);
public static abstract class Factory<T extends AbstractStringProcessor> extends AbstractProcessorFactory<T> { public static abstract class Factory<T extends AbstractStringProcessor> extends AbstractProcessorFactory<T> {
protected final String processorType;
protected Factory(String processorType) {
this.processorType = processorType;
}
@Override @Override
public T doCreate(String processorTag, Map<String, Object> config) throws Exception { public T doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field");
return newProcessor(processorTag, field); return newProcessor(processorTag, field);
} }

View File

@ -74,8 +74,8 @@ public class AppendProcessor extends AbstractProcessor {
@Override @Override
public AppendProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public AppendProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(config, "value"); Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService)); return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
} }
} }

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
/**
* Exception class thrown by processor factories.
*/
public class ConfigurationPropertyException extends RuntimeException {
private String processorType;
private String processorTag;
private String propertyName;
public ConfigurationPropertyException(String processorType, String processorTag, String propertyName, String message) {
super("[" + propertyName + "] " + message);
this.processorTag = processorTag;
this.processorType = processorType;
this.propertyName = propertyName;
}
public ConfigurationPropertyException(String errorMessage) {
super(errorMessage);
}
public String getPropertyName() {
return propertyName;
}
public String getProcessorType() {
return processorType;
}
public String getProcessorTag() {
return processorTag;
}
}

View File

@ -137,8 +137,8 @@ public class ConvertProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<ConvertProcessor> { public static class Factory extends AbstractProcessorFactory<ConvertProcessor> {
@Override @Override
public ConvertProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public ConvertProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(config, "type")); Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type"));
return new ConvertProcessor(processorTag, field, convertType); return new ConvertProcessor(processorTag, field, convertType);
} }
} }

View File

@ -112,11 +112,11 @@ public final class DateProcessor extends AbstractProcessor {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public DateProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public DateProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "match_field"); String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "match_field");
String targetField = ConfigurationUtils.readStringProperty(config, "target_field", DEFAULT_TARGET_FIELD); String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(config, "timezone"); String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString); DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale"); String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "locale");
Locale locale = Locale.ENGLISH; Locale locale = Locale.ENGLISH;
if (localeString != null) { if (localeString != null) {
try { try {
@ -125,7 +125,7 @@ public final class DateProcessor extends AbstractProcessor {
throw new IllegalArgumentException("Invalid language tag specified: " + localeString); throw new IllegalArgumentException("Invalid language tag specified: " + localeString);
} }
} }
List<String> matchFormats = ConfigurationUtils.readList(config, "match_formats"); List<String> matchFormats = ConfigurationUtils.readList(TYPE, processorTag, config, "match_formats");
return new DateProcessor(processorTag, timezone, locale, matchField, matchFormats, targetField); return new DateProcessor(processorTag, timezone, locale, matchField, matchFormats, targetField);
} }
} }

View File

@ -95,7 +95,7 @@ public class DeDotProcessor extends AbstractProcessor {
@Override @Override
public DeDotProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public DeDotProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String separator = ConfigurationUtils.readOptionalStringProperty(config, "separator"); String separator = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "separator");
if (separator == null) { if (separator == null) {
separator = DEFAULT_SEPARATOR; separator = DEFAULT_SEPARATOR;
} }

View File

@ -66,7 +66,7 @@ public class FailProcessor extends AbstractProcessor {
@Override @Override
public FailProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public FailProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(config, "message"); String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
return new FailProcessor(processorTag, templateService.compile(message)); return new FailProcessor(processorTag, templateService.compile(message));
} }
} }

View File

@ -79,9 +79,9 @@ public class GsubProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<GsubProcessor> { public static class Factory extends AbstractProcessorFactory<GsubProcessor> {
@Override @Override
public GsubProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public GsubProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String pattern = ConfigurationUtils.readStringProperty(config, "pattern"); String pattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern");
String replacement = ConfigurationUtils.readStringProperty(config, "replacement"); String replacement = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "replacement");
Pattern searchPattern = Pattern.compile(pattern); Pattern searchPattern = Pattern.compile(pattern);
return new GsubProcessor(processorTag, field, searchPattern, replacement); return new GsubProcessor(processorTag, field, searchPattern, replacement);
} }

View File

@ -73,8 +73,8 @@ public class JoinProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<JoinProcessor> { public static class Factory extends AbstractProcessorFactory<JoinProcessor> {
@Override @Override
public JoinProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public JoinProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String separator = ConfigurationUtils.readStringProperty(config, "separator"); String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator");
return new JoinProcessor(processorTag, field, separator); return new JoinProcessor(processorTag, field, separator);
} }
} }

View File

@ -45,6 +45,11 @@ public class LowercaseProcessor extends AbstractStringProcessor {
} }
public static class Factory extends AbstractStringProcessor.Factory<LowercaseProcessor> { public static class Factory extends AbstractStringProcessor.Factory<LowercaseProcessor> {
public Factory() {
super(TYPE);
}
@Override @Override
protected LowercaseProcessor newProcessor(String tag, String field) { protected LowercaseProcessor newProcessor(String tag, String field) {
return new LowercaseProcessor(tag, field); return new LowercaseProcessor(tag, field);

View File

@ -65,7 +65,7 @@ public class RemoveProcessor extends AbstractProcessor {
@Override @Override
public RemoveProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public RemoveProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new RemoveProcessor(processorTag, templateService.compile(field)); return new RemoveProcessor(processorTag, templateService.compile(field));
} }
} }

View File

@ -78,8 +78,8 @@ public class RenameProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<RenameProcessor> { public static class Factory extends AbstractProcessorFactory<RenameProcessor> {
@Override @Override
public RenameProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public RenameProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String newField = ConfigurationUtils.readStringProperty(config, "to"); String newField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "to");
return new RenameProcessor(processorTag, field, newField); return new RenameProcessor(processorTag, field, newField);
} }
} }

View File

@ -73,8 +73,8 @@ public class SetProcessor extends AbstractProcessor {
@Override @Override
public SetProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public SetProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(config, "value"); Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new SetProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService)); return new SetProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
} }
} }

View File

@ -75,8 +75,8 @@ public class SplitProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<SplitProcessor> { public static class Factory extends AbstractProcessorFactory<SplitProcessor> {
@Override @Override
public SplitProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public SplitProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field"); String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(config, "separator")); return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator"));
} }
} }
} }

View File

@ -42,6 +42,11 @@ public class TrimProcessor extends AbstractStringProcessor {
} }
public static class Factory extends AbstractStringProcessor.Factory<TrimProcessor> { public static class Factory extends AbstractStringProcessor.Factory<TrimProcessor> {
public Factory() {
super(TYPE);
}
@Override @Override
protected TrimProcessor newProcessor(String tag, String field) { protected TrimProcessor newProcessor(String tag, String field) {
return new TrimProcessor(tag, field); return new TrimProcessor(tag, field);

View File

@ -44,6 +44,11 @@ public class UppercaseProcessor extends AbstractStringProcessor {
} }
public static class Factory extends AbstractStringProcessor.Factory<UppercaseProcessor> { public static class Factory extends AbstractStringProcessor.Factory<UppercaseProcessor> {
public Factory() {
super(TYPE);
}
@Override @Override
protected UppercaseProcessor newProcessor(String tag, String field) { protected UppercaseProcessor newProcessor(String tag, String field) {
return new UppercaseProcessor(tag, field); return new UppercaseProcessor(tag, field);

View File

@ -20,9 +20,13 @@
package org.elasticsearch.rest.action.ingest; package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponseRestListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
@ -30,6 +34,8 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener; import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
public class RestPutPipelineAction extends BaseRestHandler { public class RestPutPipelineAction extends BaseRestHandler {
@Inject @Inject
@ -43,6 +49,7 @@ public class RestPutPipelineAction extends BaseRestHandler {
PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), RestActions.getRestContent(restRequest)); PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), RestActions.getRestContent(restRequest));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
request.timeout(restRequest.paramAsTime("timeout", request.timeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
client.admin().cluster().putPipeline(request, new AcknowledgedRestListener<>(channel)); client.admin().cluster().putPipeline(request, new WritePipelineResponseRestListener(channel));
} }
} }

View File

@ -28,6 +28,7 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import org.elasticsearch.rest.action.support.RestToXContentListener; import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestSimulatePipelineAction extends BaseRestHandler { public class RestSimulatePipelineAction extends BaseRestHandler {
@ -46,6 +47,6 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
SimulatePipelineRequest request = new SimulatePipelineRequest(RestActions.getRestContent(restRequest)); SimulatePipelineRequest request = new SimulatePipelineRequest(RestActions.getRestContent(restRequest));
request.setId(restRequest.param("id")); request.setId(restRequest.param("id"));
request.setVerbose(restRequest.paramAsBoolean("verbose", false)); request.setVerbose(restRequest.paramAsBoolean("verbose", false));
client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel)); client.admin().cluster().simulatePipeline(request, new RestStatusToXContentListener<>(channel));
} }
} }

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
public class WritePipelineResponseTests extends ESTestCase {
public void testSerializationWithoutError() throws IOException {
boolean isAcknowledged = randomBoolean();
WritePipelineResponse response;
response = new WritePipelineResponse(isAcknowledged);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
WritePipelineResponse otherResponse = new WritePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(otherResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
}
public void testSerializationWithError() throws IOException {
PipelineFactoryError error = new PipelineFactoryError("error");
WritePipelineResponse response = new WritePipelineResponse(error);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
WritePipelineResponse otherResponse = new WritePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(otherResponse.getError().getReason(), equalTo(response.getError().getReason()));
assertThat(otherResponse.getError().getProcessorType(), equalTo(response.getError().getProcessorType()));
assertThat(otherResponse.getError().getProcessorTag(), equalTo(response.getError().getProcessorTag()));
assertThat(otherResponse.getError().getProcessorPropertyName(), equalTo(response.getError().getProcessorPropertyName()));
}
}

View File

@ -47,6 +47,7 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
@ -200,6 +201,39 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(getResponse.pipelines().size(), equalTo(0)); assertThat(getResponse.pipelines().size(), equalTo(0));
} }
public void testPutWithPipelineError() throws Exception {
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("not_found")
.endObject()
.endObject()
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source);
WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get();
assertThat(response.isAcknowledged(), equalTo(false));
assertThat(response.getError().getReason(), equalTo("No processor type exists with name [not_found]"));
}
public void testPutWithProcessorFactoryError() throws Exception {
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.field("unused", ":sad_face:")
.endObject()
.endObject()
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source);
WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get();
assertThat(response.isAcknowledged(), equalTo(false));
assertThat(response.getError().getReason(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
@Override @Override
protected Collection<Class<? extends Plugin>> getMockPlugins() { protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Collections.singletonList(TestSeedPlugin.class); return Collections.singletonList(TestSeedPlugin.class);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -41,7 +42,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class PipelineStoreTests extends ESTestCase { public class PipelineStoreTests extends ESTestCase {
@ -102,6 +102,45 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(pipeline.getProcessors().size(), equalTo(0)); assertThat(pipeline.getProcessors().size(), equalTo(0));
} }
public void testPutWithErrorResponse() {
}
public void testConstructPipelineResponseSuccess() {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("field", "foo");
processorConfig.put("value", "bar");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig)));
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response, nullValue());
}
public void testConstructPipelineResponseMissingProcessorsFieldException() {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response.getError().getProcessorType(), is(nullValue()));
assertThat(response.getError().getProcessorTag(), is(nullValue()));
assertThat(response.getError().getProcessorPropertyName(), equalTo("processors"));
assertThat(response.getError().getReason(), equalTo("[processors] required property is missing"));
}
public void testConstructPipelineResponseConfigurationException() {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("field", "foo");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig)));
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response.getError().getProcessorTag(), nullValue());
assertThat(response.getError().getProcessorType(), equalTo("set"));
assertThat(response.getError().getProcessorPropertyName(), equalTo("value"));
assertThat(response.getError().getReason(), equalTo("[value] required property is missing"));
}
public void testDelete() { public void testDelete() {
PipelineConfiguration config = new PipelineConfiguration( PipelineConfiguration config = new PipelineConfiguration(
"_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}") "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")

View File

@ -19,11 +19,13 @@
package org.elasticsearch.ingest.core; package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -49,21 +51,21 @@ public class ConfigurationUtilsTests extends ESTestCase {
} }
public void testReadStringProperty() { public void testReadStringProperty() {
String val = ConfigurationUtils.readStringProperty(config, "foo"); String val = ConfigurationUtils.readStringProperty(null, null, config, "foo");
assertThat(val, equalTo("bar")); assertThat(val, equalTo("bar"));
} }
public void testReadStringPropertyInvalidType() { public void testReadStringPropertyInvalidType() {
try { try {
ConfigurationUtils.readStringProperty(config, "arr"); ConfigurationUtils.readStringProperty(null, null, config, "arr");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("property [arr] isn't a string, but of type [java.util.Arrays$ArrayList]")); assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
} }
} }
// TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer> // TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer>
public void testOptional_InvalidType() { public void testOptional_InvalidType() {
List<String> val = ConfigurationUtils.readList(config, "int"); List<String> val = ConfigurationUtils.readList(null, null, config, "int");
assertThat(val, equalTo(Arrays.asList(2))); assertThat(val, equalTo(Collections.singletonList(2)));
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.core; package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.Arrays; import java.util.Arrays;
@ -51,6 +52,18 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(pipeline.getProcessors().get(1).getTag(), nullValue()); assertThat(pipeline.getProcessors().get(1).getTag(), nullValue());
} }
public void testCreateWithNoProcessorsField() throws Exception {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
Pipeline.Factory factory = new Pipeline.Factory();
try {
factory.create("_id", pipelineConfig, Collections.emptyMap());
fail("should fail, missing required [processors] field");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
}
}
public void testCreateWithPipelineOnFailure() throws Exception { public void testCreateWithPipelineOnFailure() throws Exception {
Map<String, Object> processorConfig = new HashMap<>(); Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>(); Map<String, Object> pipelineConfig = new HashMap<>();
@ -78,7 +91,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
try { try {
factory.create("_id", pipelineConfig, processorRegistry); factory.create("_id", pipelineConfig, processorRegistry);
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
@ -64,8 +65,8 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
@ -75,8 +76,8 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing")); assertThat(e.getMessage(), equalTo("[value] required property is missing"));
} }
} }
@ -87,8 +88,8 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing")); assertThat(e.getMessage(), equalTo("[value] required property is missing"));
} }
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -66,8 +67,8 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), Matchers.equalTo("required property [field] is missing")); assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
} }
} }
@ -78,8 +79,8 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), Matchers.equalTo("required property [type] is missing")); assertThat(e.getMessage(), Matchers.equalTo("[type] required property is missing"));
} }
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -63,8 +64,8 @@ public class DateProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("processor creation should have failed"); fail("processor creation should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), containsString("required property [match_field] is missing")); assertThat(e.getMessage(), containsString("[match_field] required property is missing"));
} }
} }
@ -79,8 +80,8 @@ public class DateProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("processor creation should have failed"); fail("processor creation should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), containsString("required property [match_formats] is missing")); assertThat(e.getMessage(), containsString("[match_formats] required property is missing"));
} }
} }
@ -169,8 +170,8 @@ public class DateProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("processor creation should have failed"); fail("processor creation should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), containsString("property [match_formats] isn't a list, but of type [java.lang.String]")); assertThat(e.getMessage(), containsString("[match_formats] property isn't a list, but of type [java.lang.String]"));
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
@ -54,8 +55,8 @@ public class FailProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [message] is missing")); assertThat(e.getMessage(), equalTo("[message] required property is missing"));
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -52,8 +53,8 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
@ -65,8 +66,8 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [pattern] is missing")); assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
} }
} }
@ -78,8 +79,8 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [replacement] is missing")); assertThat(e.getMessage(), equalTo("[replacement] required property is missing"));
} }
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -49,8 +50,8 @@ public class JoinProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
@ -61,8 +62,8 @@ public class JoinProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [separator] is missing")); assertThat(e.getMessage(), equalTo("[separator] required property is missing"));
} }
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -46,8 +47,8 @@ public class LowercaseProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
@ -54,8 +55,8 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -49,8 +50,8 @@ public class RenameProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
@ -61,8 +62,8 @@ public class RenameProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [to] is missing")); assertThat(e.getMessage(), equalTo("[to] required property is missing"));
} }
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
@ -57,8 +58,8 @@ public class SetProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
@ -68,8 +69,8 @@ public class SetProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing")); assertThat(e.getMessage(), equalTo("[value] required property is missing"));
} }
} }
@ -80,8 +81,8 @@ public class SetProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing")); assertThat(e.getMessage(), equalTo("[value] required property is missing"));
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -49,8 +50,8 @@ public class SplitProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
@ -61,8 +62,8 @@ public class SplitProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [separator] is missing")); assertThat(e.getMessage(), equalTo("[separator] required property is missing"));
} }
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -46,8 +47,8 @@ public class TrimProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.HashMap; import java.util.HashMap;
@ -46,8 +47,8 @@ public class UppercaseProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("factory create should have failed"); fail("factory create should have failed");
} catch(IllegalArgumentException e) { } catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing")); assertThat(e.getMessage(), equalTo("[field] required property is missing"));
} }
} }
} }

View File

@ -74,9 +74,9 @@ public final class GrokProcessor extends AbstractProcessor {
@Override @Override
public GrokProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public GrokProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "field"); String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String matchPattern = ConfigurationUtils.readStringProperty(config, "pattern"); String matchPattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern");
Map<String, String> customPatternBank = ConfigurationUtils.readOptionalMap(config, "pattern_definitions"); Map<String, String> customPatternBank = ConfigurationUtils.readOptionalMap(TYPE, processorTag, config, "pattern_definitions");
Map<String, String> patternBank = new HashMap<>(builtinPatterns); Map<String, String> patternBank = new HashMap<>(builtinPatterns);
if (customPatternBank != null) { if (customPatternBank != null) {
patternBank.putAll(customPatternBank); patternBank.putAll(customPatternBank);

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest.grok; package org.elasticsearch.ingest.grok;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.Collections; import java.util.Collections;
@ -45,6 +47,32 @@ public class GrokProcessorFactoryTests extends ESTestCase {
assertThat(processor.getGrok(), notNullValue()); assertThat(processor.getGrok(), notNullValue());
} }
public void testBuildMissingField() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("pattern", "(?<foo>\\w+)");
try {
factory.create(config);
fail("should fail");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
public void testBuildMissingPattern() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
try {
factory.create(config);
fail("should fail");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
}
}
public void testCreateWithCustomPatterns() throws Exception { public void testCreateWithCustomPatterns() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());

View File

@ -35,6 +35,8 @@ import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.core.AbstractProcessor; import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -226,10 +228,10 @@ public final class GeoIpProcessor extends AbstractProcessor {
@Override @Override
public GeoIpProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception { public GeoIpProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String ipField = readStringProperty(config, "source_field"); String ipField = readStringProperty(TYPE, processorTag, config, "source_field");
String targetField = readStringProperty(config, "target_field", "geoip"); String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb"); String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> fieldNames = readOptionalList(config, "fields"); List<String> fieldNames = readOptionalList(TYPE, processorTag, config, "fields");
final Set<Field> fields; final Set<Field> fields;
if (fieldNames != null) { if (fieldNames != null) {
@ -238,7 +240,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
try { try {
fields.add(Field.parse(fieldName)); fields.add(Field.parse(fieldName));
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException("illegal field option [" + fieldName +"]. valid values are [" + Arrays.toString(Field.values()) +"]", e); throw new ConfigurationPropertyException(TYPE, processorTag, "fields", "illegal field option [" + fieldName + "]. valid values are [" + Arrays.toString(Field.values()) +"]");
} }
} }
} else { } else {
@ -247,7 +249,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
DatabaseReader databaseReader = databaseReaders.get(databaseFile); DatabaseReader databaseReader = databaseReaders.get(databaseFile);
if (databaseReader == null) { if (databaseReader == null) {
throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist"); throw new ConfigurationPropertyException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist");
} }
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, fields); return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, fields);
} }

View File

@ -21,6 +21,8 @@ package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ingest.core.AbstractProcessorFactory; import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.test.StreamsUtils;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -111,8 +113,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("Exception expected"); fail("Exception expected");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("database file [does-not-exist.mmdb] doesn't exist")); assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
} }
} }
@ -144,8 +146,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("exception expected"); fail("exception expected");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]")); assertThat(e.getMessage(), equalTo("[fields] illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]"));
} }
config = new HashMap<>(); config = new HashMap<>();
@ -154,8 +156,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try { try {
factory.create(config); factory.create(config);
fail("exception expected"); fail("exception expected");
} catch (IllegalArgumentException e) { } catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("property [fields] isn't a list, but of type [java.lang.String]")); assertThat(e.getMessage(), equalTo("[fields] property isn't a list, but of type [java.lang.String]"));
} }
} }
} }

View File

@ -50,6 +50,29 @@
] ]
} }
---
"Test invalid processor config":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"tag" : "fritag"
}
}
]
}
- match: { "acknowledged": false }
- length: { "error": 4 }
- match: { "error.reason": "[field] required property is missing" }
- match: { "error.property_name": "field" }
- match: { "error.type": "set" }
- match: { "error.tag": "fritag" }
--- ---
"Test basic pipeline with on_failure in processor": "Test basic pipeline with on_failure in processor":
- do: - do:

View File

@ -81,6 +81,7 @@
"processors": [ "processors": [
{ {
"set" : { "set" : {
"tag" : "fails",
"value" : "_value" "value" : "_value"
} }
} }
@ -97,10 +98,11 @@
} }
] ]
} }
- length: { error: 3 } - length: { error: 4 }
- match: { status: 400 } - match: { error.tag: "fails" }
- match: { error.type: "illegal_argument_exception" } - match: { error.type: "set" }
- match: { error.reason: "required property [field] is missing" } - match: { error.reason: "[field] required property is missing" }
- match: { error.property_name: "field" }
--- ---
"Test simulate without index type and id": "Test simulate without index type and id":
@ -189,10 +191,45 @@
} }
] ]
} }
- length: { error: 3 } - length: { error: 4 }
- match: { status: 400 } - is_false: error.processor_type
- match: { error.type: "illegal_argument_exception" } - is_false: error.processor_tag
- match: { error.reason: "required property [pipeline] is missing" } - match: { error.property_name: "pipeline" }
- match: { error.reason: "[pipeline] required property is missing" }
---
"Test simulate with invalid processor config":
- do:
catch: request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { error: 4 }
- match: { error.type: "set" }
- is_false: error.tag
- match: { error.reason: "[value] required property is missing" }
- match: { error.property_name: "value" }
--- ---
"Test simulate with verbose flag": "Test simulate with verbose flag":