updates, moar verbose

Tal Levy 2015-11-11 18:20:40 -08:00
parent c22c1e0f54
commit b40af1bcfd
26 changed files with 899 additions and 170 deletions

View File

@ -44,6 +44,10 @@ public final class Data {
this.document = document;
}
public Data(Data other) {
this(other.index, other.type, other.id, new HashMap<>(other.document));
}
@SuppressWarnings("unchecked")
public <T> T getProperty(String path) {
// TODO: we should not rely on any core class, so we should have custom map extract value logic:
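The new copy constructor takes a shallow copy of the top-level document map, which is what later lets the verbose simulation snapshot the document between processor steps. A minimal usage sketch (variable names are illustrative, not from this commit):

Data original = new Data("index", "type", "id", new HashMap<>(Collections.singletonMap("foo", "bar")));
Data snapshot = new Data(original);      // copies index/type/id and the top-level document map
original.addField("field2", "_value");   // the snapshot's document still contains only "foo"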

View File

@ -70,6 +70,21 @@ public final class Pipeline {
return processors;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Pipeline pipeline = (Pipeline) o;
return Objects.equals(id, pipeline.id) &&
Objects.equals(description, pipeline.description) &&
Objects.equals(processors, pipeline.processors);
}
@Override
public int hashCode() {
return Objects.hash(id, description, processors);
}
public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws IOException {

View File

@ -77,39 +77,41 @@ public final class ConfigurationUtils {
* Returns and removes the specified property of type list from the specified configuration map.
*
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
- * If the property is missing an {@link IllegalArgumentException} is thrown
*/
- public static List<String> readStringList(Map<String, Object> configuration, String propertyName) {
public static <T> List<T> readOptionalList(Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
- throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
return null;
}
- return readStringList(propertyName, value);
return readList(propertyName, value);
}
/**
* Returns and removes the specified property of type list from the specified configuration map.
*
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
*/
- public static List<String> readOptionalStringList(Map<String, Object> configuration, String propertyName) {
public static <T> List<T> readList(Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
- return null;
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
}
- return readStringList(propertyName, value);
return readList(propertyName, value);
}
- private static List<String> readStringList(String propertyName, Object value) {
private static <T> List<T> readList(String propertyName, Object value) {
if (value instanceof List) {
@SuppressWarnings("unchecked")
- List<String> stringList = (List<String>) value;
List<T> stringList = (List<T>) value;
return stringList;
} else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
}
}
/**
* Returns and removes the specified property of type map from the specified configuration map.
*
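The renamed helpers now split cleanly along required vs. optional semantics. A short usage sketch (the config keys here are purely illustrative):

Map<String, Object> config = new HashMap<>();
config.put("match_formats", Arrays.asList("dd/MM/yyyy", "ISO8601"));
List<String> formats = ConfigurationUtils.readList(config, "match_formats");  // returns the list and removes the key
List<String> trim = ConfigurationUtils.readOptionalList(config, "trim");      // missing property: returns null
List<String> missing = ConfigurationUtils.readList(config, "trim");           // missing property: throws IllegalArgumentException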

View File

@ -38,6 +38,11 @@ public interface Processor {
*/
void execute(Data data);
/**
* Gets the type of a processor
*/
String getType();
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
@ -54,6 +59,7 @@ public interface Processor {
default void setConfigDirectory(Path configDirectory) {
}
@Override
default void close() throws IOException {
}
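The new getType() method is what the simulate machinery uses further down to label each step (SimulateExecutionService builds ids like "processor[type]-0"). A minimal sketch of a conforming implementation (this class is hypothetical, not part of the commit):

public final class NoopProcessor implements Processor {
    public static final String TYPE = "noop";

    @Override
    public void execute(Data data) {
        // leaves the document untouched
    }

    @Override
    public String getType() {
        return TYPE;
    }
}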

View File

@ -78,6 +78,11 @@ public final class DateProcessor implements Processor {
data.addField(targetField, ISODateTimeFormat.dateTime().print(dateTime));
}
@Override
public String getType() {
return TYPE;
}
DateTimeZone getTimezone() {
return timezone;
}
@ -108,7 +113,7 @@ public final class DateProcessor implements Processor {
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale");
Locale locale = localeString == null ? Locale.ENGLISH : Locale.forLanguageTag(localeString);
- List<String> matchFormats = ConfigurationUtils.readStringList(config, "match_formats");
List<String> matchFormats = ConfigurationUtils.readList(config, "match_formats");
return new DateProcessor(timezone, locale, matchField, matchFormats, targetField);
}
}

View File

@ -40,7 +40,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.*;
- import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringList;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readList;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
public final class GeoIpProcessor implements Processor {
@ -91,6 +91,11 @@ public final class GeoIpProcessor implements Processor {
data.addField(targetField, geoData);
}
@Override
public String getType() {
return TYPE;
}
String getSourceField() {
return sourceField;
}
@ -222,7 +227,7 @@ public final class GeoIpProcessor implements Processor {
final Set<Field> fields;
if (config.containsKey("fields")) {
fields = EnumSet.noneOf(Field.class);
- List<String> fieldNames = readStringList(config, "fields");
List<String> fieldNames = readList(config, "fields");
for (String fieldName : fieldNames) {
try {
fields.add(Field.parse(fieldName));

View File

@ -56,6 +56,11 @@ public final class GrokProcessor implements Processor {
}
}
@Override
public String getType() {
return TYPE;
}
String getMatchField() {
return matchField;
}

View File

@ -134,6 +134,11 @@ public final class MutateProcessor implements Processor {
}
}
@Override
public String getType() {
return TYPE;
}
private void doUpdate(Data data) {
for(Map.Entry<String, Object> entry : update.entrySet()) {
data.addField(entry.getKey(), entry.getValue());
@ -272,6 +277,28 @@ public final class MutateProcessor implements Processor {
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MutateProcessor that = (MutateProcessor) o;
return Objects.equals(update, that.update) &&
Objects.equals(rename, that.rename) &&
Objects.equals(convert, that.convert) &&
Objects.equals(split, that.split) &&
Objects.equals(gsub, that.gsub) &&
Objects.equals(join, that.join) &&
Objects.equals(remove, that.remove) &&
Objects.equals(trim, that.trim) &&
Objects.equals(uppercase, that.uppercase) &&
Objects.equals(lowercase, that.lowercase);
}
@Override
public int hashCode() {
return Objects.hash(update, rename, convert, split, gsub, join, remove, trim, uppercase, lowercase);
}
public static final class Factory implements Processor.Factory<MutateProcessor> {
@Override
public MutateProcessor create(Map<String, Object> config) throws IOException {
@ -281,10 +308,10 @@ public final class MutateProcessor implements Processor {
Map<String, String> split = ConfigurationUtils.readOptionalMap(config, "split");
Map<String, List<String>> gsubConfig = ConfigurationUtils.readOptionalMap(config, "gsub");
Map<String, String> join = ConfigurationUtils.readOptionalMap(config, "join");
- List<String> remove = ConfigurationUtils.readOptionalStringList(config, "remove");
- List<String> trim = ConfigurationUtils.readOptionalStringList(config, "trim");
- List<String> uppercase = ConfigurationUtils.readOptionalStringList(config, "uppercase");
- List<String> lowercase = ConfigurationUtils.readOptionalStringList(config, "lowercase");
List<String> remove = ConfigurationUtils.readOptionalList(config, "remove");
List<String> trim = ConfigurationUtils.readOptionalList(config, "trim");
List<String> uppercase = ConfigurationUtils.readOptionalList(config, "uppercase");
List<String> lowercase = ConfigurationUtils.readOptionalList(config, "lowercase");
// pre-compile regex patterns
List<GsubExpression> gsubExpressions = null;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
import org.elasticsearch.ingest.processor.grok.GrokProcessor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.simulate.SimulateExecutionService;
import java.util.HashMap;
import java.util.Map;
@ -41,6 +42,7 @@ public class IngestModule extends AbstractModule {
binder().bind(PipelineExecutionService.class).asEagerSingleton();
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton();
binder().bind(SimulateExecutionService.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());

View File

@ -25,11 +25,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
- import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.threadpool.ThreadPool;
- import java.util.Map;
public class PipelineExecutionService {
static final String THREAD_POOL_NAME = IngestPlugin.NAME;
@ -43,25 +40,13 @@ public class PipelineExecutionService {
this.threadPool = threadPool;
}
- public Pipeline getPipeline(String pipelineId) {
- Pipeline pipeline = store.get(pipelineId);
- if (pipeline == null) {
- throw new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId));
- }
- return pipeline;
- }
public void execute(Data data, String pipelineId, Listener listener) {
- try {
- execute(data, getPipeline(pipelineId), listener);
- } catch (IllegalArgumentException e) {
- listener.failed(e);
- }
- }
- public void execute(Data data, Pipeline pipeline, Listener listener) {
Pipeline pipeline = store.get(pipelineId);
if (pipeline == null) {
listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId)));
return;
}
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
@Override
public void run() {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
public class RestSimulatePipelineAction extends BaseRestHandler {
@ -37,15 +38,20 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this);
// controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this);
// controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
}
@Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
SimulatePipelineRequest request = new SimulatePipelineRequest();
request.id(restRequest.param("id"));
- if (restRequest.hasContent()) {
- request.source(restRequest.content());
request.verbose(restRequest.paramAsBoolean("verbose", false));
if (RestActions.hasBodyContent(restRequest)) {
request.source(RestActions.getRestContent(restRequest));
}
client.execute(SimulatePipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
}
}
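On the client side the same verbose flag travels via SimulatePipelineRequest. A rough sketch of issuing the call programmatically (the client and jsonBody variables are assumed, not taken from this commit):

SimulatePipelineRequest request = new SimulatePipelineRequest();
request.id("my-pipeline");   // optional: simulate a pipeline already stored under this id
request.verbose(true);       // mirrors the `verbose` REST parameter read above
request.source(jsonBody);    // BytesReference body containing the "docs" array (and optionally an inline "pipeline")
client.execute(SimulatePipelineAction.INSTANCE, request, new ActionListener<SimulatePipelineResponse>() {
    @Override
    public void onResponse(SimulatePipelineResponse response) { /* inspect per-document results */ }

    @Override
    public void onFailure(Throwable e) { /* handle parse or execution errors */ }
});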

View File

@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.simulate;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException;
import java.util.*;
public class ParsedSimulateRequest {
private final List<Data> documents;
private final Pipeline pipeline;
private final boolean verbose;
ParsedSimulateRequest(Pipeline pipeline, List<Data> documents, boolean verbose) {
this.pipeline = pipeline;
this.documents = Collections.unmodifiableList(documents);
this.verbose = verbose;
}
public Pipeline getPipeline() {
return pipeline;
}
public List<Data> getDocuments() {
return documents;
}
public boolean isVerbose() {
return verbose;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ParsedSimulateRequest that = (ParsedSimulateRequest) o;
return Objects.equals(verbose, that.verbose) &&
Objects.equals(documents, that.documents) &&
Objects.equals(pipeline, that.pipeline);
}
@Override
public int hashCode() {
return Objects.hash(documents, pipeline, verbose);
}
public static class Parser {
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
public ParsedSimulateRequest parse(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
Pipeline pipeline;
// if pipeline `id` passed to request, fetch pipeline from store.
if (pipelineId != null) {
pipeline = pipelineStore.get(pipelineId);
} else {
Map<String, Object> pipelineConfig = ConfigurationUtils.readOptionalMap(config, "pipeline");
pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
}
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, "docs");
List<Data> dataList = new ArrayList<>();
for (int i = 0; i < docs.size(); i++) {
Map<String, Object> dataMap = docs.get(i);
Map<String, Object> document = ConfigurationUtils.readOptionalMap(dataMap, "_source");
if (document == null) {
document = Collections.emptyMap();
}
Data data = new Data(ConfigurationUtils.readOptionalStringProperty(dataMap, "_index"),
ConfigurationUtils.readOptionalStringProperty(dataMap, "_type"),
ConfigurationUtils.readOptionalStringProperty(dataMap, "_id"),
document);
dataList.add(data);
}
return new ParsedSimulateRequest(pipeline, dataList, verbose);
}
}
}
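A small sketch of how the parser is meant to be fed (the map mirrors the REST body; the pipelineStore variable is assumed to be available, and the maps must be mutable because the parser removes keys as it reads them):

Map<String, Object> doc = new HashMap<>();
doc.put("_index", "index");
doc.put("_type", "type");
doc.put("_id", "id");
doc.put("_source", new HashMap<>(Collections.singletonMap("foo", "bar")));
Map<String, Object> body = new HashMap<>();
body.put("docs", Collections.singletonList(doc));
// with a pipeline id the stored pipeline is used; pass null and a "pipeline" map to define one inline
ParsedSimulateRequest parsed = new ParsedSimulateRequest.Parser().parse("my-pipeline", body, true, pipelineStore);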

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
- package org.elasticsearch.plugin.ingest.transport.simulate;
package org.elasticsearch.plugin.ingest.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,24 +31,26 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
- public class SimulatedItemResponse implements Streamable, StatusToXContent {
public class ProcessedData implements Streamable, StatusToXContent {
private String processorId;
private Data data;
private Throwable failure;
- public SimulatedItemResponse() {
public ProcessedData() {
}
- public SimulatedItemResponse(Data data) {
public ProcessedData(String processorId, Data data) {
this.processorId = processorId;
this.data = data;
}
- public SimulatedItemResponse(Throwable failure) {
public ProcessedData(Throwable failure) {
this.failure = failure;
}
- public boolean failed() {
public boolean isFailed() {
return this.failure != null;
}
@ -56,14 +58,18 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
return data;
}
public String getProcessorId() {
return processorId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
- boolean failed = in.readBoolean();
- if (failed) {
boolean isFailure = in.readBoolean();
if (isFailure) {
this.failure = in.readThrowable();
// TODO(talevy): check out mget for throwable limitations
} else {
this.processorId = in.readString();
String index = in.readString();
String type = in.readString();
String id = in.readString();
@ -74,11 +80,11 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
@Override
public void writeTo(StreamOutput out) throws IOException {
- out.writeBoolean(failed());
- if (failed()) {
out.writeBoolean(isFailed());
if (isFailed()) {
out.writeThrowable(failure);
} else {
out.writeString(processorId);
out.writeString(data.getIndex());
out.writeString(data.getType());
out.writeString(data.getId());
@ -89,8 +95,9 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
- builder.field(Fields.ERROR, failed());
- if (failed()) {
builder.field(Fields.PROCESSOR_ID, processorId);
builder.field(Fields.ERROR, isFailed());
if (isFailed()) {
builder.field(Fields.FAILURE, failure.toString());
} else {
builder.field(Fields.MODIFIED, data.isModified());
@ -102,7 +109,7 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
@Override
public RestStatus status() {
- if (failed()) {
if (isFailed()) {
return RestStatus.BAD_REQUEST;
} else {
return RestStatus.OK;
@ -115,17 +122,18 @@ public class SimulatedItemResponse implements Streamable, StatusToXContent {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- SimulatedItemResponse other = (SimulatedItemResponse) obj;
- return Objects.equals(data, other.data) && Objects.equals(failure, other.failure);
ProcessedData other = (ProcessedData) obj;
return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure);
}
@Override
public int hashCode() {
- return Objects.hash(data, failure);
return Objects.hash(processorId, data, failure);
}
static final class Fields {
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
static final XContentBuilderString FAILURE = new XContentBuilderString("failure");
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.simulate;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
public class SimulateExecutionService {
static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
private final ThreadPool threadPool;
@Inject
public SimulateExecutionService(ThreadPool threadPool) {
this.threadPool = threadPool;
}
SimulatedItemResponse executeItem(Pipeline pipeline, Data data, boolean verbose) {
try {
if (verbose) {
return executeVerboseItem(pipeline, data);
} else {
pipeline.execute(data);
return new SimulatedItemResponse(data);
}
} catch (Exception e) {
return new SimulatedItemResponse(e);
}
}
SimulatedItemResponse executeVerboseItem(Pipeline pipeline, Data data) {
List<ProcessedData> processedDataList = new ArrayList<>();
Data currentData = new Data(data);
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
Processor processor = pipeline.getProcessors().get(i);
String processorId = "processor[" + processor.getType() + "]-" + i;
processor.execute(currentData);
processedDataList.add(new ProcessedData(processorId, currentData));
currentData = new Data(currentData);
}
return new SimulatedItemResponse(processedDataList);
}
SimulatePipelineResponse execute(ParsedSimulateRequest request) {
List<SimulatedItemResponse> responses = new ArrayList<>();
for (Data data : request.getDocuments()) {
responses.add(executeItem(request.getPipeline(), data, request.isVerbose()));
}
return new SimulatePipelineResponse(request.getPipeline().getId(), responses);
}
public void execute(ParsedSimulateRequest request, Listener listener) {
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
@Override
public void run() {
SimulatePipelineResponse response = execute(request);
listener.onResponse(response);
}
});
}
public interface Listener {
void onResponse(SimulatePipelineResponse response);
}
}
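Usage sketch (assuming an injected ThreadPool and an already-parsed request, as in the transport action below):

SimulateExecutionService service = new SimulateExecutionService(threadPool);
service.execute(parsedRequest, new SimulateExecutionService.Listener() {
    @Override
    public void onResponse(SimulatePipelineResponse response) {
        // one SimulatedItemResponse per input document; verbose requests also carry per-processor ProcessedData
    }
});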

View File

@ -0,0 +1,171 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class SimulatedItemResponse implements Streamable, StatusToXContent {
private Data data;
private List<ProcessedData> processedDataList;
private Throwable failure;
public SimulatedItemResponse() {
}
public SimulatedItemResponse(Data data) {
this.data = data;
}
public SimulatedItemResponse(List<ProcessedData> processedDataList) {
this.processedDataList = processedDataList;
}
public SimulatedItemResponse(Throwable failure) {
this.failure = failure;
}
public boolean isFailed() {
return this.failure != null;
}
public boolean isVerbose() {
return this.processedDataList != null;
}
public Data getData() {
return data;
}
public List<ProcessedData> getProcessedDataList() {
return processedDataList;
}
@Override
public void readFrom(StreamInput in) throws IOException {
boolean isFailed = in.readBoolean();
boolean isVerbose = in.readBoolean();
if (isFailed) {
this.failure = in.readThrowable();
// TODO(talevy): check out mget for throwable limitations
} else if (isVerbose) {
int size = in.readVInt();
processedDataList = new ArrayList<>();
for (int i = 0; i < size; i++) {
ProcessedData processedData = new ProcessedData();
processedData.readFrom(in);
processedDataList.add(processedData);
}
} else {
String index = in.readString();
String type = in.readString();
String id = in.readString();
Map<String, Object> doc = in.readMap();
this.data = new Data(index, type, id, doc);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isFailed());
out.writeBoolean(isVerbose());
if (isFailed()) {
out.writeThrowable(failure);
} else if (isVerbose()) {
out.writeVInt(processedDataList.size());
for (ProcessedData p : processedDataList) {
p.writeTo(out);
}
} else {
out.writeString(data.getIndex());
out.writeString(data.getType());
out.writeString(data.getId());
out.writeMap(data.getDocument());
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.ERROR, isFailed());
builder.field(Fields.VERBOSE, isVerbose());
if (isFailed()) {
builder.field(Fields.FAILURE, failure.toString());
} else if (isVerbose()) {
builder.startArray(Fields.PROCESSOR_STEPS);
for (ProcessedData processedData : processedDataList) {
builder.value(processedData);
}
builder.endArray();
} else {
builder.field(Fields.MODIFIED, data.isModified());
builder.field(Fields.DOCUMENT, data.getDocument());
}
builder.endObject();
return builder;
}
@Override
public RestStatus status() {
if (isFailed()) {
return RestStatus.BAD_REQUEST;
} else {
return RestStatus.OK;
}
}
@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
if (obj == null || getClass() != obj.getClass()) {
return false;
}
SimulatedItemResponse other = (SimulatedItemResponse) obj;
return Objects.equals(data, other.data) && Objects.equals(processedDataList, other.processedDataList) && Objects.equals(failure, other.failure);
}
@Override
public int hashCode() {
return Objects.hash(data, processedDataList, failure);
}
static final class Fields {
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
static final XContentBuilderString VERBOSE = new XContentBuilderString("verbose");
static final XContentBuilderString FAILURE = new XContentBuilderString("failure");
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
static final XContentBuilderString PROCESSOR_STEPS = new XContentBuilderString("processor_steps");
}
}

View File

@ -32,6 +32,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SimulatePipelineRequest extends ActionRequest {
private String id;
private boolean verbose;
private BytesReference source;
@Override
@ -51,6 +52,14 @@ public class SimulatePipelineRequest extends ActionRequest {
this.id = id;
}
public boolean verbose() {
return verbose;
}
public void verbose(boolean verbose) {
this.verbose = verbose;
}
public BytesReference source() {
return source;
}
@ -63,6 +72,7 @@ public class SimulatePipelineRequest extends ActionRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
verbose = in.readBoolean();
source = in.readBytesReference();
}
@ -70,6 +80,7 @@ public class SimulatePipelineRequest extends ActionRequest {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(verbose);
out.writeBytesReference(source);
}
}

View File

@ -34,6 +34,11 @@ public class SimulatePipelineRequestBuilder extends ActionRequestBuilder<Simulat
return this;
}
public SimulatePipelineRequestBuilder setVerbose(boolean verbose) {
request.verbose(verbose);
return this;
}
public SimulatePipelineRequestBuilder setSource(BytesReference source) {
request.source(source);
return this;

View File

@ -1,99 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class SimulatePipelineRequestPayload {
private final List<Data> documents;
private final Pipeline pipeline;
public SimulatePipelineRequestPayload(Pipeline pipeline, List<Data> documents) {
this.pipeline = pipeline;
this.documents = Collections.unmodifiableList(documents);
}
public String pipelineId() {
return pipeline.getId();
}
public Pipeline pipeline() {
return pipeline;
}
public List<Data> documents() {
return documents;
}
public SimulatePipelineResponse execute() {
List<SimulatedItemResponse> responses = new ArrayList<>();
for (Data data : documents) {
try {
pipeline.execute(data);
responses.add(new SimulatedItemResponse(data));
} catch (Exception e) {
responses.add(new SimulatedItemResponse(e));
}
}
return new SimulatePipelineResponse(pipeline.getId(), responses);
}
public static class Factory {
public SimulatePipelineRequestPayload create(String pipelineId, Map<String, Object> config, PipelineStore pipelineStore) throws IOException {
Pipeline pipeline;
// if pipeline `id` passed to request, fetch pipeline from store.
if (pipelineId != null) {
pipeline = pipelineStore.get(pipelineId);
} else {
Map<String, Object> pipelineConfig = (Map<String, Object>) config.get("pipeline");
pipeline = (new Pipeline.Factory()).create("_pipeline_id", pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
}
// distribute docs by shard key to SimulateShardPipelineResponse
List<Map<String, Object>> docs = (List<Map<String, Object>>) config.get("docs");
List<Data> dataList = new ArrayList<>();
for (int i = 0; i < docs.size(); i++) {
Map<String, Object> dataMap = docs.get(i);
Map<String, Object> document = (Map<String, Object>) dataMap.get("_source");
Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, "_index", null),
ConfigurationUtils.readStringProperty(dataMap, "_type", null),
ConfigurationUtils.readStringProperty(dataMap, "_id", null),
document);
dataList.add(data);
}
return new SimulatePipelineRequestPayload(pipeline, dataList);
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.simulate.SimulatedItemResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -100,7 +101,7 @@ public class SimulatePipelineResponse extends ActionResponse implements StatusTo
@Override
public RestStatus status() {
for (SimulatedItemResponse response : responses) {
- if (response.failed()) {
if (response.isFailed()) {
return RestStatus.BAD_REQUEST;
}
}

View File

@ -26,7 +26,10 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.plugin.ingest.simulate.ParsedSimulateRequest;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.simulate.SimulateExecutionService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -34,32 +37,33 @@ import java.io.IOException;
import java.util.Map;
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
private final PipelineStore pipelineStore;
private final SimulateExecutionService executionService;
@Inject
- public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore, SimulateExecutionService executionService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.pipelineStore = pipelineStore;
this.executionService = executionService;
}
@Override
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
Map<String, Object> source = XContentHelper.convertToMap(request.source(), false).v2();
- SimulatePipelineRequestPayload payload;
- SimulatePipelineRequestPayload.Factory factory = new SimulatePipelineRequestPayload.Factory();
ParsedSimulateRequest payload;
ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser();
try {
- payload = factory.create(request.id(), source, pipelineStore);
payload = parser.parse(request.id(), source, request.verbose(), pipelineStore);
} catch (IOException e) {
listener.onFailure(e);
return;
}
- threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
executionService.execute(payload, new SimulateExecutionService.Listener() {
@Override
- public void run() {
- listener.onResponse(payload.execute());
public void onResponse(SimulatePipelineResponse response) {
listener.onResponse(response);
}
});
}

View File

@ -33,7 +33,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
- import org.elasticsearch.plugin.ingest.transport.simulate.SimulatedItemResponse;
import org.elasticsearch.plugin.ingest.simulate.SimulatedItemResponse;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

View File

@ -59,7 +59,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
// TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer>
public void testOptional_InvalidType() {
- List<String> val = ConfigurationUtils.readStringList(config, "int");
List<String> val = ConfigurationUtils.readList(config, "int");
assertThat(val, equalTo(Arrays.asList(2)));
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.simulate;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ParsedSimulateRequestParserTests extends ESTestCase {
private static final ParsedSimulateRequest.Parser PARSER = new ParsedSimulateRequest.Parser();
private Map<String, Processor.Factory> processorRegistry;
private PipelineStore store;
private Processor processor;
private Pipeline pipeline;
private Data data;
@Before
public void init() throws IOException {
List<String> uppercase = Collections.unmodifiableList(Collections.singletonList("foo"));
processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.unmodifiableList(Arrays.asList(processor)));
data = new Data("_index", "_type", "_id", Collections.emptyMap());
processorRegistry = new HashMap<>();
processorRegistry.put("mutate", new MutateProcessor.Factory());
store = mock(PipelineStore.class);
when(store.get("_id")).thenReturn(pipeline);
when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry);
}
public void testParse_UsingPipelineStore() throws Exception {
ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
Map<String, Object> raw = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
Map<String, Object> doc = new HashMap<>();
doc.put("_index", "_index");
doc.put("_type", "_type");
doc.put("_id", "_id");
docs.add(doc);
raw.put("docs", docs);
ParsedSimulateRequest actualRequest = PARSER.parse("_id", raw, false, store);
assertThat(actualRequest, equalTo(expectedRequest));
}
public void testParse_ProvidedPipeline() throws Exception {
ParsedSimulateRequest expectedRequest = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
Map<String, Object> raw = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
Map<String, Object> doc = new HashMap<>();
doc.put("_index", "_index");
doc.put("_type", "_type");
doc.put("_id", "_id");
docs.add(doc);
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("uppercase", Arrays.asList("foo"));
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig)));
raw.put("docs", docs);
raw.put("pipeline", pipelineConfig);
ParsedSimulateRequest actualRequest = PARSER.parse(null, raw, false, store);
assertThat(actualRequest, equalTo(expectedRequest));
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.simulate;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.*;
public class SimulateExecutionServiceTests extends ESTestCase {
private PipelineStore store;
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private Pipeline pipeline;
private Processor processor;
private Data data;
@Before
public void setup() {
store = mock(PipelineStore.class);
threadPool = new ThreadPool(
Settings.builder()
.put("name", "_name")
.build()
);
executionService = new SimulateExecutionService(threadPool);
processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor));
data = new Data("_index", "_type", "_id", Collections.emptyMap());
}
@After
public void destroy() {
threadPool.shutdown();
}
public void testExecuteVerboseItem() throws Exception {
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data)));
SimulatedItemResponse actualItemResponse = executionService.executeVerboseItem(pipeline, data);
verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, equalTo(expectedItemResponse));
}
public void testExecuteItem_verboseSuccessful() throws Exception {
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(
Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data)));
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, true);
verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, equalTo(expectedItemResponse));
}
public void testExecuteItem_Simple() throws Exception {
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(data);
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, false);
verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, equalTo(expectedItemResponse));
}
public void testExecuteItem_Failure() throws Exception {
Exception e = new RuntimeException("processor failed");
SimulatedItemResponse expectedItemResponse = new SimulatedItemResponse(e);
doThrow(e).when(processor).execute(data);
SimulatedItemResponse actualItemResponse = executionService.executeItem(pipeline, data, false);
verify(processor, times(1)).execute(data);
assertThat(actualItemResponse, equalTo(expectedItemResponse));
}
public void testExecute() throws Exception {
SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class);
SimulatedItemResponse itemResponse = new SimulatedItemResponse(data);
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
executionService.execute(request, listener);
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
assertBusy(new Runnable() {
@Override
public void run() {
verify(processor, times(2)).execute(data);
verify(listener).onResponse(response);
}
});
}
public void testExecute_Verbose() throws Exception {
SimulateExecutionService.Listener listener = mock(SimulateExecutionService.Listener.class);
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true);
SimulatedItemResponse itemResponse = new SimulatedItemResponse(
Arrays.asList(new ProcessedData("processor[mock]-0", data), new ProcessedData("processor[mock]-1", data)));
executionService.execute(request, listener);
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
assertBusy(new Runnable() {
@Override
public void run() {
verify(processor, times(2)).execute(data);
verify(listener).onResponse(response);
}
});
}
}

View File

@ -13,6 +13,11 @@
}
},
"params": {
"verbose": {
"type" : "boolean",
"description" : "Verbose mode. Display data output for each processor in executed pipeline",
"default" : false
}
}
},
"body": {

View File

@ -12,11 +12,10 @@
"description": "_description", "description": "_description",
"processors": [ "processors": [
{ {
"simple" : { "mutate" : {
"path" : "field1", "update" : {
"expected_value" : "_value", "field2" : "_value"
"add_field" : "field2", }
"add_field_value" : "_value"
} }
} }
] ]
@ -48,4 +47,144 @@
]
}
- length: { docs: 1 }
- is_false: docs.0.error
- is_true: docs.0.modified
- match: { docs.0.foo: "bar" }
- match: { docs.0.field2: "_value" }
---
"Test simulate with provided pipeline definition":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"field2" : "_value"
}
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
---
"Test simulate with verbose flag":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"field2" : "_value"
}
}
},
{
"mutate" : {
"update" : {
"field3" : "third_val"
}
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_steps: 2 }
- match: { docs.0.processor_steps.0.processor_id: "processor[mutate]-0" }
- is_false: docs.0.processor_steps.0.error
- is_true: docs.0.processor_steps.0.modified
- length: { docs.0.processor_steps.0.doc: 2 }
- match: { docs.0.processor_steps.0.doc.foo: "bar" }
- match: { docs.0.processor_steps.0.doc.field2: "_value" }
- length: { docs.0.processor_steps.1.doc: 3 }
- match: { docs.0.processor_steps.1.doc.foo: "bar" }
- match: { docs.0.processor_steps.1.doc.field2: "_value" }
- match: { docs.0.processor_steps.1.doc.field3: "third_val" }
---
"Test simulate with exception thrown":
- do:
cluster.health:
wait_for_status: green
- do:
catch: request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"uppercase" : ["foo"]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"not_foo": "bar"
}
},
{
"_index": "index",
"_type": "type",
"_id": "id2",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 2 }
- is_true: docs.0.error
- match: { docs.0.failure: "java.lang.NullPointerException" }
- is_false: docs.1.error
- is_true: docs.1.modified
- match: { docs.1.doc.foo: "BAR" }