Merge pull request #16276 from talevy/more_details_factory_factory

catch processor/pipeline factory exceptions and return structured error responses
This commit is contained in:
Tal Levy 2016-01-29 13:44:31 -08:00
commit 6e91d65a99
54 changed files with 801 additions and 203 deletions

View File

@ -139,24 +139,24 @@ public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineReque
}
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
private static List<IngestDocument> parseDocs(Map<String, Object> config) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS);
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, MetaData.INDEX.getFieldName(), "_index"),
ConfigurationUtils.readStringProperty(dataMap, MetaData.TYPE.getFieldName(), "_type"),
ConfigurationUtils.readStringProperty(dataMap, MetaData.ID.getFieldName(), "_id"),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.ROUTING.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.PARENT.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TIMESTAMP.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TTL.getFieldName()),
Map<String, Object> document = ConfigurationUtils.readMap(null, null, dataMap, Fields.SOURCE);
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.INDEX.getFieldName(), "_index"),
ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.TYPE.getFieldName(), "_type"),
ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.TIMESTAMP.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.TTL.getFieldName()),
document);
ingestDocumentList.add(ingestDocument);
}

View File

@ -22,24 +22,31 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SimulatePipelineResponse extends ActionResponse implements ToXContent {
public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent {
private String pipelineId;
private boolean verbose;
private List<SimulateDocumentResult> results;
private PipelineFactoryError error;
public SimulatePipelineResponse() {
}
public SimulatePipelineResponse(PipelineFactoryError error) {
this.error = error;
}
public SimulatePipelineResponse(String pipelineId, boolean verbose, List<SimulateDocumentResult> responses) {
this.pipelineId = pipelineId;
this.verbose = verbose;
@ -58,42 +65,69 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
return verbose;
}
public boolean isError() {
return error != null;
}
@Override
public RestStatus status() {
if (isError()) {
return RestStatus.BAD_REQUEST;
}
return RestStatus.OK;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pipelineId);
out.writeBoolean(verbose);
out.writeVInt(results.size());
for (SimulateDocumentResult response : results) {
response.writeTo(out);
out.writeBoolean(isError());
if (isError()) {
error.writeTo(out);
} else {
out.writeString(pipelineId);
out.writeBoolean(verbose);
out.writeVInt(results.size());
for (SimulateDocumentResult response : results) {
response.writeTo(out);
}
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.pipelineId = in.readString();
boolean verbose = in.readBoolean();
int responsesLength = in.readVInt();
results = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult<?> simulateDocumentResult;
if (verbose) {
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in);
} else {
simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in);
boolean isError = in.readBoolean();
if (isError) {
error = new PipelineFactoryError();
error.readFrom(in);
} else {
this.pipelineId = in.readString();
boolean verbose = in.readBoolean();
int responsesLength = in.readVInt();
results = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult<?> simulateDocumentResult;
if (verbose) {
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in);
} else {
simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in);
}
results.add(simulateDocumentResult);
}
results.add(simulateDocumentResult);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : results) {
response.toXContent(builder, params);
if (isError()) {
error.toXContent(builder, params);
} else {
builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : results) {
response.toXContent(builder, params);
}
builder.endArray();
}
builder.endArray();
return builder;
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -56,6 +58,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
}
} catch (ConfigurationPropertyException e) {
listener.onResponse(new SimulatePipelineResponse(new PipelineFactoryError(e)));
return;
} catch (Exception e) {
listener.onFailure(e);
return;

View File

@ -22,27 +22,49 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import java.io.IOException;
public class WritePipelineResponse extends AcknowledgedResponse {
private PipelineFactoryError error;
WritePipelineResponse() {
}
public WritePipelineResponse(boolean acknowledge) {
super(acknowledge);
public WritePipelineResponse(boolean acknowledged) {
super(acknowledged);
if (!isAcknowledged()) {
error = new PipelineFactoryError("pipeline write is not acknowledged");
}
}
public WritePipelineResponse(PipelineFactoryError error) {
super(false);
this.error = error;
}
public PipelineFactoryError getError() {
return error;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
if (!isAcknowledged()) {
error = new PipelineFactoryError();
error.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
if (!isAcknowledged()) {
error.writeTo(out);
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import java.io.IOException;
public class WritePipelineResponseRestListener extends AcknowledgedRestListener<WritePipelineResponse> {
public WritePipelineResponseRestListener(RestChannel channel) {
super(channel);
}
@Override
protected void addCustomFields(XContentBuilder builder, WritePipelineResponse response) throws IOException {
if (!response.isAcknowledged()) {
response.getError().toXContent(builder, null);
}
}
}

View File

@ -36,8 +36,10 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.script.ScriptService;
import java.io.Closeable;
@ -101,7 +103,7 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
Map<String, Pipeline> pipelines = new HashMap<>();
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), constructPipeline(pipeline.getId(), pipeline.getConfigAsMap()));
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactoryRegistry));
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -148,16 +150,14 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
/**
* Stores the specified pipeline definition in the request.
*
* @throws IllegalArgumentException If the pipeline holds incorrect configuration
*/
public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) throws IllegalArgumentException {
try {
// validates the pipeline and processor configuration before submitting a cluster update task:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
constructPipeline(request.getId(), pipelineConfig);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid pipeline configuration", e);
public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) {
// validates the pipeline and processor configuration before submitting a cluster update task:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
WritePipelineResponse response = validatePipelineResponse(request.getId(), pipelineConfig);
if (response != null) {
listener.onResponse(response);
return;
}
clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
@ -235,8 +235,15 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
return result;
}
private Pipeline constructPipeline(String id, Map<String, Object> config) throws Exception {
return factory.create(id, config, processorFactoryRegistry);
WritePipelineResponse validatePipelineResponse(String id, Map<String, Object> config) {
try {
factory.create(id, config, processorFactoryRegistry);
return null;
} catch (ConfigurationPropertyException e) {
return new WritePipelineResponse(new PipelineFactoryError(e));
} catch (Exception e) {
return new WritePipelineResponse(new PipelineFactoryError(e.getMessage()));
}
}
}

View File

@ -31,7 +31,7 @@ public abstract class AbstractProcessorFactory<P extends Processor> implements P
@Override
public P create(Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(config, TAG_KEY);
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
return doCreate(tag, config);
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.List;
import java.util.Map;
@ -30,133 +32,133 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified optional property from the specified configuration map.
*
* If the property value isn't of type string a {@link IllegalArgumentException} is thrown.
* If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown.
*/
public static String readOptionalStringProperty(Map<String, Object> configuration, String propertyName) {
public static String readOptionalStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
return readString(propertyName, value);
return readString(processorType, processorTag, propertyName, value);
}
/**
* Returns and removes the specified property from the specified configuration map.
*
* If the property value isn't of type string an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
* If the property value isn't of type string an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link ConfigurationPropertyException} is thrown
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName) {
return readStringProperty(configuration, propertyName, null);
public static String readStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
return readStringProperty(processorType, processorTag, configuration, propertyName, null);
}
/**
* Returns and removes the specified property from the specified configuration map.
*
* If the property value isn't of type string a {@link IllegalArgumentException} is thrown.
* If the property is missing and no default value has been specified a {@link IllegalArgumentException} is thrown
* If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown.
* If the property is missing and no default value has been specified a {@link ConfigurationPropertyException} is thrown
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
public static String readStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.remove(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;
} else if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
}
return readString(propertyName, value);
return readString(processorType, processorTag, propertyName, value);
}
private static String readString(String propertyName, Object value) {
private static String readString(String processorType, String processorTag, String propertyName, Object value) {
if (value == null) {
return null;
}
if (value instanceof String) {
return (String) value;
}
throw new IllegalArgumentException("property [" + propertyName + "] isn't a string, but of type [" + value.getClass().getName() + "]");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
}
/**
* Returns and removes the specified property of type list from the specified configuration map.
*
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
* If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown.
*/
public static <T> List<T> readOptionalList(Map<String, Object> configuration, String propertyName) {
public static <T> List<T> readOptionalList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
return null;
}
return readList(propertyName, value);
return readList(processorType, processorTag, propertyName, value);
}
/**
* Returns and removes the specified property of type list from the specified configuration map.
*
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
* If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link ConfigurationPropertyException} is thrown
*/
public static <T> List<T> readList(Map<String, Object> configuration, String propertyName) {
public static <T> List<T> readList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
}
return readList(propertyName, value);
return readList(processorType, processorTag, propertyName, value);
}
private static <T> List<T> readList(String propertyName, Object value) {
private static <T> List<T> readList(String processorType, String processorTag, String propertyName, Object value) {
if (value instanceof List) {
@SuppressWarnings("unchecked")
List<T> stringList = (List<T>) value;
return stringList;
} else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a list, but of type [" + value.getClass().getName() + "]");
}
}
/**
* Returns and removes the specified property of type map from the specified configuration map.
*
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
* If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link ConfigurationPropertyException} is thrown
*/
public static <T> Map<String, T> readMap(Map<String, Object> configuration, String propertyName) {
public static <T> Map<String, T> readMap(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
}
return readMap(propertyName, value);
return readMap(processorType, processorTag, propertyName, value);
}
/**
* Returns and removes the specified property of type map from the specified configuration map.
*
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
* If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown.
*/
public static <T> Map<String, T> readOptionalMap(Map<String, Object> configuration, String propertyName) {
public static <T> Map<String, T> readOptionalMap(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
return null;
}
return readMap(propertyName, value);
return readMap(processorType, processorTag, propertyName, value);
}
private static <T> Map<String, T> readMap(String propertyName, Object value) {
private static <T> Map<String, T> readMap(String processorType, String processorTag, String propertyName, Object value) {
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, T> map = (Map<String, T>) value;
return map;
} else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a map, but of type [" + value.getClass().getName() + "]");
}
}
/**
* Returns and removes the specified property as an {@link Object} from the specified configuration map.
*/
public static Object readObject(Map<String, Object> configuration, String propertyName) {
public static Object readObject(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
}
return value;
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -82,19 +84,20 @@ public final class Pipeline {
public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(config, DESCRIPTION_KEY);
List<Processor> processors = readProcessors(PROCESSORS_KEY, processorRegistry, config);
List<Processor> onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config);
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws ConfigurationPropertyException {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = readProcessorConfigs(processorConfigs, processorRegistry);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
if (config.isEmpty() == false) {
throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
throw new ConfigurationPropertyException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor);
}
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readOptionalList(config, fieldName);
private List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs, Map<String, Processor.Factory> processorRegistry) throws ConfigurationPropertyException {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
@ -107,20 +110,28 @@ public final class Pipeline {
return processors;
}
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws ConfigurationPropertyException {
Processor.Factory factory = processorRegistry.get(type);
if (factory != null) {
List<Processor> onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config);
Processor processor = factory.create(config);
if (config.isEmpty() == false) {
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor;
try {
processor = factory.create(config);
} catch (ConfigurationPropertyException e) {
throw e;
} catch (Exception e) {
throw new ConfigurationPropertyException(e.getMessage());
}
if (!config.isEmpty()) {
throw new ConfigurationPropertyException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.isEmpty()) {
return processor;
}
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
}
throw new IllegalArgumentException("No processor type exists with name [" + type + "]");
throw new ConfigurationPropertyException("No processor type exists with name [" + type + "]");
}
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.io.IOException;
public class PipelineFactoryError implements Streamable, ToXContent {
private String reason;
private String processorType;
private String processorTag;
private String processorPropertyName;
public PipelineFactoryError() {
}
public PipelineFactoryError(ConfigurationPropertyException e) {
this.reason = e.getMessage();
this.processorType = e.getProcessorType();
this.processorTag = e.getProcessorTag();
this.processorPropertyName = e.getPropertyName();
}
public PipelineFactoryError(String reason) {
this.reason = "Constructing Pipeline failed:" + reason;
}
public String getReason() {
return reason;
}
public String getProcessorTag() {
return processorTag;
}
public String getProcessorPropertyName() {
return processorPropertyName;
}
public String getProcessorType() {
return processorType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
reason = in.readString();
processorType = in.readOptionalString();
processorTag = in.readOptionalString();
processorPropertyName = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(reason);
out.writeOptionalString(processorType);
out.writeOptionalString(processorTag);
out.writeOptionalString(processorPropertyName);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("error");
builder.field("type", processorType);
builder.field("tag", processorTag);
builder.field("reason", reason);
builder.field("property_name", processorPropertyName);
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
public class PipelineFactoryResult {
private final Pipeline pipeline;
private final PipelineFactoryError error;
public PipelineFactoryResult(Pipeline pipeline) {
this.pipeline = pipeline;
this.error = null;
}
public PipelineFactoryResult(PipelineFactoryError error) {
this.error = error;
this.pipeline = null;
}
public Pipeline getPipeline() {
return pipeline;
}
public PipelineFactoryError getError() {
return error;
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.Map;
/**

View File

@ -55,10 +55,15 @@ public abstract class AbstractStringProcessor extends AbstractProcessor {
protected abstract String process(String value);
public static abstract class Factory<T extends AbstractStringProcessor> extends AbstractProcessorFactory<T> {
protected final String processorType;
protected Factory(String processorType) {
this.processorType = processorType;
}
@Override
public T doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field");
return newProcessor(processorTag, field);
}

View File

@ -74,8 +74,8 @@ public class AppendProcessor extends AbstractProcessor {
@Override
public AppendProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Object value = ConfigurationUtils.readObject(config, "value");
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
/**
* Exception class thrown by processor factories.
*/
public class ConfigurationPropertyException extends RuntimeException {
private String processorType;
private String processorTag;
private String propertyName;
public ConfigurationPropertyException(String processorType, String processorTag, String propertyName, String message) {
super("[" + propertyName + "] " + message);
this.processorTag = processorTag;
this.processorType = processorType;
this.propertyName = propertyName;
}
public ConfigurationPropertyException(String errorMessage) {
super(errorMessage);
}
public String getPropertyName() {
return propertyName;
}
public String getProcessorType() {
return processorType;
}
public String getProcessorTag() {
return processorTag;
}
}

View File

@ -137,8 +137,8 @@ public class ConvertProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<ConvertProcessor> {
@Override
public ConvertProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(config, "type"));
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type"));
return new ConvertProcessor(processorTag, field, convertType);
}
}

View File

@ -112,11 +112,11 @@ public final class DateProcessor extends AbstractProcessor {
@SuppressWarnings("unchecked")
public DateProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "match_field");
String targetField = ConfigurationUtils.readStringProperty(config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(config, "timezone");
String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "match_field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale");
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "locale");
Locale locale = Locale.ENGLISH;
if (localeString != null) {
try {
@ -125,7 +125,7 @@ public final class DateProcessor extends AbstractProcessor {
throw new IllegalArgumentException("Invalid language tag specified: " + localeString);
}
}
List<String> matchFormats = ConfigurationUtils.readList(config, "match_formats");
List<String> matchFormats = ConfigurationUtils.readList(TYPE, processorTag, config, "match_formats");
return new DateProcessor(processorTag, timezone, locale, matchField, matchFormats, targetField);
}
}

View File

@ -95,7 +95,7 @@ public class DeDotProcessor extends AbstractProcessor {
@Override
public DeDotProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String separator = ConfigurationUtils.readOptionalStringProperty(config, "separator");
String separator = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "separator");
if (separator == null) {
separator = DEFAULT_SEPARATOR;
}

View File

@ -66,7 +66,7 @@ public class FailProcessor extends AbstractProcessor {
@Override
public FailProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(config, "message");
String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
return new FailProcessor(processorTag, templateService.compile(message));
}
}

