diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java index 407c1b894f1..dd1bfde7923 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java @@ -44,6 +44,10 @@ public final class Data { this.document = document; } + public Data(Data other) { + this(other.index, other.type, other.id, new HashMap<>(other.document)); + } + @SuppressWarnings("unchecked") public T getProperty(String path) { // TODO: we should not rely on any core class, so we should have custom map extract value logic: diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java index a719c4ec727..844a6889e74 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -70,6 +70,21 @@ public final class Pipeline { return processors; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Pipeline pipeline = (Pipeline) o; + return Objects.equals(id, pipeline.id) && + Objects.equals(description, pipeline.description) && + Objects.equals(processors, pipeline.processors); + } + + @Override + public int hashCode() { + return Objects.hash(id, description, processors); + } + public final static class Factory { public Pipeline create(String id, Map config, Map processorRegistry) throws IOException { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java index e3f77b4a141..3a8dbbf448d 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java @@ -77,39 +77,41 @@ public final class ConfigurationUtils { * Returns and removes the specified property of type list from the specified configuration map. * * If the property value isn't of type list an {@link IllegalArgumentException} is thrown. - * If the property is missing an {@link IllegalArgumentException} is thrown */ - public static List readStringList(Map configuration, String propertyName) { + public static List readOptionalList(Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { - throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); + return null; } - return readStringList(propertyName, value); + return readList(propertyName, value); } /** * Returns and removes the specified property of type list from the specified configuration map. * * If the property value isn't of type list an {@link IllegalArgumentException} is thrown. + * If the property is missing an {@link IllegalArgumentException} is thrown */ - public static List readOptionalStringList(Map configuration, String propertyName) { + public static List readList(Map configuration, String propertyName) { Object value = configuration.remove(propertyName); if (value == null) { - return null; + throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); } - return readStringList(propertyName, value); + + return readList(propertyName, value); } - private static List readStringList(String propertyName, Object value) { + private static List readList(String propertyName, Object value) { if (value instanceof List) { @SuppressWarnings("unchecked") - List stringList = (List) value; + List stringList = (List) value; return stringList; } else { throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]"); } } + /** * Returns and removes the specified property of type map from the specified configuration map. * diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java index a023b78ade4..46a9d43e280 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java @@ -38,6 +38,11 @@ public interface Processor { */ void execute(Data data); + /** + * Gets the type of a processor + */ + String getType(); + /** * A factory that knows how to construct a processor based on a map of maps. */ @@ -54,6 +59,7 @@ public interface Processor { default void setConfigDirectory(Path configDirectory) { } + @Override default void close() throws IOException { } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java index 4c61cc8ee7a..d19433b10ae 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java @@ -78,6 +78,11 @@ public final class DateProcessor implements Processor { data.addField(targetField, ISODateTimeFormat.dateTime().print(dateTime)); } + @Override + public String getType() { + return TYPE; + } + DateTimeZone getTimezone() { return timezone; } @@ -108,7 +113,7 @@ public final class DateProcessor implements Processor { DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString); String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale"); Locale locale = localeString == null ? Locale.ENGLISH : Locale.forLanguageTag(localeString); - List matchFormats = ConfigurationUtils.readStringList(config, "match_formats"); + List matchFormats = ConfigurationUtils.readList(config, "match_formats"); return new DateProcessor(timezone, locale, matchField, matchFormats, targetField); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java index ae63efd05cb..a50bde0e6cb 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java @@ -40,7 +40,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.*; -import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringList; +import static org.elasticsearch.ingest.processor.ConfigurationUtils.readList; import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty; public final class GeoIpProcessor implements Processor { @@ -91,6 +91,11 @@ public final class GeoIpProcessor implements Processor { data.addField(targetField, geoData); } + @Override + public String getType() { + return TYPE; + } + String getSourceField() { return sourceField; } @@ -222,7 +227,7 @@ public final class GeoIpProcessor implements Processor { final Set fields; if (config.containsKey("fields")) { fields = EnumSet.noneOf(Field.class); - List fieldNames = readStringList(config, "fields"); + List fieldNames = readList(config, "fields"); for (String fieldName : fieldNames) { try { fields.add(Field.parse(fieldName)); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java index bdba25c7c78..c0688c42220 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java @@ -56,6 +56,11 @@ public final class GrokProcessor implements Processor { } } + @Override + public String getType() { + return TYPE; + } + String getMatchField() { return matchField; } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java index d09454b1016..4a950bea083 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java @@ -134,6 +134,11 @@ public final class MutateProcessor implements Processor { } } + @Override + public String getType() { + return TYPE; + } + private void doUpdate(Data data) { for(Map.Entry entry : update.entrySet()) { data.addField(entry.getKey(), entry.getValue()); @@ -272,6 +277,28 @@ public final class MutateProcessor implements Processor { } } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MutateProcessor that = (MutateProcessor) o; + return Objects.equals(update, that.update) && + Objects.equals(rename, that.rename) && + Objects.equals(convert, that.convert) && + Objects.equals(split, that.split) && + Objects.equals(gsub, that.gsub) && + Objects.equals(join, that.join) && + Objects.equals(remove, that.remove) && + Objects.equals(trim, that.trim) && + Objects.equals(uppercase, that.uppercase) && + Objects.equals(lowercase, that.lowercase); + } + + @Override + public int hashCode() { + return Objects.hash(update, rename, convert, split, gsub, join, remove, trim, uppercase, lowercase); + } + public static final class Factory implements Processor.Factory { @Override public MutateProcessor create(Map config) throws IOException { @@ -281,10 +308,10 @@ public final class MutateProcessor implements Processor { Map split = ConfigurationUtils.readOptionalMap(config, "split"); Map> gsubConfig = ConfigurationUtils.readOptionalMap(config, "gsub"); Map join = ConfigurationUtils.readOptionalMap(config, "join"); - List remove = ConfigurationUtils.readOptionalStringList(config, "remove"); - List trim = ConfigurationUtils.readOptionalStringList(config, "trim"); - List uppercase = ConfigurationUtils.readOptionalStringList(config, "uppercase"); - List lowercase = ConfigurationUtils.readOptionalStringList(config, "lowercase"); + List remove = ConfigurationUtils.readOptionalList(config, "remove"); + List trim = ConfigurationUtils.readOptionalList(config, "trim"); + List uppercase = ConfigurationUtils.readOptionalList(config, "uppercase"); + List lowercase = ConfigurationUtils.readOptionalList(config, "lowercase"); // pre-compile regex patterns List gsubExpressions = null; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index 94dfb4fb690..e9f7cdd1f88 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -27,6 +27,7 @@ import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor; import org.elasticsearch.ingest.processor.grok.GrokProcessor; import org.elasticsearch.ingest.processor.mutate.MutateProcessor; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; +import org.elasticsearch.plugin.ingest.simulate.SimulateExecutionService; import java.util.HashMap; import java.util.Map; @@ -41,6 +42,7 @@ public class IngestModule extends AbstractModule { binder().bind(PipelineExecutionService.class).asEagerSingleton(); binder().bind(PipelineStore.class).asEagerSingleton(); binder().bind(PipelineStoreClient.class).asEagerSingleton(); + binder().bind(SimulateExecutionService.class).asEagerSingleton(); addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory()); addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory()); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index 3b2f71aa142..18d656813ec 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -25,11 +25,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.ingest.Data; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Map; - public class PipelineExecutionService { static final String THREAD_POOL_NAME = IngestPlugin.NAME; @@ -43,25 +40,13 @@ public class PipelineExecutionService { this.threadPool = threadPool; } - public Pipeline getPipeline(String pipelineId) { - Pipeline pipeline = store.get(pipelineId); - - if (pipeline == null) { - throw new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId)); - } - - return pipeline; - } - public void execute(Data data, String pipelineId, Listener listener) { - try { - execute(data, getPipeline(pipelineId), listener); - } catch (IllegalArgumentException e) { - listener.failed(e); + Pipeline pipeline = store.get(pipelineId); + if (pipeline == null) { + listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId))); + return; } - } - public void execute(Data data, Pipeline pipeline, Listener listener) { threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() { @Override public void run() { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java index 983f43d6a1a..c110da28b99 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestStatusToXContentListener; public class RestSimulatePipelineAction extends BaseRestHandler { @@ -37,15 +38,20 @@ public class RestSimulatePipelineAction extends BaseRestHandler { super(settings, controller, client); controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this); controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this); + // controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this); + // controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this); } @Override protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { SimulatePipelineRequest request = new SimulatePipelineRequest(); request.id(restRequest.param("id")); - if (restRequest.hasContent()) { - request.source(restRequest.content()); + request.verbose(restRequest.paramAsBoolean("verbose", false)); + + if (RestActions.hasBodyContent(restRequest)) { + request.source(RestActions.getRestContent(restRequest)); } + client.execute(SimulatePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/ParsedSimulateRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/ParsedSimulateRequest.java new file mode 100644 index 00000000000..d9f2dc4dc0c --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/ParsedSimulateRequest.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugin.ingest.simulate; + +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.processor.ConfigurationUtils; +import org.elasticsearch.plugin.ingest.PipelineStore; + +import java.io.IOException; +import java.util.*; + +public class ParsedSimulateRequest { + private final List documents; + private final Pipeline pipeline; + private final boolean verbose; + + ParsedSimulateRequest(Pipeline pipeline, List documents, boolean verbose) { + this.pipeline = pipeline; + this.documents = Collections.unmodifiableList(documents); + this.verbose = verbose; + } + + public Pipeline getPipeline() { + return pipeline; + } + + public List getDocuments() { + return documents; + } + + public boolean isVerbose() { + return verbose; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ParsedSimulateRequest that = (ParsedSimulateRequest) o; + return Objects.equals(verbose, that.verbose) && + Objects.equals(documents, that.documents) && + Objects.equals(pipeline, that.pipeline); + } + + @Override + public int hashCode() { + return Objects.hash(documents, pipeline, verbose); + } + + public static class Parser { + private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory(); + public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline"; + + public ParsedSimulateRequest parse(String pipelineId, Map config, boolean verbose, PipelineStore pipelineStore) throws IOException { + Pipeline pipeline; + // if pipeline `id` passed to request, fetch pipeline from store. + if (pipelineId != null) { + pipeline = pipelineStore.get(pipelineId); + } else { + Map pipelineConfig = ConfigurationUtils.readOptionalMap(config, "pipeline"); + pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); + } + + List> docs = ConfigurationUtils.readList(config, "docs"); + + List dataList = new ArrayList<>(); + + for (int i = 0; i < docs.size(); i++) { + Map dataMap = docs.get(i); + Map document = ConfigurationUtils.readOptionalMap(dataMap, "_source"); + if (document == null) { + document = Collections.emptyMap(); + } + Data data = new Data(ConfigurationUtils.readOptionalStringProperty(dataMap, "_index"), + ConfigurationUtils.readOptionalStringProperty(dataMap, "_type"), + ConfigurationUtils.readOptionalStringProperty(dataMap, "_id"), + document); + dataList.add(data); + } + + return new ParsedSimulateRequest(pipeline, dataList, verbose); + } + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatedItemResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/ProcessedData.java similarity index 75% rename from plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatedItemResponse.java rename to plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/ProcessedData.java index 5377b8e0ea0..238a1e79d40 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatedItemResponse.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/ProcessedData.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.plugin.ingest.transport.simulate; +package org.elasticsearch.plugin.ingest.simulate; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,24 +31,26 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; -public class SimulatedItemResponse implements Streamable, StatusToXContent { +public class ProcessedData implements Streamable, StatusToXContent { + private String processorId; private Data data; private Throwable failure; - public SimulatedItemResponse() { + public ProcessedData() { } - public SimulatedItemResponse(Data data) { + public ProcessedData(String processorId, Data data) { + this.processorId = processorId; this.data = data; } - public SimulatedItemResponse(Throwable failure) { + public ProcessedData(Throwable failure) { this.failure = failure; } - public boolean failed() { + public boolean isFailed() { return this.failure != null; } @@ -56,14 +58,18 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent { return data; } + public String getProcessorId() { + return processorId; + } + @Override public void readFrom(StreamInput in) throws IOException { - boolean failed = in.readBoolean(); - - if (failed) { + boolean isFailure = in.readBoolean(); + if (isFailure) { this.failure = in.readThrowable(); // TODO(talevy): check out mget for throwable limitations } else { + this.processorId = in.readString(); String index = in.readString(); String type = in.readString(); String id = in.readString(); @@ -74,11 +80,11 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(failed()); - - if (failed()) { + out.writeBoolean(isFailed()); + if (isFailed()) { out.writeThrowable(failure); } else { + out.writeString(processorId); out.writeString(data.getIndex()); out.writeString(data.getType()); out.writeString(data.getId()); @@ -89,8 +95,9 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(Fields.ERROR, failed()); - if (failed()) { + builder.field(Fields.PROCESSOR_ID, processorId); + builder.field(Fields.ERROR, isFailed()); + if (isFailed()) { builder.field(Fields.FAILURE, failure.toString()); } else { builder.field(Fields.MODIFIED, data.isModified()); @@ -102,7 +109,7 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent { @Override public RestStatus status() { - if (failed()) { + if (isFailed()) { return RestStatus.BAD_REQUEST; } else { return RestStatus.OK; @@ -115,17 +122,18 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent { if (obj == null || getClass() != obj.getClass()) { return false; } - SimulatedItemResponse other = (SimulatedItemResponse) obj; - return Objects.equals(data, other.data) && Objects.equals(failure, other.failure); + ProcessedData other = (ProcessedData) obj; + return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure); } @Override public int hashCode() { - return Objects.hash(data, failure); + return Objects.hash(processorId, data, failure); } static final class Fields { static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc"); + static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id"); static final XContentBuilderString ERROR = new XContentBuilderString("error"); static final XContentBuilderString FAILURE = new XContentBuilderString("failure"); static final XContentBuilderString MODIFIED = new XContentBuilderString("modified"); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/SimulateExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/SimulateExecutionService.java new file mode 100644 index 00000000000..d7949532c0b --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/SimulateExecutionService.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.simulate; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; + +public class SimulateExecutionService { + + static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT; + + private final ThreadPool threadPool; + + @Inject + public SimulateExecutionService(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + + SimulatedItemResponse executeItem(Pipeline pipeline, Data data, boolean verbose) { + try { + if (verbose) { + return executeVerboseItem(pipeline, data); + } else { + pipeline.execute(data); + return new SimulatedItemResponse(data); + } + } catch (Exception e) { + return new SimulatedItemResponse(e); + } + + } + + SimulatedItemResponse executeVerboseItem(Pipeline pipeline, Data data) { + List processedDataList = new ArrayList<>(); + Data currentData = new Data(data); + for (int i = 0; i < pipeline.getProcessors().size(); i++) { + Processor processor = pipeline.getProcessors().get(i); + String processorId = "processor[" + processor.getType() + "]-" + i; + + processor.execute(currentData); + processedDataList.add(new ProcessedData(processorId, currentData)); + + currentData = new Data(currentData); + } + return new SimulatedItemResponse(processedDataList); + } + + SimulatePipelineResponse execute(ParsedSimulateRequest request) { + List responses = new ArrayList<>(); + for (Data data : request.getDocuments()) { + responses.add(executeItem(request.getPipeline(), data, request.isVerbose())); + } + return new SimulatePipelineResponse(request.getPipeline().getId(), responses); + } + + public void execute(ParsedSimulateRequest request, Listener listener) { + threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() { + @Override + public void run() { + SimulatePipelineResponse response = execute(request); + listener.onResponse(response); + } + }); + } + + public interface Listener { + void onResponse(SimulatePipelineResponse response); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/SimulatedItemResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/SimulatedItemResponse.java new file mode 100644 index 00000000000..0fff29ed0c4 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/simulate/SimulatedItemResponse.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugin.ingest.simulate; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class SimulatedItemResponse implements Streamable, StatusToXContent { + + private Data data; + private List processedDataList; + private Throwable failure; + + public SimulatedItemResponse() { + + } + + public SimulatedItemResponse(Data data) { + this.data = data; + } + + public SimulatedItemResponse(List processedDataList) { + this.processedDataList = processedDataList; + } + + public SimulatedItemResponse(Throwable failure) { + this.failure = failure; + } + + public boolean isFailed() { + return this.failure != null; + } + + public boolean isVerbose() { + return this.processedDataList != null; + } + + public Data getData() { + return data; + } + + public List getProcessedDataList() { + return processedDataList; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + boolean isFailed = in.readBoolean(); + boolean isVerbose = in.readBoolean(); + if (isFailed) { + this.failure = in.readThrowable(); + // TODO(talevy): check out mget for throwable limitations + } else if (isVerbose) { + int size = in.readVInt(); + processedDataList = new ArrayList<>(); + for (int i = 0; i < size; i++) { + ProcessedData processedData = new ProcessedData(); + processedData.readFrom(in); + processedDataList.add(processedData); + } + } else { + String index = in.readString(); + String type = in.readString(); + String id = in.readString(); + Map doc = in.readMap(); + this.data = new Data(index, type, id, doc); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(isFailed()); + out.writeBoolean(isVerbose()); + + if (isFailed()) { + out.writeThrowable(failure); + } else if (isVerbose()) { + out.writeVInt(processedDataList.size()); + for (ProcessedData p : processedDataList) { + p.writeTo(out); + } + } else { + out.writeString(data.getIndex()); + out.writeString(data.getType()); + out.writeString(data.getId()); + out.writeMap(data.getDocument()); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Fields.ERROR, isFailed()); + builder.field(Fields.VERBOSE, isVerbose()); + if (isFailed()) { + builder.field(Fields.FAILURE, failure.toString()); + } else if (isVerbose()) { + builder.startArray(Fields.PROCESSOR_STEPS); + for (ProcessedData processedData : processedDataList) { + builder.value(processedData); + } + builder.endArray(); + } else { + builder.field(Fields.MODIFIED, data.isModified()); + builder.field(Fields.DOCUMENT, data.getDocument()); + } + builder.endObject(); + return builder; + } + + @Override + public RestStatus status() { + if (isFailed()) { + return RestStatus.BAD_REQUEST; + } else { + return RestStatus.OK; + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { return true; } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + SimulatedItemResponse other = (SimulatedItemResponse) obj; + return Objects.equals(data, other.data) && Objects.equals(processedDataList, other.processedDataList) && Objects.equals(failure, other.failure); + } + + @Override + public int hashCode() { + return Objects.hash(data, processedDataList, failure); + } + + static final class Fields { + static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc"); + static final XContentBuilderString ERROR = new XContentBuilderString("error"); + static final XContentBuilderString VERBOSE = new XContentBuilderString("verbose"); + static final XContentBuilderString FAILURE = new XContentBuilderString("failure"); + static final XContentBuilderString MODIFIED = new XContentBuilderString("modified"); + static final XContentBuilderString PROCESSOR_STEPS = new XContentBuilderString("processor_steps"); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java index 2bf7c01bf24..8c5a5f1ef71 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java @@ -32,6 +32,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; public class SimulatePipelineRequest extends ActionRequest { private String id; + private boolean verbose; private BytesReference source; @Override @@ -51,6 +52,14 @@ public class SimulatePipelineRequest extends ActionRequest { this.id = id; } + public boolean verbose() { + return verbose; + } + + public void verbose(boolean verbose) { + this.verbose = verbose; + } + public BytesReference source() { return source; } @@ -63,6 +72,7 @@ public class SimulatePipelineRequest extends ActionRequest { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readString(); + verbose = in.readBoolean(); source = in.readBytesReference(); } @@ -70,6 +80,7 @@ public class SimulatePipelineRequest extends ActionRequest { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); + out.writeBoolean(verbose); out.writeBytesReference(source); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java index 8f446b75238..7b140b345f4 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java @@ -34,6 +34,11 @@ public class SimulatePipelineRequestBuilder extends ActionRequestBuilder documents; - private final Pipeline pipeline; - - public SimulatePipelineRequestPayload(Pipeline pipeline, List documents) { - this.pipeline = pipeline; - this.documents = Collections.unmodifiableList(documents); - } - - public String pipelineId() { - return pipeline.getId(); - } - - public Pipeline pipeline() { - return pipeline; - } - - - public List documents() { - return documents; - } - - public SimulatePipelineResponse execute() { - List responses = new ArrayList<>(); - for (Data data : documents) { - try { - pipeline.execute(data); - responses.add(new SimulatedItemResponse(data)); - } catch (Exception e) { - responses.add(new SimulatedItemResponse(e)); - } - } - return new SimulatePipelineResponse(pipeline.getId(), responses); - } - - public static class Factory { - - public SimulatePipelineRequestPayload create(String pipelineId, Map config, PipelineStore pipelineStore) throws IOException { - Pipeline pipeline; - // if pipeline `id` passed to request, fetch pipeline from store. - if (pipelineId != null) { - pipeline = pipelineStore.get(pipelineId); - } else { - Map pipelineConfig = (Map) config.get("pipeline"); - pipeline = (new Pipeline.Factory()).create("_pipeline_id", pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); - } - - // distribute docs by shard key to SimulateShardPipelineResponse - List> docs = (List>) config.get("docs"); - - List dataList = new ArrayList<>(); - - for (int i = 0; i < docs.size(); i++) { - Map dataMap = docs.get(i); - Map document = (Map) dataMap.get("_source"); - Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, "_index", null), - ConfigurationUtils.readStringProperty(dataMap, "_type", null), - ConfigurationUtils.readStringProperty(dataMap, "_id", null), - document); - dataList.add(data); - } - - return new SimulatePipelineRequestPayload(pipeline, dataList); - } - } -} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java index aeed148d74a..6659635f7b0 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponse.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.StatusToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.plugin.ingest.simulate.SimulatedItemResponse; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -100,7 +101,7 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo @Override public RestStatus status() { for (SimulatedItemResponse response : responses) { - if (response.failed()) { + if (response.isFailed()) { return RestStatus.BAD_REQUEST; } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java index c4f3484ca82..f43d0ec03d0 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java @@ -26,7 +26,10 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.plugin.ingest.simulate.ParsedSimulateRequest; import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.simulate.SimulateExecutionService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -34,32 +37,33 @@ import java.io.IOException; import java.util.Map; public class SimulatePipelineTransportAction extends HandledTransportAction { - private final PipelineStore pipelineStore; + private final SimulateExecutionService executionService; @Inject - public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) { + public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore, SimulateExecutionService executionService) { super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new); this.pipelineStore = pipelineStore; + this.executionService = executionService; } @Override protected void doExecute(SimulatePipelineRequest request, ActionListener listener) { Map source = XContentHelper.convertToMap(request.source(), false).v2(); - SimulatePipelineRequestPayload payload; - SimulatePipelineRequestPayload.Factory factory = new SimulatePipelineRequestPayload.Factory(); + ParsedSimulateRequest payload; + ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser(); try { - payload = factory.create(request.id(), source, pipelineStore); + payload = parser.parse(request.id(), source, request.verbose(), pipelineStore); } catch (IOException e) { listener.onFailure(e); return; } - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { + executionService.execute(payload, new SimulateExecutionService.Listener() { @Override - public void run() { - listener.onResponse(payload.execute()); + public void onResponse(SimulatePipelineResponse response) { + listener.onResponse(response); } }); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index d0d11ae60a0..61fcad9a3c3 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -33,7 +33,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse; -import org.elasticsearch.plugin.ingest.transport.simulate.SimulatedItemResponse; +import org.elasticsearch.plugin.ingest.simulate.SimulatedItemResponse; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java index 274a952f935..38c4e2493a2 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java @@ -59,7 +59,7 @@ public class ConfigurationUtilsTests extends ESTestCase { // TODO(talevy): Issue with generics. This test should fail, "int" is of type List public void testOptional_InvalidType() { - List val = ConfigurationUtils.readStringList(config, "int"); + List val = ConfigurationUtils.readList(config, "int"); assertThat(val, equalTo(Arrays.asList(2))); } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/simulate/ParsedSimulateRequestParserTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/simulate/ParsedSimulateRequestParserTests.java new file mode 100644 index 00000000000..01f3beee391 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/simulate/ParsedSimulateRequestParserTests.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.simulate; + +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.processor.mutate.MutateProcessor; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.util.*; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ParsedSimulateRequestParserTests extends ESTestCase { + private static final ParsedSimulateRequest.Parser PARSER = new ParsedSimulateRequest.Parser(); + + private Map processorRegistry; + private PipelineStore store; + private Processor processor; + private Pipeline pipeline; + private Data data; + + @Before + public void init() throws IOException { + List uppercase = Collections.unmodifiableList(Collections.singletonList("foo")); + processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null); + pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.unmodifiableList(Arrays.asList(processor))); + data = new Data("_index", "_type", "_id", Collections.emptyMap()); + processorRegistry = new HashMap<>(); + processorRegistry.put("mutate", new MutateProcessor.Factory()); + store = mock(PipelineStore.class); + when(store.get("_id")).thenReturn(pipeline); + when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry); + } + + public void testParse_UsingPipelineStore() throws Exception { + ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); + + Map raw = new HashMap<>(); + List> docs = new ArrayList<>(); + Map doc = new HashMap<>(); + doc.put("_index", "_index"); + doc.put("_type", "_type"); + doc.put("_id", "_id"); + docs.add(doc); + raw.put("docs", docs); + + ParsedSimulateRequest actualRequest = PARSER.parse("_id", raw, false, store); + assertThat(actualRequest, equalTo(expectedRequest)); + } + + public void testParse_ProvidedPipeline() throws Exception { + ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); + + Map raw = new HashMap<>(); + List> docs = new ArrayList<>(); + Map doc = new HashMap<>(); + doc.put("_index", "_index"); + doc.put("_type", "_type"); + doc.put("_id", "_id"); + docs.add(doc); + + Map processorConfig = new HashMap<>(); + processorConfig.put("uppercase", Arrays.asList("foo")); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig))); + + raw.put("docs", docs); + raw.put("pipeline", pipelineConfig); + + ParsedSimulateRequest actualRequest = PARSER.parse(null, raw, false, store); + assertThat(actualRequest, equalTo(expectedRequest)); + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/simulate/SimulateExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/simulate/SimulateExecutionServiceTests.java new file mode 100644 index 00000000000..f622db68d95 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/simulate/SimulateExecutionServiceTests.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.simulate; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.*; + +public class SimulateExecutionServiceTests extends ESTestCase { + + private PipelineStore store; + private ThreadPool threadPool; + private SimulateExecutionService executionService; + private Pipeline pipeline; + private Processor processor; + private Data data; + + @Before + public void setup() { + store = mock(PipelineStore.class); + threadPool = new ThreadPool( + Settings.builder() + .put("name", "_name") + .build() + ); + executionService = new SimulateExecutionService(threadPool); + processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor)); + data = new Data("_index", "_type", "_id", Collections.emptyMap()); + } + + @After + public void destroy() { + threadPool.shutdown(); + } + + public void testExecuteVerboseItem() throws Exception { + SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse( + Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data))); + SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data); + verify(processor, times(2)).execute(data); + assertThat(actualItemResponse, equalTo(expectedItemResponse)); + } + + public void testExecuteItem_verboseSuccessful() throws Exception { + SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse( + Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data))); + SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, true); + verify(processor, times(2)).execute(data); + assertThat(actualItemResponse, equalTo(expectedItemResponse)); + } + + public void testExecuteItem_Simple() throws Exception { + SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(data); + SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, false); + verify(processor, times(2)).execute(data); + assertThat(actualItemResponse, equalTo(expectedItemResponse)); + } + + public void testExecuteItem_Failure() throws Exception { + Exception e = new RuntimeException("processor failed"); + SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(e); + doThrow(e).when(processor).execute(data); + SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, false); + verify(processor, times(1)).execute(data); + assertThat(actualItemResponse, equalTo(expectedItemResponse)); + } + + public void testExecute() throws Exception { + SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class); + SimulatedItemResponse itemResponse = new SimulatedItemResponse(data); + ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); + executionService.execute(request, listener); + SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse)); + assertBusy(new Runnable() { + @Override + public void run() { + verify(processor, times(2)).execute(data); + verify(listener).onResponse(response); + } + }); + } + + public void testExecute_Verbose() throws Exception { + SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class); + ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true); + SimulatedItemResponse itemResponse = new SimulatedItemResponse( + Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data))); + executionService.execute(request, listener); + SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse)); + assertBusy(new Runnable() { + @Override + public void run() { + verify(processor, times(2)).execute(data); + verify(listener).onResponse(response); + } + }); + } +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json index bf08435eb8e..bef89fed54d 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json +++ b/plugins/ingest/src/test/resources/rest-api-spec/api/ingest.simulate.json @@ -13,6 +13,11 @@ } }, "params": { + "verbose": { + "type" : "boolean", + "description" : "Verbose mode. Display data output for each processor in executed pipeline", + "default" : false + } } }, "body": { diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml index eb68a439bcb..7c8764472e0 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml @@ -12,11 +12,10 @@ "description": "_description", "processors": [ { - "simple" : { - "path" : "field1", - "expected_value" : "_value", - "add_field" : "field2", - "add_field_value" : "_value" + "mutate" : { + "update" : { + "field2" : "_value" + } } } ] @@ -48,4 +47,144 @@ ] } - length: { docs: 1 } + - is_false: docs.0.error + - is_true: docs.0.modified + - match: { docs.0.foo: "bar" } + - match: { docs.0.field2: "_value" } + +--- +"Test simulate with provided pipeline definition": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.simulate: + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "mutate" : { + "update" : { + "field2" : "_value" + } + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - length: { docs: 1 } + +--- +"Test simulate with verbose flag": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "mutate" : { + "update" : { + "field2" : "_value" + } + } + }, + { + "mutate" : { + "update" : { + "field3" : "third_val" + } + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "foo": "bar" + } + } + ] + } + - length: { docs: 1 } + - length: { docs.0.processor_steps: 2 } + - match: { docs.0.processor_steps.0.processor_id: "processor[mutate]-0" } + - is_false: docs.0.processor_steps.0.error + - is_true: docs.0.processor_steps.0.modified + - length: { docs.0.processor_steps.0.doc: 2 } + - match: { docs.0.processor_steps.0.doc.foo: "bar" } + - match: { docs.0.processor_steps.0.doc.field2: "_value" } + - length: { docs.0.processor_steps.1.doc: 3 } + - match: { docs.0.processor_steps.1.doc.foo: "bar" } + - match: { docs.0.processor_steps.1.doc.field2: "_value" } + - match: { docs.0.processor_steps.1.doc.field3: "third_val" } + +--- +"Test simulate with exception thrown": + - do: + cluster.health: + wait_for_status: green + + - do: + catch: request + ingest.simulate: + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "mutate" : { + "uppercase" : ["foo"] + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "not_foo": "bar" + } + }, + { + "_index": "index", + "_type": "type", + "_id": "id2", + "_source": { + "foo": "bar" + } + } + ] + } + - length: { docs: 2 } + - is_true: docs.0.error + - match: { docs.0.failure: "java.lang.NullPointerException" } + - is_false: docs.1.error + - is_true: docs.1.modified + - match: { docs.1.doc.foo: "BAR" }