updated with cosmetic changes
This commit is contained in:
parent
c4951ef74f
commit
af1de8e1cc
|
@ -111,6 +111,20 @@ public final class ConfigurationUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns and removes the specified property of type map from the specified configuration map.
|
||||||
|
*
|
||||||
|
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||||
|
* If the property is missing an {@link IllegalArgumentException} is thrown
|
||||||
|
*/
|
||||||
|
public static <T> Map<String, T> readMap(Map<String, Object> configuration, String propertyName) {
|
||||||
|
Object value = configuration.remove(propertyName);
|
||||||
|
if (value == null) {
|
||||||
|
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
|
||||||
|
}
|
||||||
|
|
||||||
|
return readMap(propertyName, value);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns and removes the specified property of type map from the specified configuration map.
|
* Returns and removes the specified property of type map from the specified configuration map.
|
||||||
|
@ -122,6 +136,11 @@ public final class ConfigurationUtils {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return readMap(propertyName, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> Map<String, T> readMap(String propertyName, Object value) {
|
||||||
if (value instanceof Map) {
|
if (value instanceof Map) {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, T> map = (Map<String, T>) value;
|
Map<String, T> map = (Map<String, T>) value;
|
||||||
|
@ -130,4 +149,5 @@ public final class ConfigurationUtils {
|
||||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.rest.RestChannel;
|
||||||
import org.elasticsearch.rest.RestController;
|
import org.elasticsearch.rest.RestController;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
import org.elasticsearch.rest.action.support.RestActions;
|
import org.elasticsearch.rest.action.support.RestActions;
|
||||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
import org.elasticsearch.rest.action.support.RestToXContentListener;
|
||||||
|
|
||||||
public class RestSimulatePipelineAction extends BaseRestHandler {
|
public class RestSimulatePipelineAction extends BaseRestHandler {
|
||||||
|
|
||||||
|
@ -45,13 +45,13 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
|
||||||
@Override
|
@Override
|
||||||
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
|
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
|
||||||
SimulatePipelineRequest request = new SimulatePipelineRequest();
|
SimulatePipelineRequest request = new SimulatePipelineRequest();
|
||||||
request.id(restRequest.param("id"));
|
request.setId(restRequest.param("id"));
|
||||||
request.verbose(restRequest.paramAsBoolean("verbose", false));
|
request.setVerbose(restRequest.paramAsBoolean("verbose", false));
|
||||||
|
|
||||||
if (RestActions.hasBodyContent(restRequest)) {
|
if (RestActions.hasBodyContent(restRequest)) {
|
||||||
request.source(RestActions.getRestContent(restRequest));
|
request.setSource(RestActions.getRestContent(restRequest));
|
||||||
}
|
}
|
||||||
|
|
||||||
client.execute(SimulatePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
|
client.execute(SimulatePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.plugin.ingest.transport;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
|
import org.elasticsearch.ingest.Data;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public class TransportData implements Streamable, ToXContent {
|
||||||
|
private Data data;
|
||||||
|
|
||||||
|
public TransportData() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public TransportData(Data data) {
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Data get() {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
|
String index = in.readString();
|
||||||
|
String type = in.readString();
|
||||||
|
String id = in.readString();
|
||||||
|
Map<String, Object> doc = in.readMap();
|
||||||
|
this.data = new Data(index, type, id, doc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeString(data.getIndex());
|
||||||
|
out.writeString(data.getType());
|
||||||
|
out.writeString(data.getId());
|
||||||
|
out.writeMap(data.getDocument());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
builder.startObject(Fields.DOCUMENT);
|
||||||
|
builder.field(Fields.MODIFIED, data.isModified());
|
||||||
|
builder.field(Fields.INDEX, data.getIndex());
|
||||||
|
builder.field(Fields.TYPE, data.getType());
|
||||||
|
builder.field(Fields.ID, data.getId());
|
||||||
|
builder.field(Fields.SOURCE, data.getDocument());
|
||||||
|
builder.endObject();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
TransportData that = (TransportData) o;
|
||||||
|
return Objects.equals(data, that.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class Fields {
|
||||||
|
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
|
||||||
|
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
|
||||||
|
static final XContentBuilderString INDEX = new XContentBuilderString("_index");
|
||||||
|
static final XContentBuilderString TYPE = new XContentBuilderString("_type");
|
||||||
|
static final XContentBuilderString ID = new XContentBuilderString("_id");
|
||||||
|
static final XContentBuilderString SOURCE = new XContentBuilderString("_source");
|
||||||
|
}
|
||||||
|
}
|
|
@ -68,34 +68,44 @@ public class ParsedSimulateRequest {
|
||||||
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
|
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
|
||||||
public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
|
public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
|
||||||
|
|
||||||
public ParsedSimulateRequest parse(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
|
private List<Data> parseDocs(Map<String, Object> config) {
|
||||||
Pipeline pipeline;
|
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS);
|
||||||
// if pipeline `id` passed to request, fetch pipeline from store.
|
|
||||||
if (pipelineId != null) {
|
|
||||||
pipeline = pipelineStore.get(pipelineId);
|
|
||||||
} else {
|
|
||||||
Map<String, Object> pipelineConfig = ConfigurationUtils.readOptionalMap(config, "pipeline");
|
|
||||||
pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, "docs");
|
|
||||||
|
|
||||||
List<Data> dataList = new ArrayList<>();
|
List<Data> dataList = new ArrayList<>();
|
||||||
|
for (Map<String, Object> dataMap : docs) {
|
||||||
for (int i = 0; i < docs.size(); i++) {
|
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
|
||||||
Map<String, Object> dataMap = docs.get(i);
|
Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
|
||||||
Map<String, Object> document = ConfigurationUtils.readOptionalMap(dataMap, "_source");
|
ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE),
|
||||||
if (document == null) {
|
ConfigurationUtils.readStringProperty(dataMap, Fields.ID),
|
||||||
document = Collections.emptyMap();
|
|
||||||
}
|
|
||||||
Data data = new Data(ConfigurationUtils.readOptionalStringProperty(dataMap, "_index"),
|
|
||||||
ConfigurationUtils.readOptionalStringProperty(dataMap, "_type"),
|
|
||||||
ConfigurationUtils.readOptionalStringProperty(dataMap, "_id"),
|
|
||||||
document);
|
document);
|
||||||
dataList.add(data);
|
dataList.add(data);
|
||||||
}
|
}
|
||||||
|
return dataList;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ParsedSimulateRequest parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
|
||||||
|
if (pipelineId == null) {
|
||||||
|
throw new IllegalArgumentException("param [pipeline] is null");
|
||||||
|
}
|
||||||
|
Pipeline pipeline = pipelineStore.get(pipelineId);
|
||||||
|
List<Data> dataList = parseDocs(config);
|
||||||
|
return new ParsedSimulateRequest(pipeline, dataList, verbose);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public ParsedSimulateRequest parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
|
||||||
|
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
|
||||||
|
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
|
||||||
|
List<Data> dataList = parseDocs(config);
|
||||||
return new ParsedSimulateRequest(pipeline, dataList, verbose);
|
return new ParsedSimulateRequest(pipeline, dataList, verbose);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final class Fields {
|
||||||
|
static final String PIPELINE = "pipeline";
|
||||||
|
static final String DOCS = "docs";
|
||||||
|
static final String SOURCE = "_source";
|
||||||
|
static final String INDEX = "_index";
|
||||||
|
static final String TYPE = "_type";
|
||||||
|
static final String ID = "_id";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
|
@ -26,46 +26,39 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
import org.elasticsearch.ingest.Data;
|
import org.elasticsearch.ingest.Data;
|
||||||
|
import org.elasticsearch.plugin.ingest.transport.TransportData;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class SimulatedItemResponse implements Streamable, ToXContent {
|
public class SimulateDocumentResult implements Streamable, ToXContent {
|
||||||
|
|
||||||
private Data data;
|
private TransportData data;
|
||||||
private List<ProcessorResult> processorResultList;
|
private List<SimulateProcessorResult> processorResultList;
|
||||||
private Throwable failure;
|
private Throwable failure;
|
||||||
|
|
||||||
public SimulatedItemResponse() {
|
public SimulateDocumentResult() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatedItemResponse(Data data) {
|
public SimulateDocumentResult(Data data) {
|
||||||
this.data = data;
|
this.data = new TransportData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatedItemResponse(List<ProcessorResult> processorResultList) {
|
public SimulateDocumentResult(List<SimulateProcessorResult> processorResultList) {
|
||||||
this.processorResultList = processorResultList;
|
this.processorResultList = processorResultList;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatedItemResponse(Throwable failure) {
|
public SimulateDocumentResult(Throwable failure) {
|
||||||
this.failure = failure;
|
this.failure = failure;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isFailed() {
|
public boolean isFailed() {
|
||||||
if (failure != null) {
|
if (failure != null) {
|
||||||
return true;
|
return true;
|
||||||
} else if (processorResultList != null) {
|
|
||||||
for (ProcessorResult result : processorResultList) {
|
|
||||||
if (result.isFailed()) {
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,10 +67,10 @@ public class SimulatedItemResponse implements Streamable, ToXContent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Data getData() {
|
public Data getData() {
|
||||||
return data;
|
return data.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<ProcessorResult> getProcessorResultList() {
|
public List<SimulateProcessorResult> getProcessorResultList() {
|
||||||
return processorResultList;
|
return processorResultList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,16 +84,13 @@ public class SimulatedItemResponse implements Streamable, ToXContent {
|
||||||
int size = in.readVInt();
|
int size = in.readVInt();
|
||||||
processorResultList = new ArrayList<>();
|
processorResultList = new ArrayList<>();
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
ProcessorResult processorResult = new ProcessorResult();
|
SimulateProcessorResult processorResult = new SimulateProcessorResult();
|
||||||
processorResult.readFrom(in);
|
processorResult.readFrom(in);
|
||||||
processorResultList.add(processorResult);
|
processorResultList.add(processorResult);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
String index = in.readString();
|
this.data = new TransportData();
|
||||||
String type = in.readString();
|
this.data.readFrom(in);
|
||||||
String id = in.readString();
|
|
||||||
Map<String, Object> doc = in.readMap();
|
|
||||||
this.data = new Data(index, type, id, doc);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,32 +103,27 @@ public class SimulatedItemResponse implements Streamable, ToXContent {
|
||||||
out.writeThrowable(failure);
|
out.writeThrowable(failure);
|
||||||
} else if (isVerbose()) {
|
} else if (isVerbose()) {
|
||||||
out.writeVInt(processorResultList.size());
|
out.writeVInt(processorResultList.size());
|
||||||
for (ProcessorResult p : processorResultList) {
|
for (SimulateProcessorResult p : processorResultList) {
|
||||||
p.writeTo(out);
|
p.writeTo(out);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
out.writeString(data.getIndex());
|
data.writeTo(out);
|
||||||
out.writeString(data.getType());
|
|
||||||
out.writeString(data.getId());
|
|
||||||
out.writeMap(data.getDocument());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.field(Fields.ERROR, isFailed());
|
if (isFailed()) {
|
||||||
if (failure != null) {
|
ElasticsearchException.renderThrowable(builder, params, failure);
|
||||||
builder.field(Fields.ERROR_MESSAGE, ExceptionsHelper.detailedMessage(failure));
|
|
||||||
} else if (isVerbose()) {
|
} else if (isVerbose()) {
|
||||||
builder.startArray(Fields.PROCESSOR_RESULTS);
|
builder.startArray(Fields.PROCESSOR_RESULTS);
|
||||||
for (ProcessorResult processorResult : processorResultList) {
|
for (SimulateProcessorResult processorResult : processorResultList) {
|
||||||
builder.value(processorResult);
|
processorResult.toXContent(builder, params);
|
||||||
}
|
}
|
||||||
builder.endArray();
|
builder.endArray();
|
||||||
} else {
|
} else {
|
||||||
builder.field(Fields.MODIFIED, data.isModified());
|
data.toXContent(builder, params);
|
||||||
builder.field(Fields.DOCUMENT, data.asMap());
|
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
@ -150,7 +135,7 @@ public class SimulatedItemResponse implements Streamable, ToXContent {
|
||||||
if (obj == null || getClass() != obj.getClass()) {
|
if (obj == null || getClass() != obj.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
SimulatedItemResponse other = (SimulatedItemResponse) obj;
|
SimulateDocumentResult other = (SimulateDocumentResult) obj;
|
||||||
return Objects.equals(data, other.data) && Objects.equals(processorResultList, other.processorResultList) && Objects.equals(failure, other.failure);
|
return Objects.equals(data, other.data) && Objects.equals(processorResultList, other.processorResultList) && Objects.equals(failure, other.failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,10 +145,6 @@ public class SimulatedItemResponse implements Streamable, ToXContent {
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class Fields {
|
static final class Fields {
|
||||||
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
|
|
||||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
|
||||||
static final XContentBuilderString ERROR_MESSAGE = new XContentBuilderString("error_message");
|
|
||||||
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
|
|
||||||
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
|
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.ingest.Data;
|
import org.elasticsearch.ingest.Data;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
|
@ -30,7 +31,7 @@ import java.util.List;
|
||||||
|
|
||||||
public class SimulateExecutionService {
|
public class SimulateExecutionService {
|
||||||
|
|
||||||
static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
|
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
@ -40,18 +41,18 @@ public class SimulateExecutionService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SimulatedItemResponse executeItem(Pipeline pipeline, Data data) {
|
SimulateDocumentResult executeItem(Pipeline pipeline, Data data) {
|
||||||
try {
|
try {
|
||||||
pipeline.execute(data);
|
pipeline.execute(data);
|
||||||
return new SimulatedItemResponse(data);
|
return new SimulateDocumentResult(data);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return new SimulatedItemResponse(e);
|
return new SimulateDocumentResult(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SimulatedItemResponse executeVerboseItem(Pipeline pipeline, Data data) {
|
SimulateDocumentResult executeVerboseItem(Pipeline pipeline, Data data) {
|
||||||
List<ProcessorResult> processorResultList = new ArrayList<>();
|
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
|
||||||
Data currentData = new Data(data);
|
Data currentData = new Data(data);
|
||||||
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
|
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
|
||||||
Processor processor = pipeline.getProcessors().get(i);
|
Processor processor = pipeline.getProcessors().get(i);
|
||||||
|
@ -59,18 +60,21 @@ public class SimulateExecutionService {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
processor.execute(currentData);
|
processor.execute(currentData);
|
||||||
processorResultList.add(new ProcessorResult(processorId, currentData));
|
processorResultList.add(new SimulateProcessorResult(processorId, currentData));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
processorResultList.add(new ProcessorResult(processorId, e));
|
processorResultList.add(new SimulateProcessorResult(processorId, e));
|
||||||
}
|
}
|
||||||
|
|
||||||
currentData = new Data(currentData);
|
currentData = new Data(currentData);
|
||||||
}
|
}
|
||||||
return new SimulatedItemResponse(processorResultList);
|
return new SimulateDocumentResult(processorResultList);
|
||||||
}
|
}
|
||||||
|
|
||||||
SimulatePipelineResponse execute(ParsedSimulateRequest request) {
|
public void execute(ParsedSimulateRequest request, ActionListener<SimulatePipelineResponse> listener) {
|
||||||
List<SimulatedItemResponse> responses = new ArrayList<>();
|
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
List<SimulateDocumentResult> responses = new ArrayList<>();
|
||||||
for (Data data : request.getDocuments()) {
|
for (Data data : request.getDocuments()) {
|
||||||
if (request.isVerbose()) {
|
if (request.isVerbose()) {
|
||||||
responses.add(executeVerboseItem(request.getPipeline(), data));
|
responses.add(executeVerboseItem(request.getPipeline(), data));
|
||||||
|
@ -78,20 +82,8 @@ public class SimulateExecutionService {
|
||||||
responses.add(executeItem(request.getPipeline(), data));
|
responses.add(executeItem(request.getPipeline(), data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new SimulatePipelineResponse(request.getPipeline().getId(), responses);
|
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), responses));
|
||||||
}
|
|
||||||
|
|
||||||
public void execute(ParsedSimulateRequest request, Listener listener) {
|
|
||||||
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
SimulatePipelineResponse response = execute(request);
|
|
||||||
listener.onResponse(response);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface Listener {
|
|
||||||
void onResponse(SimulatePipelineResponse response);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,27 +44,27 @@ public class SimulatePipelineRequest extends ActionRequest {
|
||||||
return validationException;
|
return validationException;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String id() {
|
public String getId() {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void id(String id) {
|
public void setId(String id) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean verbose() {
|
public boolean isVerbose() {
|
||||||
return verbose;
|
return verbose;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verbose(boolean verbose) {
|
public void setVerbose(boolean verbose) {
|
||||||
this.verbose = verbose;
|
this.verbose = verbose;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BytesReference source() {
|
public BytesReference getSource() {
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void source(BytesReference source) {
|
public void setSource(BytesReference source) {
|
||||||
this.source = source;
|
this.source = source;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,17 +30,17 @@ public class SimulatePipelineRequestBuilder extends ActionRequestBuilder<Simulat
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatePipelineRequestBuilder setId(String id) {
|
public SimulatePipelineRequestBuilder setId(String id) {
|
||||||
request.id(id);
|
request.setId(id);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatePipelineRequestBuilder setVerbose(boolean verbose) {
|
public SimulatePipelineRequestBuilder setVerbose(boolean verbose) {
|
||||||
request.verbose(verbose);
|
request.setVerbose(verbose);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatePipelineRequestBuilder setSource(BytesReference source) {
|
public SimulatePipelineRequestBuilder setSource(BytesReference source) {
|
||||||
request.source(source);
|
request.setSource(source);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,9 +22,9 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -32,33 +32,33 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class SimulatePipelineResponse extends ActionResponse implements StatusToXContent {
|
public class SimulatePipelineResponse extends ActionResponse implements ToXContent {
|
||||||
|
|
||||||
private String pipelineId;
|
private String pipelineId;
|
||||||
private List<SimulatedItemResponse> responses;
|
private List<SimulateDocumentResult> responses;
|
||||||
|
|
||||||
public SimulatePipelineResponse() {
|
public SimulatePipelineResponse() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimulatePipelineResponse(String pipelineId, List<SimulatedItemResponse> responses) {
|
public SimulatePipelineResponse(String pipelineId, List<SimulateDocumentResult> responses) {
|
||||||
this.pipelineId = pipelineId;
|
this.pipelineId = pipelineId;
|
||||||
this.responses = Collections.unmodifiableList(responses);
|
this.responses = Collections.unmodifiableList(responses);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String pipelineId() {
|
public String getPipelineId() {
|
||||||
return pipelineId;
|
return pipelineId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void pipelineId(String pipelineId) {
|
public void setPipelineId(String pipelineId) {
|
||||||
this.pipelineId = pipelineId;
|
this.pipelineId = pipelineId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<SimulatedItemResponse> responses() {
|
public List<SimulateDocumentResult> getResponses() {
|
||||||
return responses;
|
return responses;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void responses(List<SimulatedItemResponse> responses) {
|
public void setResponses(List<SimulateDocumentResult> responses) {
|
||||||
this.responses = responses;
|
this.responses = responses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeString(pipelineId);
|
out.writeString(pipelineId);
|
||||||
out.writeVInt(responses.size());
|
out.writeVInt(responses.size());
|
||||||
for (SimulatedItemResponse response : responses) {
|
for (SimulateDocumentResult response : responses) {
|
||||||
response.writeTo(out);
|
response.writeTo(out);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
|
||||||
int responsesLength = in.readVInt();
|
int responsesLength = in.readVInt();
|
||||||
responses = new ArrayList<>();
|
responses = new ArrayList<>();
|
||||||
for (int i = 0; i < responsesLength; i++) {
|
for (int i = 0; i < responsesLength; i++) {
|
||||||
SimulatedItemResponse response = new SimulatedItemResponse();
|
SimulateDocumentResult response = new SimulateDocumentResult();
|
||||||
response.readFrom(in);
|
response.readFrom(in);
|
||||||
responses.add(response);
|
responses.add(response);
|
||||||
}
|
}
|
||||||
|
@ -88,25 +88,15 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startArray("docs");
|
builder.startArray(Fields.DOCUMENTS);
|
||||||
for (SimulatedItemResponse response : responses) {
|
for (SimulateDocumentResult response : responses) {
|
||||||
builder.value(response);
|
response.toXContent(builder, params);
|
||||||
}
|
}
|
||||||
builder.endArray();
|
builder.endArray();
|
||||||
|
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public RestStatus status() {
|
|
||||||
for (SimulatedItemResponse response : responses) {
|
|
||||||
if (response.isFailed()) {
|
|
||||||
return RestStatus.BAD_REQUEST;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return RestStatus.OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) return true;
|
if (this == o) return true;
|
||||||
|
@ -120,4 +110,8 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(pipelineId, responses);
|
return Objects.hash(pipelineId, responses);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final class Fields {
|
||||||
|
static final XContentBuilderString DOCUMENTS = new XContentBuilderString("docs");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,22 +46,21 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
|
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
|
||||||
Map<String, Object> source = XContentHelper.convertToMap(request.source(), false).v2();
|
Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false).v2();
|
||||||
|
|
||||||
ParsedSimulateRequest simulateRequest;
|
ParsedSimulateRequest simulateRequest;
|
||||||
ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser();
|
ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser();
|
||||||
try {
|
try {
|
||||||
simulateRequest = parser.parse(request.id(), source, request.verbose(), pipelineStore);
|
if (request.getId() != null) {
|
||||||
|
simulateRequest = parser.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
|
||||||
|
} else {
|
||||||
|
simulateRequest = parser.parse(source, request.isVerbose(), pipelineStore);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
executionService.execute(simulateRequest, new SimulateExecutionService.Listener() {
|
executionService.execute(simulateRequest, listener);
|
||||||
@Override
|
|
||||||
public void onResponse(SimulatePipelineResponse response) {
|
|
||||||
listener.onResponse(response);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
|
@ -26,27 +26,27 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
import org.elasticsearch.ingest.Data;
|
import org.elasticsearch.ingest.Data;
|
||||||
|
import org.elasticsearch.plugin.ingest.transport.TransportData;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class ProcessorResult implements Streamable, ToXContent {
|
public class SimulateProcessorResult implements Streamable, ToXContent {
|
||||||
|
|
||||||
private String processorId;
|
private String processorId;
|
||||||
private Data data;
|
private TransportData data;
|
||||||
private Throwable failure;
|
private Throwable failure;
|
||||||
|
|
||||||
public ProcessorResult() {
|
public SimulateProcessorResult() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ProcessorResult(String processorId, Data data) {
|
public SimulateProcessorResult(String processorId, Data data) {
|
||||||
this.processorId = processorId;
|
this.processorId = processorId;
|
||||||
this.data = data;
|
this.data = new TransportData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ProcessorResult(String processorId, Throwable failure) {
|
public SimulateProcessorResult(String processorId, Throwable failure) {
|
||||||
this.processorId = processorId;
|
this.processorId = processorId;
|
||||||
this.failure = failure;
|
this.failure = failure;
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@ public class ProcessorResult implements Streamable, ToXContent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Data getData() {
|
public Data getData() {
|
||||||
return data;
|
return data.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getProcessorId() {
|
public String getProcessorId() {
|
||||||
|
@ -69,12 +69,8 @@ public class ProcessorResult implements Streamable, ToXContent {
|
||||||
if (isFailure) {
|
if (isFailure) {
|
||||||
this.failure = in.readThrowable();
|
this.failure = in.readThrowable();
|
||||||
} else {
|
} else {
|
||||||
this.processorId = in.readString();
|
this.data = new TransportData();
|
||||||
String index = in.readString();
|
this.data.readFrom(in);
|
||||||
String type = in.readString();
|
|
||||||
String id = in.readString();
|
|
||||||
Map<String, Object> doc = in.readMap();
|
|
||||||
this.data = new Data(index, type, id, doc);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,10 +81,7 @@ public class ProcessorResult implements Streamable, ToXContent {
|
||||||
out.writeThrowable(failure);
|
out.writeThrowable(failure);
|
||||||
} else {
|
} else {
|
||||||
out.writeString(processorId);
|
out.writeString(processorId);
|
||||||
out.writeString(data.getIndex());
|
data.writeTo(out);
|
||||||
out.writeString(data.getType());
|
|
||||||
out.writeString(data.getId());
|
|
||||||
out.writeMap(data.getDocument());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,12 +89,10 @@ public class ProcessorResult implements Streamable, ToXContent {
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.field(Fields.PROCESSOR_ID, processorId);
|
builder.field(Fields.PROCESSOR_ID, processorId);
|
||||||
builder.field(Fields.ERROR, isFailed());
|
|
||||||
if (isFailed()) {
|
if (isFailed()) {
|
||||||
builder.field(Fields.ERROR_MESSAGE, ExceptionsHelper.detailedMessage(failure));
|
ElasticsearchException.renderThrowable(builder, params, failure);
|
||||||
} else {
|
} else {
|
||||||
builder.field(Fields.MODIFIED, data.isModified());
|
data.toXContent(builder, params);
|
||||||
builder.field(Fields.DOCUMENT, data.asMap());
|
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
@ -109,11 +100,13 @@ public class ProcessorResult implements Streamable, ToXContent {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (obj == this) { return true; }
|
if (obj == this) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
if (obj == null || getClass() != obj.getClass()) {
|
if (obj == null || getClass() != obj.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
ProcessorResult other = (ProcessorResult) obj;
|
SimulateProcessorResult other = (SimulateProcessorResult) obj;
|
||||||
return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure);
|
return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,10 +116,6 @@ public class ProcessorResult implements Streamable, ToXContent {
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class Fields {
|
static final class Fields {
|
||||||
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
|
|
||||||
static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id");
|
static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id");
|
||||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
|
||||||
static final XContentBuilderString ERROR_MESSAGE = new XContentBuilderString("error_message");
|
|
||||||
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -84,4 +85,29 @@ public class DataTests extends ESTestCase {
|
||||||
data.addField("fizz.new", "bar");
|
data.addField("fizz.new", "bar");
|
||||||
assertThat(data.getProperty("fizz.new"), equalTo("bar"));
|
assertThat(data.getProperty("fizz.new"), equalTo("bar"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testEquals() {
|
||||||
|
Data otherData = new Data(data);
|
||||||
|
assertThat(otherData, equalTo(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNotEqualsDiffIndex() {
|
||||||
|
Data otherData = new Data(data.getIndex() + "foo", data.getType(), data.getId(), data.getDocument());
|
||||||
|
assertThat(otherData, not(equalTo(data)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNotEqualsDiffType() {
|
||||||
|
Data otherData = new Data(data.getIndex(), data.getType() + "foo", data.getId(), data.getDocument());
|
||||||
|
assertThat(otherData, not(equalTo(data)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNotEqualsDiffId() {
|
||||||
|
Data otherData = new Data(data.getIndex(), data.getType(), data.getId() + "foo", data.getDocument());
|
||||||
|
assertThat(otherData, not(equalTo(data)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNotEqualsDiffDocument() {
|
||||||
|
Data otherData = new Data(data.getIndex(), data.getType(), data.getId(), Collections.emptyMap());
|
||||||
|
assertThat(otherData, not(equalTo(data)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
|
||||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
|
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
|
||||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
|
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
|
||||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
|
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
|
||||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatedItemResponse;
|
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateDocumentResult;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
|
||||||
|
@ -104,8 +104,8 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
Map<String, Object> expectedDoc = new HashMap<>();
|
Map<String, Object> expectedDoc = new HashMap<>();
|
||||||
expectedDoc.put("foo", "bar");
|
expectedDoc.put("foo", "bar");
|
||||||
Data expectedData = new Data("index", "type", "id", expectedDoc);
|
Data expectedData = new Data("index", "type", "id", expectedDoc);
|
||||||
SimulatedItemResponse expectedResponse = new SimulatedItemResponse(expectedData);
|
SimulateDocumentResult expectedResponse = new SimulateDocumentResult(expectedData);
|
||||||
List<SimulatedItemResponse> expectedResponses = Arrays.asList(expectedResponse);
|
List<SimulateDocumentResult> expectedResponses = Arrays.asList(expectedResponse);
|
||||||
SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses);
|
SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses);
|
||||||
|
|
||||||
assertThat(response, equalTo(expected));
|
assertThat(response, equalTo(expected));
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.plugin.ingest.transport;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.Data;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.not;
|
||||||
|
|
||||||
|
public class TransportDataTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testEquals() throws Exception {
|
||||||
|
Data data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||||
|
Data otherData = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||||
|
assertThat(data, equalTo(otherData));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNotEquals() throws Exception {
|
||||||
|
Data data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||||
|
Data otherData = new Data("_index2", "_type", "_id", Collections.emptyMap());
|
||||||
|
assertThat(data, not(equalTo(otherData)));
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,28 +35,26 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class ParsedSimulateRequestParserTests extends ESTestCase {
|
public class ParsedSimulateRequestParserTests extends ESTestCase {
|
||||||
private static final ParsedSimulateRequest.Parser PARSER = new ParsedSimulateRequest.Parser();
|
|
||||||
|
|
||||||
private Map<String, Processor.Factory> processorRegistry;
|
|
||||||
private PipelineStore store;
|
private PipelineStore store;
|
||||||
private Processor processor;
|
private ParsedSimulateRequest.Parser parser;
|
||||||
private Pipeline pipeline;
|
private Pipeline pipeline;
|
||||||
private Data data;
|
private Data data;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
|
parser = new ParsedSimulateRequest.Parser();
|
||||||
List<String> uppercase = Collections.unmodifiableList(Collections.singletonList("foo"));
|
List<String> uppercase = Collections.unmodifiableList(Collections.singletonList("foo"));
|
||||||
processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
|
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
|
||||||
pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.unmodifiableList(Arrays.asList(processor)));
|
pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.unmodifiableList(Arrays.asList(processor)));
|
||||||
data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||||
processorRegistry = new HashMap<>();
|
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
|
||||||
processorRegistry.put("mutate", new MutateProcessor.Factory());
|
processorRegistry.put("mutate", new MutateProcessor.Factory());
|
||||||
store = mock(PipelineStore.class);
|
store = mock(PipelineStore.class);
|
||||||
when(store.get("_id")).thenReturn(pipeline);
|
when(store.get("_id")).thenReturn(pipeline);
|
||||||
when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry);
|
when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testParse_UsingPipelineStore() throws Exception {
|
public void testParseUsingPipelineStore() throws Exception {
|
||||||
ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
|
ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
|
||||||
|
|
||||||
Map<String, Object> raw = new HashMap<>();
|
Map<String, Object> raw = new HashMap<>();
|
||||||
|
@ -65,14 +63,15 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
|
||||||
doc.put("_index", "_index");
|
doc.put("_index", "_index");
|
||||||
doc.put("_type", "_type");
|
doc.put("_type", "_type");
|
||||||
doc.put("_id", "_id");
|
doc.put("_id", "_id");
|
||||||
|
doc.put("_source", data.getDocument());
|
||||||
docs.add(doc);
|
docs.add(doc);
|
||||||
raw.put("docs", docs);
|
raw.put("docs", docs);
|
||||||
|
|
||||||
ParsedSimulateRequest actualRequest = PARSER.parse("_id", raw, false, store);
|
ParsedSimulateRequest actualRequest = parser.parseWithPipelineId("_id", raw, false, store);
|
||||||
assertThat(actualRequest, equalTo(expectedRequest));
|
assertThat(actualRequest, equalTo(expectedRequest));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testParse_ProvidedPipeline() throws Exception {
|
public void testParseWithProvidedPipeline() throws Exception {
|
||||||
ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
|
ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
|
||||||
|
|
||||||
Map<String, Object> raw = new HashMap<>();
|
Map<String, Object> raw = new HashMap<>();
|
||||||
|
@ -81,6 +80,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
|
||||||
doc.put("_index", "_index");
|
doc.put("_index", "_index");
|
||||||
doc.put("_type", "_type");
|
doc.put("_type", "_type");
|
||||||
doc.put("_id", "_id");
|
doc.put("_id", "_id");
|
||||||
|
doc.put("_source", data.getDocument());
|
||||||
docs.add(doc);
|
docs.add(doc);
|
||||||
|
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
|
@ -91,7 +91,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
|
||||||
raw.put("docs", docs);
|
raw.put("docs", docs);
|
||||||
raw.put("pipeline", pipelineConfig);
|
raw.put("pipeline", pipelineConfig);
|
||||||
|
|
||||||
ParsedSimulateRequest actualRequest = PARSER.parse(null, raw, false, store);
|
ParsedSimulateRequest actualRequest = parser.parse(raw, false, store);
|
||||||
assertThat(actualRequest, equalTo(expectedRequest));
|
assertThat(actualRequest, equalTo(expectedRequest));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,11 +19,12 @@
|
||||||
|
|
||||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.ingest.Data;
|
import org.elasticsearch.ingest.Data;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
import org.elasticsearch.ingest.processor.Processor;
|
||||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
import org.elasticsearch.plugin.ingest.transport.TransportData;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -37,26 +38,28 @@ import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
public class SimulateExecutionServiceTests extends ESTestCase {
|
public class SimulateExecutionServiceTests extends ESTestCase {
|
||||||
|
|
||||||
private PipelineStore store;
|
|
||||||
private ThreadPool threadPool;
|
private ThreadPool threadPool;
|
||||||
private SimulateExecutionService executionService;
|
private SimulateExecutionService executionService;
|
||||||
private Pipeline pipeline;
|
private Pipeline pipeline;
|
||||||
private Processor processor;
|
private Processor processor;
|
||||||
private Data data;
|
private Data data;
|
||||||
|
private TransportData transportData;
|
||||||
|
private ActionListener<SimulatePipelineResponse> listener;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
store = mock(PipelineStore.class);
|
|
||||||
threadPool = new ThreadPool(
|
threadPool = new ThreadPool(
|
||||||
Settings.builder()
|
Settings.builder()
|
||||||
.put("name", "_name")
|
.put("name", getClass().getName())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
executionService = new SimulateExecutionService(threadPool);
|
executionService = new SimulateExecutionService(threadPool);
|
||||||
processor = mock(Processor.class);
|
processor = mock(Processor.class);
|
||||||
when(processor.getType()).thenReturn("mock");
|
when(processor.getType()).thenReturn("mock");
|
||||||
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor));
|
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor));
|
||||||
data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||||
|
transportData = new TransportData(data);
|
||||||
|
listener = mock(ActionListener.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -65,43 +68,42 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecuteVerboseItem() throws Exception {
|
public void testExecuteVerboseItem() throws Exception {
|
||||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
|
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(
|
||||||
Arrays.asList(new ProcessorResult("processor[mock]-0", data), new ProcessorResult("processor[mock]-1", data)));
|
Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data)));
|
||||||
SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
||||||
verify(processor, times(2)).execute(data);
|
verify(processor, times(2)).execute(data);
|
||||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecuteItem() throws Exception {
|
public void testExecuteItem() throws Exception {
|
||||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(data);
|
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(data);
|
||||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data);
|
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
|
||||||
verify(processor, times(2)).execute(data);
|
verify(processor, times(2)).execute(data);
|
||||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecuteVerboseItem_Failure() throws Exception {
|
public void testExecuteVerboseItemWithFailure() throws Exception {
|
||||||
Exception e = new RuntimeException("processor failed");
|
Exception e = new RuntimeException("processor failed");
|
||||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
|
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(
|
||||||
Arrays.asList(new ProcessorResult("processor[mock]-0", e), new ProcessorResult("processor[mock]-1", data))
|
Arrays.asList(new SimulateProcessorResult("processor[mock]-0", e), new SimulateProcessorResult("processor[mock]-1", data))
|
||||||
);
|
);
|
||||||
doThrow(e).doNothing().when(processor).execute(data);
|
doThrow(e).doNothing().when(processor).execute(data);
|
||||||
SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
||||||
verify(processor, times(2)).execute(data);
|
verify(processor, times(2)).execute(data);
|
||||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecuteItem_Failure() throws Exception {
|
public void testExecuteItemWithFailure() throws Exception {
|
||||||
Exception e = new RuntimeException("processor failed");
|
Exception e = new RuntimeException("processor failed");
|
||||||
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(e);
|
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(e);
|
||||||
doThrow(e).when(processor).execute(data);
|
doThrow(e).when(processor).execute(data);
|
||||||
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data);
|
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
|
||||||
verify(processor, times(1)).execute(data);
|
verify(processor, times(1)).execute(data);
|
||||||
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
assertThat(actualItemResponse, equalTo(expectedItemResponse));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecute() throws Exception {
|
public void testExecute() throws Exception {
|
||||||
SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class);
|
SimulateDocumentResult itemResponse = new SimulateDocumentResult(data);
|
||||||
SimulatedItemResponse itemResponse = new SimulatedItemResponse(data);
|
|
||||||
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
|
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
|
||||||
executionService.execute(request, listener);
|
executionService.execute(request, listener);
|
||||||
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
|
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
|
||||||
|
@ -114,11 +116,10 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecute_Verbose() throws Exception {
|
public void testExecuteWithVerbose() throws Exception {
|
||||||
SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class);
|
|
||||||
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true);
|
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true);
|
||||||
SimulatedItemResponse itemResponse = new SimulatedItemResponse(
|
SimulateDocumentResult itemResponse = new SimulateDocumentResult(
|
||||||
Arrays.asList(new ProcessorResult("processor[mock]-0", data), new ProcessorResult("processor[mock]-1", data)));
|
Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data)));
|
||||||
executionService.execute(request, listener);
|
executionService.execute(request, listener);
|
||||||
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
|
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
|
||||||
assertBusy(new Runnable() {
|
assertBusy(new Runnable() {
|
||||||
|
|
|
@ -47,8 +47,7 @@
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
- length: { docs: 1 }
|
- length: { docs: 1 }
|
||||||
- is_false: docs.0.error
|
- is_true: docs.0.doc.modified
|
||||||
- is_true: docs.0.modified
|
|
||||||
- match: { docs.0.doc._source.foo: "bar" }
|
- match: { docs.0.doc._source.foo: "bar" }
|
||||||
- match: { docs.0.doc._source.field2: "_value" }
|
- match: { docs.0.doc._source.field2: "_value" }
|
||||||
|
|
||||||
|
@ -132,8 +131,7 @@
|
||||||
- length: { docs: 1 }
|
- length: { docs: 1 }
|
||||||
- length: { docs.0.processor_results: 2 }
|
- length: { docs.0.processor_results: 2 }
|
||||||
- match: { docs.0.processor_results.0.processor_id: "processor[mutate]-0" }
|
- match: { docs.0.processor_results.0.processor_id: "processor[mutate]-0" }
|
||||||
- is_false: docs.0.processor_results.0.error
|
- is_true: docs.0.processor_results.0.doc.modified
|
||||||
- is_true: docs.0.processor_results.0.modified
|
|
||||||
- length: { docs.0.processor_results.0.doc._source: 2 }
|
- length: { docs.0.processor_results.0.doc._source: 2 }
|
||||||
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
|
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
|
||||||
- match: { docs.0.processor_results.0.doc._source.field2: "_value" }
|
- match: { docs.0.processor_results.0.doc._source.field2: "_value" }
|
||||||
|
@ -149,7 +147,6 @@
|
||||||
wait_for_status: green
|
wait_for_status: green
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
catch: request
|
|
||||||
ingest.simulate:
|
ingest.simulate:
|
||||||
body: >
|
body: >
|
||||||
{
|
{
|
||||||
|
@ -183,10 +180,8 @@
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
- length: { docs: 2 }
|
- length: { docs: 2 }
|
||||||
- is_true: docs.0.error
|
- match: { docs.0.error.type: "null_pointer_exception" }
|
||||||
- match: { docs.0.error_message: "NullPointerException[null]" }
|
- is_true: docs.1.doc.modified
|
||||||
- is_false: docs.1.error
|
|
||||||
- is_true: docs.1.modified
|
|
||||||
- match: { docs.1.doc._source.foo: "BAR" }
|
- match: { docs.1.doc._source.foo: "BAR" }
|
||||||
|
|
||||||
---
|
---
|
||||||
|
@ -196,7 +191,6 @@
|
||||||
wait_for_status: green
|
wait_for_status: green
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
catch: request
|
|
||||||
ingest.simulate:
|
ingest.simulate:
|
||||||
verbose: true
|
verbose: true
|
||||||
body: >
|
body: >
|
||||||
|
@ -240,12 +234,11 @@
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
- length: { docs: 2 }
|
- length: { docs: 2 }
|
||||||
- is_true: docs.0.error
|
|
||||||
- is_false: docs.1.error
|
|
||||||
- length: { docs.0.processor_results: 2 }
|
- length: { docs.0.processor_results: 2 }
|
||||||
- is_false: docs.1.processor_results.0.error
|
- match: { docs.0.processor_results.0.error.type: "number_format_exception" }
|
||||||
- match: { docs.0.processor_results.0.error_message: "NumberFormatException[For input string: \"bar\"]" }
|
- match: { docs.0.processor_results.1.doc._index: "index" }
|
||||||
- is_false: docs.1.processor_results.1.error
|
- match: { docs.0.processor_results.1.doc._type: "type" }
|
||||||
|
- match: { docs.0.processor_results.1.doc._id: "id" }
|
||||||
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
|
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
|
||||||
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
|
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
|
||||||
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
|
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
|
||||||
|
|
Loading…
Reference in New Issue