View File

@ -79,9 +79,9 @@ public class GsubProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<GsubProcessor> {
@Override
public GsubProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String pattern = ConfigurationUtils.readStringProperty(config, "pattern");
String replacement = ConfigurationUtils.readStringProperty(config, "replacement");
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String pattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern");
String replacement = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "replacement");
Pattern searchPattern = Pattern.compile(pattern);
return new GsubProcessor(processorTag, field, searchPattern, replacement);
}

View File

@ -73,8 +73,8 @@ public class JoinProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<JoinProcessor> {
@Override
public JoinProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String separator = ConfigurationUtils.readStringProperty(config, "separator");
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator");
return new JoinProcessor(processorTag, field, separator);
}
}

View File

@ -45,6 +45,11 @@ public class LowercaseProcessor extends AbstractStringProcessor {
}
public static class Factory extends AbstractStringProcessor.Factory<LowercaseProcessor> {
public Factory() {
super(TYPE);
}
@Override
protected LowercaseProcessor newProcessor(String tag, String field) {
return new LowercaseProcessor(tag, field);

View File

@ -65,7 +65,7 @@ public class RemoveProcessor extends AbstractProcessor {
@Override
public RemoveProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new RemoveProcessor(processorTag, templateService.compile(field));
}
}

View File

@ -78,8 +78,8 @@ public class RenameProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<RenameProcessor> {
@Override
public RenameProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String newField = ConfigurationUtils.readStringProperty(config, "to");
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String newField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "to");
return new RenameProcessor(processorTag, field, newField);
}
}

