revert PipelineFactoryError handling with throwing ElasticsearchParseException in ingest pipeline creation

This commit is contained in:
Tal Levy 2016-02-01 13:40:54 -08:00
parent dcb88909d5
commit 0a1580eefa
37 changed files with 190 additions and 510 deletions

View File

@ -22,31 +22,24 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent {
public class SimulatePipelineResponse extends ActionResponse implements ToXContent {
private String pipelineId;
private boolean verbose;
private List<SimulateDocumentResult> results;
private PipelineFactoryError error;
public SimulatePipelineResponse() {
}
public SimulatePipelineResponse(PipelineFactoryError error) {
this.error = error;
}
public SimulatePipelineResponse(String pipelineId, boolean verbose, List<SimulateDocumentResult> responses) {
this.pipelineId = pipelineId;
this.verbose = verbose;
@ -65,69 +58,42 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
return verbose;
}
public boolean isError() {
return error != null;
}
@Override
public RestStatus status() {
if (isError()) {
return RestStatus.BAD_REQUEST;
}
return RestStatus.OK;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(isError());
if (isError()) {
error.writeTo(out);
} else {
out.writeString(pipelineId);
out.writeBoolean(verbose);
out.writeVInt(results.size());
for (SimulateDocumentResult response : results) {
response.writeTo(out);
}
out.writeString(pipelineId);
out.writeBoolean(verbose);
out.writeVInt(results.size());
for (SimulateDocumentResult response : results) {
response.writeTo(out);
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
boolean isError = in.readBoolean();
if (isError) {
error = new PipelineFactoryError();
error.readFrom(in);
} else {
this.pipelineId = in.readString();
boolean verbose = in.readBoolean();
int responsesLength = in.readVInt();
results = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult<?> simulateDocumentResult;
if (verbose) {
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in);
} else {
simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in);
}
results.add(simulateDocumentResult);
this.pipelineId = in.readString();
boolean verbose = in.readBoolean();
int responsesLength = in.readVInt();
results = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult<?> simulateDocumentResult;
if (verbose) {
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in);
} else {
simulateDocumentResult = SimulateDocumentBaseResult.readSimulateDocumentSimpleResult(in);
}
results.add(simulateDocumentResult);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (isError()) {
error.toXContent(builder, params);
} else {
builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : results) {
response.toXContent(builder, params);
}
builder.endArray();
builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : results) {
response.toXContent(builder, params);
}
builder.endArray();
return builder;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -27,8 +28,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -58,9 +57,6 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
}
} catch (ConfigurationPropertyException e) {
listener.onResponse(new SimulatePipelineResponse(new PipelineFactoryError(e)));
return;
} catch (Exception e) {
listener.onFailure(e);
return;

View File

@ -22,12 +22,10 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import java.io.IOException;
public class WritePipelineResponse extends AcknowledgedResponse {
private PipelineFactoryError error;
WritePipelineResponse() {
@ -35,36 +33,17 @@ public class WritePipelineResponse extends AcknowledgedResponse {
public WritePipelineResponse(boolean acknowledged) {
super(acknowledged);
if (!isAcknowledged()) {
error = new PipelineFactoryError("pipeline write is not acknowledged");
}
}
public WritePipelineResponse(PipelineFactoryError error) {
super(false);
this.error = error;
}
public PipelineFactoryError getError() {
return error;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
if (!isAcknowledged()) {
error = new PipelineFactoryError();
error.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
if (!isAcknowledged()) {
error.writeTo(out);
}
}
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import java.io.IOException;
public class WritePipelineResponseRestListener extends AcknowledgedRestListener<WritePipelineResponse> {
public WritePipelineResponseRestListener(RestChannel channel) {
super(channel);
}
@Override
protected void addCustomFields(XContentBuilder builder, WritePipelineResponse response) throws IOException {
if (!response.isAcknowledged()) {
response.getError().toXContent(builder, null);
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
@ -36,10 +37,8 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.script.ScriptService;
import java.io.Closeable;
@ -104,8 +103,10 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactoryRegistry));
} catch (ElasticsearchParseException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
}
}
this.pipelines = Collections.unmodifiableMap(pipelines);
@ -154,9 +155,10 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) {
// validates the pipeline and processor configuration before submitting a cluster update task:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
WritePipelineResponse response = validatePipelineResponse(request.getId(), pipelineConfig);
if (response != null) {
listener.onResponse(response);
try {
factory.create(request.getId(), pipelineConfig, processorFactoryRegistry);
} catch(Exception e) {
listener.onFailure(e);
return;
}
clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
@ -234,16 +236,4 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
}
return result;
}
WritePipelineResponse validatePipelineResponse(String id, Map<String, Object> config) {
try {
factory.create(id, config, processorFactoryRegistry);
return null;
} catch (ConfigurationPropertyException e) {
return new WritePipelineResponse(new PipelineFactoryError(e));
} catch (Exception e) {
return new WritePipelineResponse(new PipelineFactoryError(e.getMessage()));
}
}
}

View File

@ -19,7 +19,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import java.util.List;
import java.util.Map;
@ -32,7 +33,7 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified optional property from the specified configuration map.
*
* If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown.
* If the property value isn't of type string a {@link ElasticsearchParseException} is thrown.
*/
public static String readOptionalStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
@ -42,8 +43,8 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified property from the specified configuration map.
*
* If the property value isn't of type string an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link ConfigurationPropertyException} is thrown
* If the property value isn't of type string an {@link ElasticsearchParseException} is thrown.
* If the property is missing an {@link ElasticsearchParseException} is thrown
*/
public static String readStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
return readStringProperty(processorType, processorTag, configuration, propertyName, null);
@ -52,15 +53,15 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified property from the specified configuration map.
*
* If the property value isn't of type string a {@link ConfigurationPropertyException} is thrown.
* If the property is missing and no default value has been specified a {@link ConfigurationPropertyException} is thrown
* If the property value isn't of type string a {@link ElasticsearchParseException} is thrown.
* If the property is missing and no default value has been specified a {@link ElasticsearchParseException} is thrown
*/
public static String readStringProperty(String processorType, String processorTag, Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.remove(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;
} else if (value == null) {
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
throw newConfigurationException(processorType, processorTag, propertyName, "required property is missing");
}
return readString(processorType, processorTag, propertyName, value);
}
@ -72,13 +73,13 @@ public final class ConfigurationUtils {
if (value instanceof String) {
return (String) value;
}
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
throw newConfigurationException(processorType, processorTag, propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
}
/**
* Returns and removes the specified property of type list from the specified configuration map.
*
* If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown.
* If the property value isn't of type list an {@link ElasticsearchParseException} is thrown.
*/
public static <T> List<T> readOptionalList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
@ -91,13 +92,13 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified property of type list from the specified configuration map.
*
* If the property value isn't of type list an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link ConfigurationPropertyException} is thrown
* If the property value isn't of type list an {@link ElasticsearchParseException} is thrown.
* If the property is missing an {@link ElasticsearchParseException} is thrown
*/
public static <T> List<T> readList(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
throw newConfigurationException(processorType, processorTag, propertyName, "required property is missing");
}
return readList(processorType, processorTag, propertyName, value);
@ -109,20 +110,20 @@ public final class ConfigurationUtils {
List<T> stringList = (List<T>) value;
return stringList;
} else {
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a list, but of type [" + value.getClass().getName() + "]");
throw newConfigurationException(processorType, processorTag, propertyName, "property isn't a list, but of type [" + value.getClass().getName() + "]");
}
}
/**
* Returns and removes the specified property of type map from the specified configuration map.
*
* If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown.
* If the property is missing an {@link ConfigurationPropertyException} is thrown
* If the property value isn't of type map an {@link ElasticsearchParseException} is thrown.
* If the property is missing an {@link ElasticsearchParseException} is thrown
*/
public static <T> Map<String, T> readMap(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
throw newConfigurationException(processorType, processorTag, propertyName, "required property is missing");
}
return readMap(processorType, processorTag, propertyName, value);
@ -131,7 +132,7 @@ public final class ConfigurationUtils {
/**
* Returns and removes the specified property of type map from the specified configuration map.
*
* If the property value isn't of type map an {@link ConfigurationPropertyException} is thrown.
* If the property value isn't of type map an {@link ElasticsearchParseException} is thrown.
*/
public static <T> Map<String, T> readOptionalMap(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
@ -148,7 +149,7 @@ public final class ConfigurationUtils {
Map<String, T> map = (Map<String, T>) value;
return map;
} else {
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "property isn't a map, but of type [" + value.getClass().getName() + "]");
throw newConfigurationException(processorType, processorTag, propertyName, "property isn't a map, but of type [" + value.getClass().getName() + "]");
}
}
@ -158,8 +159,23 @@ public final class ConfigurationUtils {
public static Object readObject(String processorType, String processorTag, Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new ConfigurationPropertyException(processorType, processorTag, propertyName, "required property is missing");
throw newConfigurationException(processorType, processorTag, propertyName, "required property is missing");
}
return value;
}
public static ElasticsearchParseException newConfigurationException(String processorType, String processorTag, String propertyName, String reason) {
ElasticsearchParseException exception = new ElasticsearchParseException("[" + propertyName + "] " + reason);
if (processorType != null) {
exception.addHeader("processor_type", processorType);
}
if (processorTag != null) {
exception.addHeader("processor_tag", processorTag);
}
if (propertyName != null) {
exception.addHeader("property_name", propertyName);
}
return exception;
}
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.ElasticsearchParseException;
import java.util.ArrayList;
import java.util.Arrays;
@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
*/
@ -84,20 +85,20 @@ public final class Pipeline {
public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws ConfigurationPropertyException {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = readProcessorConfigs(processorConfigs, processorRegistry);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
if (config.isEmpty() == false) {
throw new ConfigurationPropertyException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor);
}
private List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs, Map<String, Processor.Factory> processorRegistry) throws ConfigurationPropertyException {
private List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs, Map<String, Processor.Factory> processorRegistry) throws Exception {
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
@ -110,28 +111,22 @@ public final class Pipeline {
return processors;
}
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws ConfigurationPropertyException {
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.get(type);
if (factory != null) {
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor;
try {
processor = factory.create(config);
} catch (ConfigurationPropertyException e) {
throw e;
} catch (Exception e) {
throw new ConfigurationPropertyException(e.getMessage());
}
processor = factory.create(config);
if (!config.isEmpty()) {
throw new ConfigurationPropertyException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
throw new ElasticsearchParseException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.isEmpty()) {
return processor;
}
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
}
throw new ConfigurationPropertyException("No processor type exists with name [" + type + "]");
throw new ElasticsearchParseException("No processor type exists with name [" + type + "]");
}
}
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.io.IOException;
public class PipelineFactoryError implements Streamable, ToXContent {
private String reason;
private String processorType;
private String processorTag;
private String processorPropertyName;
public PipelineFactoryError() {
}
public PipelineFactoryError(ConfigurationPropertyException e) {
this.reason = e.getMessage();
this.processorType = e.getProcessorType();
this.processorTag = e.getProcessorTag();
this.processorPropertyName = e.getPropertyName();
}
public PipelineFactoryError(String reason) {
this.reason = "Constructing Pipeline failed:" + reason;
}
public String getReason() {
return reason;
}
public String getProcessorTag() {
return processorTag;
}
public String getProcessorPropertyName() {
return processorPropertyName;
}
public String getProcessorType() {
return processorType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
reason = in.readString();
processorType = in.readOptionalString();
processorTag = in.readOptionalString();
processorPropertyName = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(reason);
out.writeOptionalString(processorType);
out.writeOptionalString(processorTag);
out.writeOptionalString(processorPropertyName);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("error");
builder.field("type", processorType);
builder.field("tag", processorTag);
builder.field("reason", reason);
builder.field("property_name", processorPropertyName);
builder.endObject();
return builder;
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
public class PipelineFactoryResult {
private final Pipeline pipeline;
private final PipelineFactoryError error;
public PipelineFactoryResult(Pipeline pipeline) {
this.pipeline = pipeline;
this.error = null;
}
public PipelineFactoryResult(PipelineFactoryError error) {
this.error = error;
this.pipeline = null;
}
public Pipeline getPipeline() {
return pipeline;
}
public PipelineFactoryError getError() {
return error;
}
}

View File

@ -17,11 +17,8 @@
* under the License.
*/
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.util.Map;
/**

View File

@ -1,53 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
/**
* Exception class thrown by processor factories.
*/
public class ConfigurationPropertyException extends RuntimeException {
private String processorType;
private String processorTag;
private String propertyName;
public ConfigurationPropertyException(String processorType, String processorTag, String propertyName, String message) {
super("[" + propertyName + "] " + message);
this.processorTag = processorTag;
this.processorType = processorType;
this.propertyName = propertyName;
}
public ConfigurationPropertyException(String errorMessage) {
super(errorMessage);
}
public String getPropertyName() {
return propertyName;
}
public String getProcessorType() {
return processorType;
}
public String getProcessorTag() {
return processorTag;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
@ -29,6 +30,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.ingest.core.ConfigurationUtils.newConfigurationException;
/**
* Processor that converts fields content to a different type. Supported types are: integer, float, boolean and string.
* Throws exception if the field is not there or the conversion fails.
@ -80,11 +83,11 @@ public class ConvertProcessor extends AbstractProcessor {
public abstract Object convert(Object value);
public static Type fromString(String type) {
public static Type fromString(String processorTag, String propertyName, String type) {
try {
return Type.valueOf(type.toUpperCase(Locale.ROOT));
} catch(IllegalArgumentException e) {
throw new IllegalArgumentException("type [" + type + "] not supported, cannot convert field.", e);
throw newConfigurationException(TYPE, processorTag, propertyName, "type [" + type + "] not supported, cannot convert field.");
}
}
}
@ -138,7 +141,8 @@ public class ConvertProcessor extends AbstractProcessor {
@Override
public ConvertProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type"));
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");
Type convertType = Type.fromString(processorTag, "type", typeProperty);
return new ConvertProcessor(processorTag, field, convertType);
}
}

View File

@ -20,13 +20,9 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponseRestListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
@ -34,7 +30,6 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
public class RestPutPipelineAction extends BaseRestHandler {
@ -49,7 +44,7 @@ public class RestPutPipelineAction extends BaseRestHandler {
PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), RestActions.getRestContent(restRequest));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
client.admin().cluster().putPipeline(request, new WritePipelineResponseRestListener(channel));
client.admin().cluster().putPipeline(request, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -47,6 +47,6 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
SimulatePipelineRequest request = new SimulatePipelineRequest(RestActions.getRestContent(restRequest));
request.setId(restRequest.param("id"));
request.setVerbose(restRequest.paramAsBoolean("verbose", false));
client.admin().cluster().simulatePipeline(request, new RestStatusToXContentListener<>(channel));
client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel));
}
}

View File

@ -21,13 +21,11 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.core.PipelineFactoryError;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
public class WritePipelineResponseTests extends ESTestCase {
@ -45,17 +43,13 @@ public class WritePipelineResponseTests extends ESTestCase {
}
public void testSerializationWithError() throws IOException {
PipelineFactoryError error = new PipelineFactoryError("error");
WritePipelineResponse response = new WritePipelineResponse(error);
WritePipelineResponse response = new WritePipelineResponse();
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
WritePipelineResponse otherResponse = new WritePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(otherResponse.getError().getReason(), equalTo(response.getError().getReason()));
assertThat(otherResponse.getError().getProcessorType(), equalTo(response.getError().getProcessorType()));
assertThat(otherResponse.getError().getProcessorTag(), equalTo(response.getError().getProcessorTag()));
assertThat(otherResponse.getError().getProcessorPropertyName(), equalTo(response.getError().getProcessorPropertyName()));
assertThat(otherResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -38,11 +39,13 @@ import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -201,23 +204,7 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(getResponse.pipelines().size(), equalTo(0));
}
public void testPutWithPipelineError() throws Exception {
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("not_found")
.endObject()
.endObject()
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source);
WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get();
assertThat(response.isAcknowledged(), equalTo(false));
assertThat(response.getError().getReason(), equalTo("No processor type exists with name [not_found]"));
}
public void testPutWithProcessorFactoryError() throws Exception {
public void testPutWithPipelineFactoryError() throws Exception {
BytesReference source = jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
@ -229,9 +216,11 @@ public class IngestClientIT extends ESIntegTestCase {
.endArray()
.endObject().bytes();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source);
WritePipelineResponse response = client().admin().cluster().putPipeline(putPipelineRequest).get();
assertThat(response.isAcknowledged(), equalTo(false));
assertThat(response.getError().getReason(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
try {
client().admin().cluster().putPipeline(putPipelineRequest).get();
} catch (ExecutionException e) {
assertThat(e.getCause().getCause().getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
}
@Override

View File

@ -19,10 +19,10 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@ -103,42 +103,21 @@ public class PipelineStoreTests extends ESTestCase {
}
public void testPutWithErrorResponse() {
String id = "_id";
Pipeline pipeline = store.get(id);
assertThat(pipeline, nullValue());
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
}
public void testConstructPipelineResponseSuccess() {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("field", "foo");
processorConfig.put("value", "bar");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig)));
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response, nullValue());
}
public void testConstructPipelineResponseMissingProcessorsFieldException() {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response.getError().getProcessorType(), is(nullValue()));
assertThat(response.getError().getProcessorTag(), is(nullValue()));
assertThat(response.getError().getProcessorPropertyName(), equalTo("processors"));
assertThat(response.getError().getReason(), equalTo("[processors] required property is missing"));
}
public void testConstructPipelineResponseConfigurationException() {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("field", "foo");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("set", processorConfig)));
WritePipelineResponse response = store.validatePipelineResponse("test_id", pipelineConfig);
assertThat(response.getError().getProcessorTag(), nullValue());
assertThat(response.getError().getProcessorType(), equalTo("set"));
assertThat(response.getError().getProcessorPropertyName(), equalTo("value"));
assertThat(response.getError().getReason(), equalTo("[value] required property is missing"));
PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"));
clusterState = store.innerPut(putRequest, clusterState);
try {
store.innerUpdatePipelines(clusterState);
fail("should fail");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
}
pipeline = store.get(id);
assertThat(pipeline, nullValue());
}
public void testDelete() {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -58,7 +58,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
public void testReadStringPropertyInvalidType() {
try {
ConfigurationUtils.readStringProperty(null, null, config, "arr");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
}
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.ingest.core;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
@ -59,7 +59,7 @@ public class PipelineFactoryTests extends ESTestCase {
try {
factory.create("_id", pipelineConfig, Collections.emptyMap());
fail("should fail, missing required [processors] field");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
}
}
@ -91,7 +91,7 @@ public class PipelineFactoryTests extends ESTestCase {
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
}

View File

@ -19,9 +19,9 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -65,7 +65,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -76,7 +76,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}
@ -88,7 +88,7 @@ public class AppendProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -28,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class ConvertProcessorFactoryTests extends ESTestCase {
@ -54,8 +56,11 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), Matchers.equalTo("type [" + type + "] not supported, cannot convert field."));
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), Matchers.equalTo("[type] type [" + type + "] not supported, cannot convert field."));
assertThat(e.getHeader("processor_type").get(0), equalTo(ConvertProcessor.TYPE));
assertThat(e.getHeader("property_name").get(0), equalTo("type"));
assertThat(e.getHeader("processor_tag"), nullValue());
}
}
@ -67,7 +72,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
}
}
@ -79,7 +84,7 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), Matchers.equalTo("[type] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -64,7 +65,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("processor creation should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[match_field] required property is missing"));
}
}
@ -80,7 +81,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("processor creation should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[match_formats] required property is missing"));
}
}
@ -170,7 +171,7 @@ public class DateProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("processor creation should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("[match_formats] property isn't a list, but of type [java.lang.String]"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
@ -55,7 +56,7 @@ public class FailProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[message] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -53,7 +54,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -66,7 +67,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
}
}
@ -79,7 +80,7 @@ public class GsubProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[replacement] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -50,7 +51,7 @@ public class JoinProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -62,7 +63,7 @@ public class JoinProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[separator] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -47,7 +48,7 @@ public class LowercaseProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
@ -55,7 +56,7 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -50,7 +51,7 @@ public class RenameProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -62,7 +63,7 @@ public class RenameProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[to] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
@ -58,7 +59,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -69,7 +70,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}
@ -81,7 +82,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[value] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -50,7 +51,7 @@ public class SplitProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}
@ -62,7 +63,7 @@ public class SplitProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[separator] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -47,7 +48,7 @@ public class TrimProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
@ -47,7 +48,7 @@ public class UppercaseProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("factory create should have failed");
} catch(ConfigurationPropertyException e) {
} catch(ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

View File

@ -19,9 +19,8 @@
package org.elasticsearch.ingest.grok;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
@ -54,7 +53,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("should fail");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
@ -67,7 +66,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("should fail");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[pattern] required property is missing"));
}

View File

@ -29,14 +29,13 @@ import com.maxmind.geoip2.record.Country;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Subdivision;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import java.io.Closeable;
import java.io.IOException;
@ -52,6 +51,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.ingest.core.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty;
@ -94,7 +94,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
}
break;
default:
throw new IllegalStateException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType() + "]");
throw new ElasticsearchParseException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType() + "]", new IllegalStateException());
}
ingestDocument.setFieldValue(targetField, geoData);
}
@ -240,7 +240,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
try {
fields.add(Field.parse(fieldName));
} catch (Exception e) {
throw new ConfigurationPropertyException(TYPE, processorTag, "fields", "illegal field option [" + fieldName + "]. valid values are [" + Arrays.toString(Field.values()) +"]");
throw newConfigurationException(TYPE, processorTag, "fields", "illegal field option [" + fieldName + "]. valid values are [" + Arrays.toString(Field.values()) + "]");
}
}
} else {
@ -249,7 +249,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
DatabaseReader databaseReader = databaseReaders.get(databaseFile);
if (databaseReader == null) {
throw new ConfigurationPropertyException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist");
throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist");
}
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, fields);
}

View File

@ -20,9 +20,8 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.ConfigurationPropertyException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils;
import org.junit.AfterClass;
@ -113,7 +112,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("Exception expected");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
}
}
@ -146,7 +145,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("exception expected");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[fields] illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]"));
}
@ -156,7 +155,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
try {
factory.create(config);
fail("exception expected");
} catch (ConfigurationPropertyException e) {
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[fields] property isn't a list, but of type [java.lang.String]"));
}
}

View File

@ -53,6 +53,7 @@
---
"Test invalid processor config":
- do:
catch: request
ingest.put_pipeline:
id: "my_pipeline"
body: >
@ -66,12 +67,11 @@
}
]
}
- match: { "acknowledged": false }
- length: { "error": 4 }
- match: { "error.reason": "[field] required property is missing" }
- match: { "error.property_name": "field" }
- match: { "error.type": "set" }
- match: { "error.tag": "fritag" }
- match: { error.root_cause.0.type: "parse_exception" }
- match: { error.root_cause.0.reason: "[field] required property is missing" }
- match: { error.root_cause.0.header.processor_tag: "fritag" }
- match: { error.root_cause.0.header.processor_type: "set" }
- match: { error.root_cause.0.header.property_name: "field" }
---
"Test basic pipeline with on_failure in processor":

View File

@ -98,11 +98,11 @@
}
]
}
- length: { error: 4 }
- match: { error.tag: "fails" }
- match: { error.type: "set" }
- match: { error.reason: "[field] required property is missing" }
- match: { error.property_name: "field" }
- match: { error.root_cause.0.type: "parse_exception" }
- match: { error.root_cause.0.reason: "[field] required property is missing" }
- match: { error.root_cause.0.header.processor_tag: "fails" }
- match: { error.root_cause.0.header.processor_type: "set" }
- match: { error.root_cause.0.header.property_name: "field" }
---
"Test simulate without index type and id":
@ -191,10 +191,9 @@
}
]
}
- length: { error: 4 }
- is_false: error.processor_type
- is_false: error.processor_tag
- match: { error.property_name: "pipeline" }
- is_false: error.root_cause.0.header.processor_type
- is_false: error.root_cause.0.header.processor_tag
- match: { error.root_cause.0.header.property_name: "pipeline" }
- match: { error.reason: "[pipeline] required property is missing" }
---
@ -225,11 +224,11 @@
}
]
}
- length: { error: 4 }
- match: { error.type: "set" }
- is_false: error.tag
- match: { error.reason: "[value] required property is missing" }
- match: { error.property_name: "value" }
- match: { error.root_cause.0.type: "parse_exception" }
- match: { error.root_cause.0.reason: "[value] required property is missing" }
- match: { error.root_cause.0.header.processor_type: "set" }
- match: { error.root_cause.0.header.property_name: "value" }
- is_false: error.root_cause.0.header.processor_tag
---
"Test simulate with verbose flag":