diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java index 3a8dbbf448d..49fd90e7afe 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java @@ -111,6 +111,20 @@ public final class ConfigurationUtils { } } + /** + * Returns and removes the specified property of type map from the specified configuration map. + * + * If the property value isn't of type map an {@link IllegalArgumentException} is thrown. + * If the property is missing an {@link IllegalArgumentException} is thrown + */ + public static Map readMap(Map configuration, String propertyName) { + Object value = configuration.remove(propertyName); + if (value == null) { + throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); + } + + return readMap(propertyName, value); + } /** * Returns and removes the specified property of type map from the specified configuration map. @@ -122,6 +136,11 @@ public final class ConfigurationUtils { if (value == null) { return null; } + + return readMap(propertyName, value); + } + + private static Map readMap(String propertyName, Object value) { if (value instanceof Map) { @SuppressWarnings("unchecked") Map map = (Map) value; @@ -130,4 +149,5 @@ public final class ConfigurationUtils { throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); } } + } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java index ed543b7a1eb..0b86e35b522 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/RestSimulatePipelineAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestActions; -import org.elasticsearch.rest.action.support.RestStatusToXContentListener; +import org.elasticsearch.rest.action.support.RestToXContentListener; public class RestSimulatePipelineAction extends BaseRestHandler { @@ -45,13 +45,13 @@ public class RestSimulatePipelineAction extends BaseRestHandler { @Override protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { SimulatePipelineRequest request = new SimulatePipelineRequest(); - request.id(restRequest.param("id")); - request.verbose(restRequest.paramAsBoolean("verbose", false)); + request.setId(restRequest.param("id")); + request.setVerbose(restRequest.paramAsBoolean("verbose", false)); if (RestActions.hasBodyContent(restRequest)) { - request.source(RestActions.getRestContent(restRequest)); + request.setSource(RestActions.getRestContent(restRequest)); } - client.execute(SimulatePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); + client.execute(SimulatePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java new file mode 100644 index 00000000000..b9fa46fe939 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.ingest.Data; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class TransportData implements Streamable, ToXContent { + private Data data; + + public TransportData() { + + } + + public TransportData(Data data) { + this.data = data; + } + + public Data get() { + return data; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + String index = in.readString(); + String type = in.readString(); + String id = in.readString(); + Map doc = in.readMap(); + this.data = new Data(index, type, id, doc); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(data.getIndex()); + out.writeString(data.getType()); + out.writeString(data.getId()); + out.writeMap(data.getDocument()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.DOCUMENT); + builder.field(Fields.MODIFIED, data.isModified()); + builder.field(Fields.INDEX, data.getIndex()); + builder.field(Fields.TYPE, data.getType()); + builder.field(Fields.ID, data.getId()); + builder.field(Fields.SOURCE, data.getDocument()); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TransportData that = (TransportData) o; + return Objects.equals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(data); + } + + static final class Fields { + static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc"); + static final XContentBuilderString MODIFIED = new XContentBuilderString("modified"); + static final XContentBuilderString INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString ID = new XContentBuilderString("_id"); + static final XContentBuilderString SOURCE = new XContentBuilderString("_source"); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java index e39596c777e..7d02f686469 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java @@ -68,34 +68,44 @@ public class ParsedSimulateRequest { private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory(); public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline"; - public ParsedSimulateRequest parse(String pipelineId, Map config, boolean verbose, PipelineStore pipelineStore) throws IOException { - Pipeline pipeline; - // if pipeline `id` passed to request, fetch pipeline from store. - if (pipelineId != null) { - pipeline = pipelineStore.get(pipelineId); - } else { - Map pipelineConfig = ConfigurationUtils.readOptionalMap(config, "pipeline"); - pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); - } - - List> docs = ConfigurationUtils.readList(config, "docs"); - + private List parseDocs(Map config) { + List> docs = ConfigurationUtils.readList(config, Fields.DOCS); List dataList = new ArrayList<>(); - - for (int i = 0; i < docs.size(); i++) { - Map dataMap = docs.get(i); - Map document = ConfigurationUtils.readOptionalMap(dataMap, "_source"); - if (document == null) { - document = Collections.emptyMap(); - } - Data data = new Data(ConfigurationUtils.readOptionalStringProperty(dataMap, "_index"), - ConfigurationUtils.readOptionalStringProperty(dataMap, "_type"), - ConfigurationUtils.readOptionalStringProperty(dataMap, "_id"), + for (Map dataMap : docs) { + Map document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE); + Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX), + ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE), + ConfigurationUtils.readStringProperty(dataMap, Fields.ID), document); dataList.add(data); } + return dataList; + } + public ParsedSimulateRequest parseWithPipelineId(String pipelineId, Map config, boolean verbose, PipelineStore pipelineStore) { + if (pipelineId == null) { + throw new IllegalArgumentException("param [pipeline] is null"); + } + Pipeline pipeline = pipelineStore.get(pipelineId); + List dataList = parseDocs(config); return new ParsedSimulateRequest(pipeline, dataList, verbose); + + } + + public ParsedSimulateRequest parse(Map config, boolean verbose, PipelineStore pipelineStore) throws IOException { + Map pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE); + Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); + List dataList = parseDocs(config); + return new ParsedSimulateRequest(pipeline, dataList, verbose); + } + + static final class Fields { + static final String PIPELINE = "pipeline"; + static final String DOCS = "docs"; + static final String SOURCE = "_source"; + static final String INDEX = "_index"; + static final String TYPE = "_type"; + static final String ID = "_id"; } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatedItemResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentResult.java similarity index 63% rename from plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatedItemResponse.java rename to plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentResult.java index d18e16bd56b..33ef7745404 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatedItemResponse.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentResult.java @@ -18,7 +18,7 @@ */ package org.elasticsearch.plugin.ingest.transport.simulate; -import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -26,46 +26,39 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.ingest.Data; +import org.elasticsearch.plugin.ingest.transport.TransportData; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Objects; -public class SimulatedItemResponse implements Streamable, ToXContent { +public class SimulateDocumentResult implements Streamable, ToXContent { - private Data data; - private List processorResultList; + private TransportData data; + private List processorResultList; private Throwable failure; - public SimulatedItemResponse() { + public SimulateDocumentResult() { } - public SimulatedItemResponse(Data data) { - this.data = data; + public SimulateDocumentResult(Data data) { + this.data = new TransportData(data); } - public SimulatedItemResponse(List processorResultList) { + public SimulateDocumentResult(List processorResultList) { this.processorResultList = processorResultList; } - public SimulatedItemResponse(Throwable failure) { + public SimulateDocumentResult(Throwable failure) { this.failure = failure; } public boolean isFailed() { if (failure != null) { return true; - } else if (processorResultList != null) { - for (ProcessorResult result : processorResultList) { - if (result.isFailed()) { - return true; - } - } } - return false; } @@ -74,10 +67,10 @@ public class SimulatedItemResponse implements Streamable, ToXContent { } public Data getData() { - return data; + return data.get(); } - public List getProcessorResultList() { + public List getProcessorResultList() { return processorResultList; } @@ -91,16 +84,13 @@ public class SimulatedItemResponse implements Streamable, ToXContent { int size = in.readVInt(); processorResultList = new ArrayList<>(); for (int i = 0; i < size; i++) { - ProcessorResult processorResult = new ProcessorResult(); + SimulateProcessorResult processorResult = new SimulateProcessorResult(); processorResult.readFrom(in); processorResultList.add(processorResult); } } else { - String index = in.readString(); - String type = in.readString(); - String id = in.readString(); - Map doc = in.readMap(); - this.data = new Data(index, type, id, doc); + this.data = new TransportData(); + this.data.readFrom(in); } } @@ -113,32 +103,27 @@ public class SimulatedItemResponse implements Streamable, ToXContent { out.writeThrowable(failure); } else if (isVerbose()) { out.writeVInt(processorResultList.size()); - for (ProcessorResult p : processorResultList) { + for (SimulateProcessorResult p : processorResultList) { p.writeTo(out); } } else { - out.writeString(data.getIndex()); - out.writeString(data.getType()); - out.writeString(data.getId()); - out.writeMap(data.getDocument()); + data.writeTo(out); } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(Fields.ERROR, isFailed()); - if (failure != null) { - builder.field(Fields.ERROR_MESSAGE, ExceptionsHelper.detailedMessage(failure)); + if (isFailed()) { + ElasticsearchException.renderThrowable(builder, params, failure); } else if (isVerbose()) { builder.startArray(Fields.PROCESSOR_RESULTS); - for (ProcessorResult processorResult : processorResultList) { - builder.value(processorResult); + for (SimulateProcessorResult processorResult : processorResultList) { + processorResult.toXContent(builder, params); } builder.endArray(); } else { - builder.field(Fields.MODIFIED, data.isModified()); - builder.field(Fields.DOCUMENT, data.asMap()); + data.toXContent(builder, params); } builder.endObject(); return builder; @@ -150,7 +135,7 @@ public class SimulatedItemResponse implements Streamable, ToXContent { if (obj == null || getClass() != obj.getClass()) { return false; } - SimulatedItemResponse other = (SimulatedItemResponse) obj; + SimulateDocumentResult other = (SimulateDocumentResult) obj; return Objects.equals(data, other.data) && Objects.equals(processorResultList, other.processorResultList) && Objects.equals(failure, other.failure); } @@ -160,10 +145,6 @@ public class SimulatedItemResponse implements Streamable, ToXContent { } static final class Fields { - static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc"); - static final XContentBuilderString ERROR = new XContentBuilderString("error"); - static final XContentBuilderString ERROR_MESSAGE = new XContentBuilderString("error_message"); - static final XContentBuilderString MODIFIED = new XContentBuilderString("modified"); static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results"); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java index 119bbc91ea8..a4518273467 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java @@ -19,6 +19,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.ingest.Data; import org.elasticsearch.ingest.Pipeline; @@ -30,7 +31,7 @@ import java.util.List; public class SimulateExecutionService { - static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT; + private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT; private final ThreadPool threadPool; @@ -40,18 +41,18 @@ public class SimulateExecutionService { } - SimulatedItemResponse executeItem(Pipeline pipeline, Data data) { + SimulateDocumentResult executeItem(Pipeline pipeline, Data data) { try { pipeline.execute(data); - return new SimulatedItemResponse(data); + return new SimulateDocumentResult(data); } catch (Exception e) { - return new SimulatedItemResponse(e); + return new SimulateDocumentResult(e); } } - SimulatedItemResponse executeVerboseItem(Pipeline pipeline, Data data) { - List processorResultList = new ArrayList<>(); + SimulateDocumentResult executeVerboseItem(Pipeline pipeline, Data data) { + List processorResultList = new ArrayList<>(); Data currentData = new Data(data); for (int i = 0; i < pipeline.getProcessors().size(); i++) { Processor processor = pipeline.getProcessors().get(i); @@ -59,39 +60,30 @@ public class SimulateExecutionService { try { processor.execute(currentData); - processorResultList.add(new ProcessorResult(processorId, currentData)); + processorResultList.add(new SimulateProcessorResult(processorId, currentData)); } catch (Exception e) { - processorResultList.add(new ProcessorResult(processorId, e)); + processorResultList.add(new SimulateProcessorResult(processorId, e)); } currentData = new Data(currentData); } - return new SimulatedItemResponse(processorResultList); + return new SimulateDocumentResult(processorResultList); } - SimulatePipelineResponse execute(ParsedSimulateRequest request) { - List responses = new ArrayList<>(); - for (Data data : request.getDocuments()) { - if (request.isVerbose()) { - responses.add(executeVerboseItem(request.getPipeline(), data)); - } else { - responses.add(executeItem(request.getPipeline(), data)); - } - } - return new SimulatePipelineResponse(request.getPipeline().getId(), responses); - } - - public void execute(ParsedSimulateRequest request, Listener listener) { + public void execute(ParsedSimulateRequest request, ActionListener listener) { threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() { @Override public void run() { - SimulatePipelineResponse response = execute(request); - listener.onResponse(response); + List responses = new ArrayList<>(); + for (Data data : request.getDocuments()) { + if (request.isVerbose()) { + responses.add(executeVerboseItem(request.getPipeline(), data)); + } else { + responses.add(executeItem(request.getPipeline(), data)); + } + } + listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), responses)); } }); } - - public interface Listener { - void onResponse(SimulatePipelineResponse response); - } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java index 8c5a5f1ef71..4aec025b12d 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java @@ -44,27 +44,27 @@ public class SimulatePipelineRequest extends ActionRequest { return validationException; } - public String id() { + public String getId() { return id; } - public void id(String id) { + public void setId(String id) { this.id = id; } - public boolean verbose() { + public boolean isVerbose() { return verbose; } - public void verbose(boolean verbose) { + public void setVerbose(boolean verbose) { this.verbose = verbose; } - public BytesReference source() { + public BytesReference getSource() { return source; } - public void source(BytesReference source) { + public void setSource(BytesReference source) { this.source = source; } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java index 7b140b345f4..07998291922 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestBuilder.java @@ -30,17 +30,17 @@ public class SimulatePipelineRequestBuilder extends ActionRequestBuilder responses; + private List responses; public SimulatePipelineResponse() { } - public SimulatePipelineResponse(String pipelineId, List responses) { + public SimulatePipelineResponse(String pipelineId, List responses) { this.pipelineId = pipelineId; this.responses = Collections.unmodifiableList(responses); } - public String pipelineId() { + public String getPipelineId() { return pipelineId; } - public void pipelineId(String pipelineId) { + public void setPipelineId(String pipelineId) { this.pipelineId = pipelineId; } - public List responses() { + public List getResponses() { return responses; } - public void responses(List responses) { + public void setResponses(List responses) { this.responses = responses; } @@ -67,7 +67,7 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo super.writeTo(out); out.writeString(pipelineId); out.writeVInt(responses.size()); - for (SimulatedItemResponse response : responses) { + for (SimulateDocumentResult response : responses) { response.writeTo(out); } } @@ -79,7 +79,7 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo int responsesLength = in.readVInt(); responses = new ArrayList<>(); for (int i = 0; i < responsesLength; i++) { - SimulatedItemResponse response = new SimulatedItemResponse(); + SimulateDocumentResult response = new SimulateDocumentResult(); response.readFrom(in); responses.add(response); } @@ -88,25 +88,15 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startArray("docs"); - for (SimulatedItemResponse response : responses) { - builder.value(response); + builder.startArray(Fields.DOCUMENTS); + for (SimulateDocumentResult response : responses) { + response.toXContent(builder, params); } builder.endArray(); return builder; } - @Override - public RestStatus status() { - for (SimulatedItemResponse response : responses) { - if (response.isFailed()) { - return RestStatus.BAD_REQUEST; - } - } - return RestStatus.OK; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -120,4 +110,8 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo public int hashCode() { return Objects.hash(pipelineId, responses); } + + static final class Fields { + static final XContentBuilderString DOCUMENTS = new XContentBuilderString("docs"); + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java index d52a92715a3..305c4122203 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java @@ -46,22 +46,21 @@ public class SimulatePipelineTransportAction extends HandledTransportAction listener) { - Map source = XContentHelper.convertToMap(request.source(), false).v2(); + Map source = XContentHelper.convertToMap(request.getSource(), false).v2(); ParsedSimulateRequest simulateRequest; ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser(); try { - simulateRequest = parser.parse(request.id(), source, request.verbose(), pipelineStore); + if (request.getId() != null) { + simulateRequest = parser.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore); + } else { + simulateRequest = parser.parse(source, request.isVerbose(), pipelineStore); + } } catch (IOException e) { listener.onFailure(e); return; } - executionService.execute(simulateRequest, new SimulateExecutionService.Listener() { - @Override - public void onResponse(SimulatePipelineResponse response) { - listener.onResponse(response); - } - }); + executionService.execute(simulateRequest, listener); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ProcessorResult.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java similarity index 66% rename from plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ProcessorResult.java rename to plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java index 3f2073e480b..ca97ec5a604 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ProcessorResult.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java @@ -18,7 +18,7 @@ */ package org.elasticsearch.plugin.ingest.transport.simulate; -import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -26,27 +26,27 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.ingest.Data; +import org.elasticsearch.plugin.ingest.transport.TransportData; import java.io.IOException; -import java.util.Map; import java.util.Objects; -public class ProcessorResult implements Streamable, ToXContent { +public class SimulateProcessorResult implements Streamable, ToXContent { private String processorId; - private Data data; + private TransportData data; private Throwable failure; - public ProcessorResult() { + public SimulateProcessorResult() { } - public ProcessorResult(String processorId, Data data) { + public SimulateProcessorResult(String processorId, Data data) { this.processorId = processorId; - this.data = data; + this.data = new TransportData(data); } - public ProcessorResult(String processorId, Throwable failure) { + public SimulateProcessorResult(String processorId, Throwable failure) { this.processorId = processorId; this.failure = failure; } @@ -56,7 +56,7 @@ public class ProcessorResult implements Streamable, ToXContent { } public Data getData() { - return data; + return data.get(); } public String getProcessorId() { @@ -69,12 +69,8 @@ public class ProcessorResult implements Streamable, ToXContent { if (isFailure) { this.failure = in.readThrowable(); } else { - this.processorId = in.readString(); - String index = in.readString(); - String type = in.readString(); - String id = in.readString(); - Map doc = in.readMap(); - this.data = new Data(index, type, id, doc); + this.data = new TransportData(); + this.data.readFrom(in); } } @@ -85,10 +81,7 @@ public class ProcessorResult implements Streamable, ToXContent { out.writeThrowable(failure); } else { out.writeString(processorId); - out.writeString(data.getIndex()); - out.writeString(data.getType()); - out.writeString(data.getId()); - out.writeMap(data.getDocument()); + data.writeTo(out); } } @@ -96,12 +89,10 @@ public class ProcessorResult implements Streamable, ToXContent { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(Fields.PROCESSOR_ID, processorId); - builder.field(Fields.ERROR, isFailed()); if (isFailed()) { - builder.field(Fields.ERROR_MESSAGE, ExceptionsHelper.detailedMessage(failure)); + ElasticsearchException.renderThrowable(builder, params, failure); } else { - builder.field(Fields.MODIFIED, data.isModified()); - builder.field(Fields.DOCUMENT, data.asMap()); + data.toXContent(builder, params); } builder.endObject(); return builder; @@ -109,11 +100,13 @@ public class ProcessorResult implements Streamable, ToXContent { @Override public boolean equals(Object obj) { - if (obj == this) { return true; } + if (obj == this) { + return true; + } if (obj == null || getClass() != obj.getClass()) { return false; } - ProcessorResult other = (ProcessorResult) obj; + SimulateProcessorResult other = (SimulateProcessorResult) obj; return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure); } @@ -123,10 +116,6 @@ public class ProcessorResult implements Streamable, ToXContent { } static final class Fields { - static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc"); static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id"); - static final XContentBuilderString ERROR = new XContentBuilderString("error"); - static final XContentBuilderString ERROR_MESSAGE = new XContentBuilderString("error_message"); - static final XContentBuilderString MODIFIED = new XContentBuilderString("modified"); } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java index e6cae34035e..918064a17c7 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -84,4 +85,29 @@ public class DataTests extends ESTestCase { data.addField("fizz.new", "bar"); assertThat(data.getProperty("fizz.new"), equalTo("bar")); } + + public void testEquals() { + Data otherData = new Data(data); + assertThat(otherData, equalTo(data)); + } + + public void testNotEqualsDiffIndex() { + Data otherData = new Data(data.getIndex() + "foo", data.getType(), data.getId(), data.getDocument()); + assertThat(otherData, not(equalTo(data))); + } + + public void testNotEqualsDiffType() { + Data otherData = new Data(data.getIndex(), data.getType() + "foo", data.getId(), data.getDocument()); + assertThat(otherData, not(equalTo(data))); + } + + public void testNotEqualsDiffId() { + Data otherData = new Data(data.getIndex(), data.getType(), data.getId() + "foo", data.getDocument()); + assertThat(otherData, not(equalTo(data))); + } + + public void testNotEqualsDiffDocument() { + Data otherData = new Data(data.getIndex(), data.getType(), data.getId(), Collections.emptyMap()); + assertThat(otherData, not(equalTo(data))); + } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index d0d11ae60a0..97ed3339090 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -33,7 +33,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse; -import org.elasticsearch.plugin.ingest.transport.simulate.SimulatedItemResponse; +import org.elasticsearch.plugin.ingest.transport.simulate.SimulateDocumentResult; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -104,8 +104,8 @@ public class IngestClientIT extends ESIntegTestCase { Map expectedDoc = new HashMap<>(); expectedDoc.put("foo", "bar"); Data expectedData = new Data("index", "type", "id", expectedDoc); - SimulatedItemResponse expectedResponse = new SimulatedItemResponse(expectedData); - List expectedResponses = Arrays.asList(expectedResponse); + SimulateDocumentResult expectedResponse = new SimulateDocumentResult(expectedData); + List expectedResponses = Arrays.asList(expectedResponse); SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses); assertThat(response, equalTo(expected)); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java new file mode 100644 index 00000000000..08940ee3ce2 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport; + +import org.elasticsearch.ingest.Data; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class TransportDataTests extends ESTestCase { + + public void testEquals() throws Exception { + Data data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + Data otherData = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + assertThat(data, equalTo(otherData)); + } + + public void testNotEquals() throws Exception { + Data data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + Data otherData = new Data("_index2", "_type", "_id", Collections.emptyMap()); + assertThat(data, not(equalTo(otherData))); + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java index d88bf7a5467..03c4c3af7f8 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java @@ -35,28 +35,26 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ParsedSimulateRequestParserTests extends ESTestCase { - private static final ParsedSimulateRequest.Parser PARSER = new ParsedSimulateRequest.Parser(); - - private Map processorRegistry; private PipelineStore store; - private Processor processor; + private ParsedSimulateRequest.Parser parser; private Pipeline pipeline; private Data data; @Before public void init() throws IOException { + parser = new ParsedSimulateRequest.Parser(); List uppercase = Collections.unmodifiableList(Collections.singletonList("foo")); - processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null); + Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null); pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.unmodifiableList(Arrays.asList(processor))); - data = new Data("_index", "_type", "_id", Collections.emptyMap()); - processorRegistry = new HashMap<>(); + data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + Map processorRegistry = new HashMap<>(); processorRegistry.put("mutate", new MutateProcessor.Factory()); store = mock(PipelineStore.class); when(store.get("_id")).thenReturn(pipeline); when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry); } - public void testParse_UsingPipelineStore() throws Exception { + public void testParseUsingPipelineStore() throws Exception { ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); Map raw = new HashMap<>(); @@ -65,14 +63,15 @@ public class ParsedSimulateRequestParserTests extends ESTestCase { doc.put("_index", "_index"); doc.put("_type", "_type"); doc.put("_id", "_id"); + doc.put("_source", data.getDocument()); docs.add(doc); raw.put("docs", docs); - ParsedSimulateRequest actualRequest = PARSER.parse("_id", raw, false, store); + ParsedSimulateRequest actualRequest = parser.parseWithPipelineId("_id", raw, false, store); assertThat(actualRequest, equalTo(expectedRequest)); } - public void testParse_ProvidedPipeline() throws Exception { + public void testParseWithProvidedPipeline() throws Exception { ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); Map raw = new HashMap<>(); @@ -81,6 +80,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase { doc.put("_index", "_index"); doc.put("_type", "_type"); doc.put("_id", "_id"); + doc.put("_source", data.getDocument()); docs.add(doc); Map processorConfig = new HashMap<>(); @@ -91,7 +91,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase { raw.put("docs", docs); raw.put("pipeline", pipelineConfig); - ParsedSimulateRequest actualRequest = PARSER.parse(null, raw, false, store); + ParsedSimulateRequest actualRequest = parser.parse(raw, false, store); assertThat(actualRequest, equalTo(expectedRequest)); } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java index 8ed2cc6abec..cf108b0db84 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java @@ -19,11 +19,12 @@ package org.elasticsearch.plugin.ingest.transport.simulate; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.Data; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; -import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.transport.TransportData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -37,26 +38,28 @@ import static org.mockito.Mockito.*; public class SimulateExecutionServiceTests extends ESTestCase { - private PipelineStore store; private ThreadPool threadPool; private SimulateExecutionService executionService; private Pipeline pipeline; private Processor processor; private Data data; + private TransportData transportData; + private ActionListener listener; @Before public void setup() { - store = mock(PipelineStore.class); threadPool = new ThreadPool( Settings.builder() - .put("name", "_name") + .put("name", getClass().getName()) .build() ); executionService = new SimulateExecutionService(threadPool); processor = mock(Processor.class); when(processor.getType()).thenReturn("mock"); pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor)); - data = new Data("_index", "_type", "_id", Collections.emptyMap()); + data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + transportData = new TransportData(data); + listener = mock(ActionListener.class); } @After @@ -65,43 +68,42 @@ public class SimulateExecutionServiceTests extends ESTestCase { } public void testExecuteVerboseItem() throws Exception { - SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse( - Arrays.asList(new ProcessorResult("processor[mock]-0", data), new ProcessorResult("processor[mock]-1", data))); - SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data); + SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult( + Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data))); + SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data); verify(processor, times(2)).execute(data); assertThat(actualItemResponse, equalTo(expectedItemResponse)); } public void testExecuteItem() throws Exception { - SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(data); - SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data); + SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(data); + SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data); verify(processor, times(2)).execute(data); assertThat(actualItemResponse, equalTo(expectedItemResponse)); } - public void testExecuteVerboseItem_Failure() throws Exception { + public void testExecuteVerboseItemWithFailure() throws Exception { Exception e = new RuntimeException("processor failed"); - SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse( - Arrays.asList(new ProcessorResult("processor[mock]-0", e), new ProcessorResult("processor[mock]-1", data)) + SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult( + Arrays.asList(new SimulateProcessorResult("processor[mock]-0", e), new SimulateProcessorResult("processor[mock]-1", data)) ); doThrow(e).doNothing().when(processor).execute(data); - SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data); + SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data); verify(processor, times(2)).execute(data); assertThat(actualItemResponse, equalTo(expectedItemResponse)); } - public void testExecuteItem_Failure() throws Exception { + public void testExecuteItemWithFailure() throws Exception { Exception e = new RuntimeException("processor failed"); - SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(e); + SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(e); doThrow(e).when(processor).execute(data); - SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data); + SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data); verify(processor, times(1)).execute(data); assertThat(actualItemResponse, equalTo(expectedItemResponse)); } public void testExecute() throws Exception { - SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class); - SimulatedItemResponse itemResponse = new SimulatedItemResponse(data); + SimulateDocumentResult itemResponse = new SimulateDocumentResult(data); ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); executionService.execute(request, listener); SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse)); @@ -114,11 +116,10 @@ public class SimulateExecutionServiceTests extends ESTestCase { }); } - public void testExecute_Verbose() throws Exception { - SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class); + public void testExecuteWithVerbose() throws Exception { ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true); - SimulatedItemResponse itemResponse = new SimulatedItemResponse( - Arrays.asList(new ProcessorResult("processor[mock]-0", data), new ProcessorResult("processor[mock]-1", data))); + SimulateDocumentResult itemResponse = new SimulateDocumentResult( + Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data))); executionService.execute(request, listener); SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse)); assertBusy(new Runnable() { diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml index 7596469b7f2..156ba8cfd67 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml @@ -47,8 +47,7 @@ ] } - length: { docs: 1 } - - is_false: docs.0.error - - is_true: docs.0.modified + - is_true: docs.0.doc.modified - match: { docs.0.doc._source.foo: "bar" } - match: { docs.0.doc._source.field2: "_value" } @@ -132,8 +131,7 @@ - length: { docs: 1 } - length: { docs.0.processor_results: 2 } - match: { docs.0.processor_results.0.processor_id: "processor[mutate]-0" } - - is_false: docs.0.processor_results.0.error - - is_true: docs.0.processor_results.0.modified + - is_true: docs.0.processor_results.0.doc.modified - length: { docs.0.processor_results.0.doc._source: 2 } - match: { docs.0.processor_results.0.doc._source.foo: "bar" } - match: { docs.0.processor_results.0.doc._source.field2: "_value" } @@ -149,7 +147,6 @@ wait_for_status: green - do: - catch: request ingest.simulate: body: > { @@ -183,10 +180,8 @@ ] } - length: { docs: 2 } - - is_true: docs.0.error - - match: { docs.0.error_message: "NullPointerException[null]" } - - is_false: docs.1.error - - is_true: docs.1.modified + - match: { docs.0.error.type: "null_pointer_exception" } + - is_true: docs.1.doc.modified - match: { docs.1.doc._source.foo: "BAR" } --- @@ -196,7 +191,6 @@ wait_for_status: green - do: - catch: request ingest.simulate: verbose: true body: > @@ -240,12 +234,11 @@ ] } - length: { docs: 2 } - - is_true: docs.0.error - - is_false: docs.1.error - length: { docs.0.processor_results: 2 } - - is_false: docs.1.processor_results.0.error - - match: { docs.0.processor_results.0.error_message: "NumberFormatException[For input string: \"bar\"]" } - - is_false: docs.1.processor_results.1.error + - match: { docs.0.processor_results.0.error.type: "number_format_exception" } + - match: { docs.0.processor_results.1.doc._index: "index" } + - match: { docs.0.processor_results.1.doc._type: "type" } + - match: { docs.0.processor_results.1.doc._id: "id" } - match: { docs.0.processor_results.1.doc._source.foo: "bar" } - match: { docs.1.processor_results.1.doc._source.bar: "HELLO" } - match: { docs.1.processor_results.0.doc._source.foo: 5 }