View File

@ -73,8 +73,8 @@ public class SetProcessor extends AbstractProcessor {
@Override
public SetProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Object value = ConfigurationUtils.readObject(config, "value");
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new SetProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
}
}

View File

@ -75,8 +75,8 @@ public class SplitProcessor extends AbstractProcessor {
public static class Factory extends AbstractProcessorFactory<SplitProcessor> {
@Override
public SplitProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(config, "separator"));
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator"));
}
}
}

View File

@ -42,6 +42,11 @@ public class TrimProcessor extends AbstractStringProcessor {
}
public static class Factory extends AbstractStringProcessor.Factory<TrimProcessor> {
public Factory() {
super(TYPE);
}
@Override
protected TrimProcessor newProcessor(String tag, String field) {
return new TrimProcessor(tag, field);

View File

@ -44,6 +44,11 @@ public class UppercaseProcessor extends AbstractStringProcessor {
}
public static class Factory extends AbstractStringProcessor.Factory<UppercaseProcessor> {
public Factory() {
super(TYPE);
}
@Override
protected UppercaseProcessor newProcessor(String tag, String field) {
return new UppercaseProcessor(tag, field);

View File

@ -20,9 +20,13 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponseRestListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -30,6 +34,8 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
public class RestPutPipelineAction extends BaseRestHandler {
@Inject
@ -43,6 +49,7 @@ public class RestPutPipelineAction extends BaseRestHandler {
PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), RestActions.getRestContent(restRequest));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
client.admin().cluster().putPipeline(request, new AcknowledgedRestListener<>(channel));
client.admin().cluster().putPipeline(request, new WritePipelineResponseRestListener(channel));
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestSimulatePipelineAction extends BaseRestHandler {
@ -46,6 +47,6 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
SimulatePipelineRequest request = new SimulatePipelineRequest(RestActions.getRestContent(restRequest));
request.setId(restRequest.param("id"));
request.setVerbose(restRequest.paramAsBoolean("verbose", false));
client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel));
client.admin().cluster().simulatePipeline(request, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
public class WritePipelineResponseTests extends ESTestCase {
public void testSerializationWithoutError() throws IOException {
boolean isAcknowledged = randomBoolean();
WritePipelineResponse response;
response = new WritePipelineResponse(isAcknowledged);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
WritePipelineResponse otherResponse = new WritePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(otherResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
}
public void testSerializationWithError() throws IOException {
PipelineFactoryError error = new PipelineFactoryError("error");
WritePipelineResponse response = new WritePipelineResponse(error);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
WritePipelineResponse otherResponse = new WritePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(otherResponse.getError().getReason(), equalTo(response.getError().getReason()));
assertThat(otherResponse.getError().getProcessorType(), equalTo(response.getError().getProcessorType()));
assertThat(otherResponse.getError().getProcessorTag(), equalTo(response.getError().getProcessorTag()));
assertThat(otherResponse.getError().getProcessorPropertyName(), equalTo(response.getError().getProcessorPropertyName()));
}
}

View File

@ -47,6 +47,7 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
@ -200,6 +201,39 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(getResponse.pipelines().size(), equalTo(0));
}
public void testPutWithPipelineError() throws Exception {
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("not_found")
.endObject()
.endObject()
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source);
WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get();
assertThat(response.isAcknowledged(), equalTo(false));
assertThat(response.getError().getReason(), equalTo("No processor type exists with name [not_found]"));
}
public void testPutWithProcessorFactoryError() throws Exception {
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.field("unused", ":sad_face:")
.endObject()
.endObject()
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source);
WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get();
assertThat(response.isAcknowledged(), equalTo(false));
assertThat(response.getError().getReason(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Collections.singletonList(TestSeedPlugin.class);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@ -41,7 +42,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class PipelineStoreTests extends ESTestCase {
@ -102,6 +102,45 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(pipeline.getProcessors().size(), equalTo(0));
}
public void testPutWithErrorResponse() {
}
public void testConstructPipelineResponseSuccess() {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("field", "foo");
processorConfig.put("value", "bar");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig)));
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response, nullValue());
}
public void testConstructPipelineResponseMissingProcessorsFieldException() {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response.getError().getProcessorType(), is(nullValue()));
assertThat(response.getError().getProcessorTag(), is(nullValue()));
assertThat(response.getError().getProcessorPropertyName(), equalTo("processors"));
assertThat(response.getError().getReason(), equalTo("[processors] required property is missing"));
}
public void testConstructPipelineResponseConfigurationException() {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("field", "foo");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig)));
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response.getError().getProcessorTag(), nullValue());
assertThat(response.getError().getProcessorType(), equalTo("set"));
assertThat(response.getError().getProcessorPropertyName(), equalTo("value"));
assertThat(response.getError().getReason(), equalTo("[value] required property is missing"));
}
public void testDelete() {
PipelineConfiguration config = new PipelineConfiguration(
"_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")

View File

@ -19,11 +19,13 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -49,21 +51,21 @@ public class ConfigurationUtilsTests extends ESTestCase {
}
public void testReadStringProperty() {
String val = ConfigurationUtils.readStringProperty(config, "foo");
String val = ConfigurationUtils.readStringProperty(null, null, config, "foo");
assertThat(val, equalTo("bar"));
}
public void testReadStringPropertyInvalidType() {
try {
ConfigurationUtils.readStringProperty(config, "arr");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("property [arr] isn't a string, but of type [java.util.Arrays$ArrayList]"));
ConfigurationUtils.readStringProperty(null, null, config, "arr");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
}
}
// TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer>
public void testOptional_InvalidType() {
List<String> val = ConfigurationUtils.readList(config, "int");
assertThat(val, equalTo(Arrays.asList(2)));
List<String> val = ConfigurationUtils.readList(null, null, config, "int");
assertThat(val, equalTo(Collections.singletonList(2)));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
@ -51,6 +52,18 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(pipeline.getProcessors().get(1).getTag(), nullValue());
}
public void testCreateWithNoProcessorsField() throws Exception {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
Pipeline.Factory factory = new Pipeline.Factory();
try {
factory.create("_id", pipelineConfig, Collections.emptyMap());
fail("should fail, missing required [processors] field");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
}
}
public void testCreateWithPipelineOnFailure() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
@ -78,7 +91,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (IllegalArgumentException e) {
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -64,8 +65,8 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -75,8 +76,8 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}
@ -87,8 +88,8 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
@ -66,8 +67,8 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), Matchers.equalTo("required property [field] is missing"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
}
}
@ -78,8 +79,8 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), Matchers.equalTo("required property [type] is missing"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), Matchers.equalTo("[type] required property is missing"));
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTimeZone;
@ -63,8 +64,8 @@ public class DateProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("processor creation should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("required property [match_field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), containsString("[match_field] required property is missing"));
}
}
@ -79,8 +80,8 @@ public class DateProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("processor creation should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("required property [match_formats] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), containsString("[match_formats] required property is missing"));
}
}
@ -169,8 +170,8 @@ public class DateProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("processor creation should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("property [match_formats] isn't a list, but of type [java.lang.String]"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), containsString("[match_formats] property isn't a list, but of type [java.lang.String]"));
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -54,8 +55,8 @@ public class FailProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [message] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[message] required property is missing"));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -52,8 +53,8 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -65,8 +66,8 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [pattern] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
}
}
@ -78,8 +79,8 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [replacement] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[replacement] required property is missing"));
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -49,8 +50,8 @@ public class JoinProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -61,8 +62,8 @@ public class JoinProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [separator] is missing"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[separator] required property is missing"));
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -46,8 +47,8 @@ public class LowercaseProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -54,8 +55,8 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -49,8 +50,8 @@ public class RenameProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -61,8 +62,8 @@ public class RenameProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [to] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[to] required property is missing"));
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -57,8 +58,8 @@ public class SetProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -68,8 +69,8 @@ public class SetProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}
@ -80,8 +81,8 @@ public class SetProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -49,8 +50,8 @@ public class SplitProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -61,8 +62,8 @@ public class SplitProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [separator] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[separator] required property is missing"));
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -46,8 +47,8 @@ public class TrimProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -46,8 +47,8 @@ public class UppercaseProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
} catch(ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
}

View File

@ -74,9 +74,9 @@ public final class GrokProcessor extends AbstractProcessor {
@Override
public GrokProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "field");
String matchPattern = ConfigurationUtils.readStringProperty(config, "pattern");
Map<String, String> customPatternBank = ConfigurationUtils.readOptionalMap(config, "pattern_definitions");
String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String matchPattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern");
Map<String, String> customPatternBank = ConfigurationUtils.readOptionalMap(TYPE, processorTag, config, "pattern_definitions");
Map<String, String> patternBank = new HashMap<>(builtinPatterns);
if (customPatternBank != null) {
patternBank.putAll(customPatternBank);

View File

@ -20,6 +20,8 @@
package org.elasticsearch.ingest.grok;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
@ -45,6 +47,32 @@ public class GrokProcessorFactoryTests extends ESTestCase {
assertThat(processor.getGrok(), notNullValue());
}
public void testBuildMissingField() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("pattern", "(?<foo>\\w+)");
try {
factory.create(config);
fail("should fail");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
public void testBuildMissingPattern() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
try {
factory.create(config);
fail("should fail");
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
}
}
public void testCreateWithCustomPatterns() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());

View File

@ -35,6 +35,8 @@ import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.io.Closeable;
import java.io.IOException;
@ -226,10 +228,10 @@ public final class GeoIpProcessor extends AbstractProcessor {
@Override
public GeoIpProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String ipField = readStringProperty(config, "source_field");
String targetField = readStringProperty(config, "target_field", "geoip");
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb");
List<String> fieldNames = readOptionalList(config, "fields");
String ipField = readStringProperty(TYPE, processorTag, config, "source_field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> fieldNames = readOptionalList(TYPE, processorTag, config, "fields");
final Set<Field> fields;
if (fieldNames != null) {
@ -238,7 +240,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
try {
fields.add(Field.parse(fieldName));
} catch (Exception e) {
throw new IllegalArgumentException("illegal field option [" + fieldName +"]. valid values are [" + Arrays.toString(Field.values()) +"]", e);
throw new ConfigurationPropertyException(TYPE, processorTag, "fields", "illegal field option [" + fieldName + "]. valid values are [" + Arrays.toString(Field.values()) +"]");
}
}
} else {
@ -247,7 +249,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
DatabaseReader databaseReader = databaseReaders.get(databaseFile);
if (databaseReader == null) {
throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist");
throw new ConfigurationPropertyException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist");
}
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, fields);
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils;
import org.junit.AfterClass;
@ -111,8 +113,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("Exception expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("database file [does-not-exist.mmdb] doesn't exist"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
}
}
@ -144,8 +146,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("exception expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[fields] illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]"));
}
config = new HashMap<>();
@ -154,8 +156,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("exception expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("property [fields] isn't a list, but of type [java.lang.String]"));
} catch (ConfigurationPropertyException e) {
assertThat(e.getMessage(), equalTo("[fields] property isn't a list, but of type [java.lang.String]"));
}
}
}

View File

@ -50,6 +50,29 @@
]
}
---
"Test invalid processor config":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"tag" : "fritag"
}
}
]
}
- match: { "acknowledged": false }
- length: { "error": 4 }
- match: { "error.reason": "[field] required property is missing" }
- match: { "error.property_name": "field" }
- match: { "error.type": "set" }
- match: { "error.tag": "fritag" }
---
"Test basic pipeline with on_failure in processor":
- do:

View File

@ -81,6 +81,7 @@
"processors": [
{
"set" : {
"tag" : "fails",
"value" : "_value"
}
}
@ -97,10 +98,11 @@
}
]
}
- length: { error: 3 }
- match: { status: 400 }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "required property [field] is missing" }
- length: { error: 4 }
- match: { error.tag: "fails" }
- match: { error.type: "set" }
- match: { error.reason: "[field] required property is missing" }
- match: { error.property_name: "field" }
---
"Test simulate without index type and id":
@ -189,10 +191,45 @@
}
]
}
- length: { error: 3 }
- match: { status: 400 }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "required property [pipeline] is missing" }
- length: { error: 4 }
- is_false: error.processor_type
- is_false: error.processor_tag
- match: { error.property_name: "pipeline" }
- match: { error.reason: "[pipeline] required property is missing" }
---
"Test simulate with invalid processor config":
- do:
catch: request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { error: 4 }
- match: { error.type: "set" }
- is_false: error.tag
- match: { error.reason: "[value] required property is missing" }
- match: { error.property_name: "value" }
---
"Test simulate with verbose flag":