Merge pull request #14572 from talevy/simulate_ingest

[Ingest] Simulate Endpoint
This commit is contained in:
Tal Levy 2015-11-13 11:22:27 -08:00
commit 082686d9c5
38 changed files with 2130 additions and 42 deletions

View File

@ -22,11 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import java.lang.reflect.Array; import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* Represents the data and meta data (like id and type) of a single document that is going to be indexed. * Represents the data and meta data (like id and type) of a single document that is going to be indexed.
@ -47,6 +43,10 @@ public final class Data {
this.document = document; this.document = document;
} }
public Data(Data other) {
this(other.index, other.type, other.id, new HashMap<>(other.document));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> T getProperty(String path) { public <T> T getProperty(String path) {
// TODO: we should not rely on any core class, so we should have custom map extract value logic: // TODO: we should not rely on any core class, so we should have custom map extract value logic:
@ -129,4 +129,23 @@ public final class Data {
public boolean isModified() { public boolean isModified() {
return modified; return modified;
} }
@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Data other = (Data) obj;
return Objects.equals(document, other.document) &&
Objects.equals(index, other.index) &&
Objects.equals(type, other.type) &&
Objects.equals(id, other.id);
}
@Override
public int hashCode() {
return Objects.hash(index, type, id, document);
}
} }

View File

@ -74,7 +74,7 @@ public final class Pipeline {
public final static class Factory { public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws IOException { public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws IOException {
String description = ConfigurationUtils.readStringProperty(config, "description"); String description = ConfigurationUtils.readOptionalStringProperty(config, "description");
List<Processor> processors = new ArrayList<>(); List<Processor> processors = new ArrayList<>();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<Map<String, Map<String, Object>>> processorConfigs = (List<Map<String, Map<String, Object>>>) config.get("processors"); List<Map<String, Map<String, Object>>> processorConfigs = (List<Map<String, Map<String, Object>>>) config.get("processors");

View File

@ -77,39 +77,55 @@ public final class ConfigurationUtils {
* Returns and removes the specified property of type list from the specified configuration map. * Returns and removes the specified property of type list from the specified configuration map.
* *
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown. * If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
*/ */
public static List<String> readStringList(Map<String, Object> configuration, String propertyName) { public static <T> List<T> readOptionalList(Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing"); return null;
} }
return readStringList(propertyName, value); return readList(propertyName, value);
} }
/** /**
* Returns and removes the specified property of type list from the specified configuration map. * Returns and removes the specified property of type list from the specified configuration map.
* *
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown. * If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
*/ */
public static List<String> readOptionalStringList(Map<String, Object> configuration, String propertyName) { public static <T> List<T> readList(Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName); Object value = configuration.remove(propertyName);
if (value == null) { if (value == null) {
return null; throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
}
return readStringList(propertyName, value);
} }
private static List<String> readStringList(String propertyName, Object value) { return readList(propertyName, value);
}
private static <T> List<T> readList(String propertyName, Object value) {
if (value instanceof List) { if (value instanceof List) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<String> stringList = (List<String>) value; List<T> stringList = (List<T>) value;
return stringList; return stringList;
} else { } else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]"); throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
} }
} }
/**
* Returns and removes the specified property of type map from the specified configuration map.
*
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
* If the property is missing an {@link IllegalArgumentException} is thrown
*/
public static <T> Map<String, T> readMap(Map<String, Object> configuration, String propertyName) {
Object value = configuration.remove(propertyName);
if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
}
return readMap(propertyName, value);
}
/** /**
* Returns and removes the specified property of type map from the specified configuration map. * Returns and removes the specified property of type map from the specified configuration map.
* *
@ -120,6 +136,11 @@ public final class ConfigurationUtils {
if (value == null) { if (value == null) {
return null; return null;
} }
return readMap(propertyName, value);
}
private static <T> Map<String, T> readMap(String propertyName, Object value) {
if (value instanceof Map) { if (value instanceof Map) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, T> map = (Map<String, T>) value; Map<String, T> map = (Map<String, T>) value;
@ -128,4 +149,5 @@ public final class ConfigurationUtils {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
} }
} }
} }

View File

@ -38,6 +38,11 @@ public interface Processor {
*/ */
void execute(Data data); void execute(Data data);
/**
* Gets the type of a processor
*/
String getType();
/** /**
* A factory that knows how to construct a processor based on a map of maps. * A factory that knows how to construct a processor based on a map of maps.
*/ */
@ -57,7 +62,5 @@ public interface Processor {
@Override @Override
default void close() throws IOException { default void close() throws IOException {
} }
} }
} }

View File

@ -78,6 +78,11 @@ public final class DateProcessor implements Processor {
data.addField(targetField, ISODateTimeFormat.dateTime().print(dateTime)); data.addField(targetField, ISODateTimeFormat.dateTime().print(dateTime));
} }
@Override
public String getType() {
return TYPE;
}
DateTimeZone getTimezone() { DateTimeZone getTimezone() {
return timezone; return timezone;
} }
@ -108,7 +113,7 @@ public final class DateProcessor implements Processor {
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString); DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale"); String localeString = ConfigurationUtils.readOptionalStringProperty(config, "locale");
Locale locale = localeString == null ? Locale.ENGLISH : Locale.forLanguageTag(localeString); Locale locale = localeString == null ? Locale.ENGLISH : Locale.forLanguageTag(localeString);
List<String> matchFormats = ConfigurationUtils.readStringList(config, "match_formats"); List<String> matchFormats = ConfigurationUtils.readList(config, "match_formats");
return new DateProcessor(timezone, locale, matchField, matchFormats, targetField); return new DateProcessor(timezone, locale, matchField, matchFormats, targetField);
} }
} }

View File

@ -40,7 +40,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.*; import java.util.*;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringList; import static org.elasticsearch.ingest.processor.ConfigurationUtils.readList;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty; import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
public final class GeoIpProcessor implements Processor { public final class GeoIpProcessor implements Processor {
@ -91,6 +91,11 @@ public final class GeoIpProcessor implements Processor {
data.addField(targetField, geoData); data.addField(targetField, geoData);
} }
@Override
public String getType() {
return TYPE;
}
String getSourceField() { String getSourceField() {
return sourceField; return sourceField;
} }
@ -222,7 +227,7 @@ public final class GeoIpProcessor implements Processor {
final Set<Field> fields; final Set<Field> fields;
if (config.containsKey("fields")) { if (config.containsKey("fields")) {
fields = EnumSet.noneOf(Field.class); fields = EnumSet.noneOf(Field.class);
List<String> fieldNames = readStringList(config, "fields"); List<String> fieldNames = readList(config, "fields");
for (String fieldName : fieldNames) { for (String fieldName : fieldNames) {
try { try {
fields.add(Field.parse(fieldName)); fields.add(Field.parse(fieldName));

View File

@ -56,6 +56,11 @@ public final class GrokProcessor implements Processor {
} }
} }
@Override
public String getType() {
return TYPE;
}
String getMatchField() { String getMatchField() {
return matchField; return matchField;
} }

View File

@ -45,7 +45,7 @@ public final class MutateProcessor implements Processor {
private final List<String> uppercase; private final List<String> uppercase;
private final List<String> lowercase; private final List<String> lowercase;
public MutateProcessor(Map<String, Object> update, Map<String, String> rename, Map<String, String> convert, MutateProcessor(Map<String, Object> update, Map<String, String> rename, Map<String, String> convert,
Map<String, String> split, List<GsubExpression> gsub, Map<String, String> join, Map<String, String> split, List<GsubExpression> gsub, Map<String, String> join,
List<String> remove, List<String> trim, List<String> uppercase, List<String> lowercase) { List<String> remove, List<String> trim, List<String> uppercase, List<String> lowercase) {
this.update = update; this.update = update;
@ -134,6 +134,11 @@ public final class MutateProcessor implements Processor {
} }
} }
@Override
public String getType() {
return TYPE;
}
private void doUpdate(Data data) { private void doUpdate(Data data) {
for(Map.Entry<String, Object> entry : update.entrySet()) { for(Map.Entry<String, Object> entry : update.entrySet()) {
data.addField(entry.getKey(), entry.getValue()); data.addField(entry.getKey(), entry.getValue());
@ -281,10 +286,10 @@ public final class MutateProcessor implements Processor {
Map<String, String> split = ConfigurationUtils.readOptionalMap(config, "split"); Map<String, String> split = ConfigurationUtils.readOptionalMap(config, "split");
Map<String, List<String>> gsubConfig = ConfigurationUtils.readOptionalMap(config, "gsub"); Map<String, List<String>> gsubConfig = ConfigurationUtils.readOptionalMap(config, "gsub");
Map<String, String> join = ConfigurationUtils.readOptionalMap(config, "join"); Map<String, String> join = ConfigurationUtils.readOptionalMap(config, "join");
List<String> remove = ConfigurationUtils.readOptionalStringList(config, "remove"); List<String> remove = ConfigurationUtils.readOptionalList(config, "remove");
List<String> trim = ConfigurationUtils.readOptionalStringList(config, "trim"); List<String> trim = ConfigurationUtils.readOptionalList(config, "trim");
List<String> uppercase = ConfigurationUtils.readOptionalStringList(config, "uppercase"); List<String> uppercase = ConfigurationUtils.readOptionalList(config, "uppercase");
List<String> lowercase = ConfigurationUtils.readOptionalStringList(config, "lowercase"); List<String> lowercase = ConfigurationUtils.readOptionalList(config, "lowercase");
// pre-compile regex patterns // pre-compile regex patterns
List<GsubExpression> gsubExpressions = null; List<GsubExpression> gsubExpressions = null;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
import org.elasticsearch.ingest.processor.grok.GrokProcessor; import org.elasticsearch.ingest.processor.grok.GrokProcessor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor; import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -41,6 +42,7 @@ public class IngestModule extends AbstractModule {
binder().bind(PipelineExecutionService.class).asEagerSingleton(); binder().bind(PipelineExecutionService.class).asEagerSingleton();
binder().bind(PipelineStore.class).asEagerSingleton(); binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton(); binder().bind(PipelineStoreClient.class).asEagerSingleton();
binder().bind(SimulateExecutionService.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory()); addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory()); addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());

View File

@ -20,7 +20,6 @@
package org.elasticsearch.plugin.ingest; package org.elasticsearch.plugin.ingest;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
@ -30,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction; import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction; import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction; import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter; import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction;
@ -37,11 +37,11 @@ import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction; import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransportAction;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestModule; import org.elasticsearch.rest.RestModule;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -105,11 +105,13 @@ public class IngestPlugin extends Plugin {
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class); module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class); module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class); module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
module.registerAction(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
} }
public void onModule(RestModule restModule) { public void onModule(RestModule restModule) {
restModule.addRestAction(RestPutPipelineAction.class); restModule.addRestAction(RestPutPipelineAction.class);
restModule.addRestAction(RestGetPipelineAction.class); restModule.addRestAction(RestGetPipelineAction.class);
restModule.addRestAction(RestDeletePipelineAction.class); restModule.addRestAction(RestDeletePipelineAction.class);
restModule.addRestAction(RestSimulatePipelineAction.class);
} }
} }

View File

@ -95,6 +95,10 @@ public class PipelineStore extends AbstractLifecycleComponent {
} }
} }
public Map<String, Processor.Factory> getProcessorFactoryRegistry() {
return processorFactoryRegistry;
}
public List<PipelineReference> getReference(String... ids) { public List<PipelineReference> getReference(String... ids) {
List<PipelineReference> result = new ArrayList<>(ids.length); List<PipelineReference> result = new ArrayList<>(ids.length);
for (String id : ids) { for (String id : ids) {

View File

@ -25,27 +25,24 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction; import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest; import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestGetPipelineAction extends BaseRestHandler { public class RestGetPipelineAction extends BaseRestHandler {
@Inject @Inject
public RestGetPipelineAction(Settings settings, RestController controller, Client client) { public RestGetPipelineAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client); super(settings, controller, client);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{ids}", this); controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}", this);
} }
@Override @Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception { protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
GetPipelineRequest request = new GetPipelineRequest(); GetPipelineRequest request = new GetPipelineRequest();
request.ids(Strings.splitStringByCommaToArray(restRequest.param("ids"))); request.ids(Strings.splitStringByCommaToArray(restRequest.param("id")));
client.execute(GetPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); client.execute(GetPipelineAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
} }
} }

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.rest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestToXContentListener;
public class RestSimulatePipelineAction extends BaseRestHandler {
@Inject
public RestSimulatePipelineAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/{id}/_simulate", this);
controller.registerHandler(RestRequest.Method.POST, "/_ingest/pipeline/_simulate", this);
controller.registerHandler(RestRequest.Method.GET, "/_ingest/pipeline/_simulate", this);
}
@Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
SimulatePipelineRequest request = new SimulatePipelineRequest();
request.setId(restRequest.param("id"));
request.setVerbose(restRequest.paramAsBoolean("verbose", false));
if (RestActions.hasBodyContent(restRequest)) {
request.setSource(RestActions.getRestContent(restRequest));
}
client.execute(SimulatePipelineAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.Data;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class TransportData implements Writeable<TransportData>, ToXContent {
private static final TransportData PROTOTYPE = new TransportData(null);
private final Data data;
public TransportData(Data data) {
this.data = data;
}
public Data get() {
return data;
}
public static TransportData readTransportDataFrom(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
@Override
public TransportData readFrom(StreamInput in) throws IOException {
String index = in.readString();
String type = in.readString();
String id = in.readString();
Map<String, Object> doc = in.readMap();
return new TransportData(new Data(index, type, id, doc));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(data.getIndex());
out.writeString(data.getType());
out.writeString(data.getId());
out.writeMap(data.getDocument());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DOCUMENT);
builder.field(Fields.MODIFIED, data.isModified());
builder.field(Fields.INDEX, data.getIndex());
builder.field(Fields.TYPE, data.getType());
builder.field(Fields.ID, data.getId());
builder.field(Fields.SOURCE, data.getDocument());
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TransportData that = (TransportData) o;
return Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(data);
}
static final class Fields {
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
static final XContentBuilderString INDEX = new XContentBuilderString("_index");
static final XContentBuilderString TYPE = new XContentBuilderString("_type");
static final XContentBuilderString ID = new XContentBuilderString("_id");
static final XContentBuilderString SOURCE = new XContentBuilderString("_source");
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
public class ParsedSimulateRequest {
private final List<Data> documents;
private final Pipeline pipeline;
private final boolean verbose;
ParsedSimulateRequest(Pipeline pipeline, List<Data> documents, boolean verbose) {
this.pipeline = pipeline;
this.documents = Collections.unmodifiableList(documents);
this.verbose = verbose;
}
public Pipeline getPipeline() {
return pipeline;
}
public List<Data> getDocuments() {
return documents;
}
public boolean isVerbose() {
return verbose;
}
public static class Parser {
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
private List<Data> parseDocs(Map<String, Object> config) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS);
List<Data> dataList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE),
ConfigurationUtils.readStringProperty(dataMap, Fields.ID),
document);
dataList.add(data);
}
return dataList;
}
public ParsedSimulateRequest parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
if (pipelineId == null) {
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = pipelineStore.get(pipelineId);
List<Data> dataList = parseDocs(config);
return new ParsedSimulateRequest(pipeline, dataList, verbose);
}
public ParsedSimulateRequest parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
List<Data> dataList = parseDocs(config);
return new ParsedSimulateRequest(pipeline, dataList, verbose);
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
public interface SimulateDocumentResult<T extends SimulateDocumentResult> extends Writeable<T>, ToXContent {
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.plugin.ingest.transport.TransportData;
import java.io.IOException;
public class SimulateDocumentSimpleResult implements SimulateDocumentResult<SimulateDocumentSimpleResult> {
private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult((Data)null);
private TransportData data;
private Exception failure;
public SimulateDocumentSimpleResult(Data data) {
this.data = new TransportData(data);
}
private SimulateDocumentSimpleResult(TransportData data) {
this.data = data;
}
public SimulateDocumentSimpleResult(Exception failure) {
this.failure = failure;
}
public Data getData() {
if (data == null) {
return null;
}
return data.get();
}
public Exception getFailure() {
return failure;
}
public static SimulateDocumentSimpleResult readSimulateDocumentSimpleResult(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
@Override
public SimulateDocumentSimpleResult readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
Exception exception = in.readThrowable();
return new SimulateDocumentSimpleResult(exception);
}
return new SimulateDocumentSimpleResult(TransportData.readTransportDataFrom(in));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (failure == null) {
out.writeBoolean(false);
data.writeTo(out);
} else {
out.writeBoolean(true);
out.writeThrowable(failure);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (failure == null) {
data.toXContent(builder, params);
} else {
ElasticsearchException.renderThrowable(builder, params, failure);
}
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SimulateDocumentVerboseResult implements SimulateDocumentResult<SimulateDocumentVerboseResult> {
private static final SimulateDocumentVerboseResult PROTOTYPE = new SimulateDocumentVerboseResult(Collections.emptyList());
private final List<SimulateProcessorResult> processorResults;
public SimulateDocumentVerboseResult(List<SimulateProcessorResult> processorResults) {
this.processorResults = processorResults;
}
public List<SimulateProcessorResult> getProcessorResults() {
return processorResults;
}
public static SimulateDocumentVerboseResult readSimulateDocumentVerboseResultFrom(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
@Override
public SimulateDocumentVerboseResult readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<SimulateProcessorResult> processorResults = new ArrayList<>();
for (int i = 0; i < size; i++) {
processorResults.add(SimulateProcessorResult.readSimulateProcessorResultFrom(in));
}
return new SimulateDocumentVerboseResult(processorResults);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processorResults.size());
for (SimulateProcessorResult result : processorResults) {
result.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(Fields.PROCESSOR_RESULTS);
for (SimulateProcessorResult processorResult : processorResults) {
processorResult.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
public class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
private final ThreadPool threadPool;
@Inject
public SimulateExecutionService(ThreadPool threadPool) {
this.threadPool = threadPool;
}
SimulateDocumentResult executeItem(Pipeline pipeline, Data data) {
try {
pipeline.execute(data);
return new SimulateDocumentSimpleResult(data);
} catch (Exception e) {
return new SimulateDocumentSimpleResult(e);
}
}
SimulateDocumentVerboseResult executeVerboseItem(Pipeline pipeline, Data data) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
Data currentData = new Data(data);
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
Processor processor = pipeline.getProcessors().get(i);
String processorId = "processor[" + processor.getType() + "]-" + i;
try {
processor.execute(currentData);
processorResultList.add(new SimulateProcessorResult(processorId, currentData));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(processorId, e));
}
currentData = new Data(currentData);
}
return new SimulateDocumentVerboseResult(processorResultList);
}
public void execute(ParsedSimulateRequest request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
@Override
public void run() {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (Data data : request.getDocuments()) {
if (request.isVerbose()) {
responses.add(executeVerboseItem(request.getPipeline(), data));
} else {
responses.add(executeItem(request.getPipeline(), data));
}
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
}
});
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class SimulatePipelineAction extends Action<SimulatePipelineRequest, SimulatePipelineResponse, SimulatePipelineRequestBuilder> {
public static final SimulatePipelineAction INSTANCE = new SimulatePipelineAction();
public static final String NAME = "cluster:admin/ingest/pipeline/simulate";
public SimulatePipelineAction() {
super(NAME);
}
@Override
public SimulatePipelineRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new SimulatePipelineRequestBuilder(client, this);
}
@Override
public SimulatePipelineResponse newResponse() {
return new SimulatePipelineResponse();
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SimulatePipelineRequest extends ActionRequest {
private String id;
private boolean verbose;
private BytesReference source;
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public boolean isVerbose() {
return verbose;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
public BytesReference getSource() {
return source;
}
public void setSource(BytesReference source) {
this.source = source;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
verbose = in.readBoolean();
source = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(verbose);
out.writeBytesReference(source);
}
public static final class Fields {
static final String PIPELINE = "pipeline";
static final String DOCS = "docs";
static final String SOURCE = "_source";
static final String INDEX = "_index";
static final String TYPE = "_type";
static final String ID = "_id";
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
public class SimulatePipelineRequestBuilder extends ActionRequestBuilder<SimulatePipelineRequest, SimulatePipelineResponse, SimulatePipelineRequestBuilder> {
public SimulatePipelineRequestBuilder(ElasticsearchClient client, SimulatePipelineAction action) {
super(client, action, new SimulatePipelineRequest());
}
public SimulatePipelineRequestBuilder setId(String id) {
request.setId(id);
return this;
}
public SimulatePipelineRequestBuilder setVerbose(boolean verbose) {
request.setVerbose(verbose);
return this;
}
public SimulatePipelineRequestBuilder setSource(BytesReference source) {
request.setSource(source);
return this;
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SimulatePipelineResponse extends ActionResponse implements ToXContent {
private String pipelineId;
private boolean verbose;
private List<SimulateDocumentResult> results;
public SimulatePipelineResponse() {
}
public SimulatePipelineResponse(String pipelineId, boolean verbose, List<SimulateDocumentResult> responses) {
this.pipelineId = pipelineId;
this.verbose = verbose;
this.results = Collections.unmodifiableList(responses);
}
public String getPipelineId() {
return pipelineId;
}
public List<SimulateDocumentResult> getResults() {
return results;
}
public boolean isVerbose() {
return verbose;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(pipelineId);
out.writeBoolean(verbose);
out.writeVInt(results.size());
for (SimulateDocumentResult response : results) {
response.writeTo(out);
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.pipelineId = in.readString();
boolean verbose = in.readBoolean();
int responsesLength = in.readVInt();
results = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult<?> simulateDocumentResult;
if (verbose) {
simulateDocumentResult = SimulateDocumentVerboseResult.readSimulateDocumentVerboseResultFrom(in);
} else {
simulateDocumentResult = SimulateDocumentSimpleResult.readSimulateDocumentSimpleResult(in);
}
results.add(simulateDocumentResult);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : results) {
response.toXContent(builder, params);
}
builder.endArray();
return builder;
}
static final class Fields {
static final XContentBuilderString DOCUMENTS = new XContentBuilderString("docs");
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {
private final PipelineStore pipelineStore;
private final SimulateExecutionService executionService;
@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore, SimulateExecutionService executionService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.pipelineStore = pipelineStore;
this.executionService = executionService;
}
@Override
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false).v2();
ParsedSimulateRequest simulateRequest;
ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser();
try {
if (request.getId() != null) {
simulateRequest = parser.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
} else {
simulateRequest = parser.parse(source, request.isVerbose(), pipelineStore);
}
} catch (IOException e) {
listener.onFailure(e);
return;
}
executionService.execute(simulateRequest, listener);
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.plugin.ingest.transport.TransportData;
import java.io.IOException;
public class SimulateProcessorResult implements Writeable<SimulateProcessorResult>, ToXContent {
private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult(null, (Data)null);
private String processorId;
private TransportData data;
private Exception failure;
public SimulateProcessorResult(String processorId, Data data) {
this.processorId = processorId;
this.data = new TransportData(data);
}
private SimulateProcessorResult(String processorId, TransportData data) {
this.processorId = processorId;
this.data = data;
}
public SimulateProcessorResult(String processorId, Exception failure) {
this.processorId = processorId;
this.failure = failure;
}
public Data getData() {
if (data == null) {
return null;
}
return data.get();
}
public String getProcessorId() {
return processorId;
}
public Exception getFailure() {
return failure;
}
public static SimulateProcessorResult readSimulateProcessorResultFrom(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
@Override
public SimulateProcessorResult readFrom(StreamInput in) throws IOException {
String processorId = in.readString();
if (in.readBoolean()) {
Exception exception = in.readThrowable();
return new SimulateProcessorResult(processorId, exception);
}
return new SimulateProcessorResult(processorId, TransportData.readTransportDataFrom(in));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(processorId);
if (failure == null) {
out.writeBoolean(false);
data.writeTo(out);
} else {
out.writeBoolean(true);
out.writeThrowable(failure);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.PROCESSOR_ID, processorId);
if (failure == null) {
data.toXContent(builder, params);
} else {
ElasticsearchException.renderThrowable(builder, params, failure);
}
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id");
}
}

View File

@ -22,10 +22,12 @@ package org.elasticsearch.ingest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class DataTests extends ESTestCase { public class DataTests extends ESTestCase {
@ -84,4 +86,55 @@ public class DataTests extends ESTestCase {
data.addField("fizz.new", "bar"); data.addField("fizz.new", "bar");
assertThat(data.getProperty("fizz.new"), equalTo("bar")); assertThat(data.getProperty("fizz.new"), equalTo("bar"));
} }
public void testEqualsAndHashcode() throws Exception {
String index = randomAsciiOfLengthBetween(1, 10);
String type = randomAsciiOfLengthBetween(1, 10);
String id = randomAsciiOfLengthBetween(1, 10);
String fieldName = randomAsciiOfLengthBetween(1, 10);
String fieldValue = randomAsciiOfLengthBetween(1, 10);
Data data = new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue));
boolean changed = false;
String otherIndex;
if (randomBoolean()) {
otherIndex = randomAsciiOfLengthBetween(1, 10);
changed = true;
} else {
otherIndex = index;
}
String otherType;
if (randomBoolean()) {
otherType = randomAsciiOfLengthBetween(1, 10);
changed = true;
} else {
otherType = type;
}
String otherId;
if (randomBoolean()) {
otherId = randomAsciiOfLengthBetween(1, 10);
changed = true;
} else {
otherId = id;
}
Map<String, Object> document;
if (randomBoolean()) {
document = Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
changed = true;
} else {
document = Collections.singletonMap(fieldName, fieldValue);
}
Data otherData = new Data(otherIndex, otherType, otherId, document);
if (changed) {
assertThat(data, not(equalTo(otherData)));
assertThat(otherData, not(equalTo(data)));
} else {
assertThat(data, equalTo(otherData));
assertThat(otherData, equalTo(data));
Data thirdData = new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue));
assertThat(thirdData, equalTo(data));
assertThat(data, equalTo(thirdData));
}
}
} }

View File

@ -30,15 +30,20 @@ import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse; import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateDocumentSimpleResult;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.notNullValue; import static org.hamcrest.core.IsNull.notNullValue;
@ -52,7 +57,58 @@ public class IngestClientIT extends ESIntegTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() { protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins(); return nodePlugins();
}
public void testSimulate() throws Exception {
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("grok")
.field("field", "field1")
.field("pattern", "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>")
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
assertBusy(new Runnable() {
@Override
public void run() {
GetPipelineResponse response = new GetPipelineRequestBuilder(client(), GetPipelineAction.INSTANCE)
.setIds("_id")
.get();
assertThat(response.isFound(), is(true));
assertThat(response.pipelines().get("_id"), notNullValue());
}
});
SimulatePipelineResponse response = new SimulatePipelineRequestBuilder(client(), SimulatePipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
.startArray("docs")
.startObject()
.field("_index", "index")
.field("_type", "type")
.field("_id", "id")
.startObject("_source")
.field("foo", "bar")
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
assertThat(response.isVerbose(), equalTo(false));
assertThat(response.getPipelineId(), equalTo("_id"));
assertThat(response.getResults().size(), equalTo(1));
assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
Data expectedData = new Data("index", "type", "id", Collections.singletonMap("foo", "bar"));
assertThat(simulateDocumentSimpleResult.getData(), equalTo(expectedData));
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
} }
public void test() throws Exception { public void test() throws Exception {

View File

@ -59,7 +59,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
// TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer> // TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer>
public void testOptional_InvalidType() { public void testOptional_InvalidType() {
List<String> val = ConfigurationUtils.readStringList(config, "int"); List<String> val = ConfigurationUtils.readList(config, "int");
assertThat(val, equalTo(Arrays.asList(2))); assertThat(val, equalTo(Arrays.asList(2)));
} }
} }

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class TransportDataTests extends ESTestCase {
public void testEqualsAndHashcode() throws Exception {
String index = randomAsciiOfLengthBetween(1, 10);
String type = randomAsciiOfLengthBetween(1, 10);
String id = randomAsciiOfLengthBetween(1, 10);
String fieldName = randomAsciiOfLengthBetween(1, 10);
String fieldValue = randomAsciiOfLengthBetween(1, 10);
TransportData transportData = new TransportData(new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
boolean changed = false;
String otherIndex;
if (randomBoolean()) {
otherIndex = randomAsciiOfLengthBetween(1, 10);
changed = true;
} else {
otherIndex = index;
}
String otherType;
if (randomBoolean()) {
otherType = randomAsciiOfLengthBetween(1, 10);
changed = true;
} else {
otherType = type;
}
String otherId;
if (randomBoolean()) {
otherId = randomAsciiOfLengthBetween(1, 10);
changed = true;
} else {
otherId = id;
}
Map<String, Object> document;
if (randomBoolean()) {
document = Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
changed = true;
} else {
document = Collections.singletonMap(fieldName, fieldValue);
}
TransportData otherTransportData = new TransportData(new Data(otherIndex, otherType, otherId, document));
if (changed) {
assertThat(transportData, not(equalTo(otherTransportData)));
assertThat(otherTransportData, not(equalTo(transportData)));
} else {
assertThat(transportData, equalTo(otherTransportData));
assertThat(otherTransportData, equalTo(transportData));
TransportData thirdTransportData = new TransportData(new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
assertThat(thirdTransportData, equalTo(transportData));
assertThat(transportData, equalTo(thirdTransportData));
}
}
public void testSerialization() throws IOException {
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
TransportData transportData = new TransportData(data);
BytesStreamOutput out = new BytesStreamOutput();
transportData.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
TransportData otherTransportData = TransportData.readTransportDataFrom(streamInput);
assertThat(otherTransportData, equalTo(transportData));
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ParsedSimulateRequestParserTests extends ESTestCase {
private PipelineStore store;
@Before
public void init() throws IOException {
Pipeline pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.singletonList(mock(Processor.class)));
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mock_processor", mock(Processor.Factory.class));
store = mock(PipelineStore.class);
when(store.get(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry);
}
public void testParseUsingPipelineStore() throws Exception {
int numDocs = randomIntBetween(1, 10);
Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
List<Map<String, Object>> expectedDocs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
for (int i = 0; i < numDocs; i++) {
Map<String, Object> doc = new HashMap<>();
String index = randomAsciiOfLengthBetween(1, 10);
String type = randomAsciiOfLengthBetween(1, 10);
String id = randomAsciiOfLengthBetween(1, 10);
doc.put(Fields.INDEX, index);
doc.put(Fields.TYPE, type);
doc.put(Fields.ID, id);
String fieldName = randomAsciiOfLengthBetween(1, 10);
String fieldValue = randomAsciiOfLengthBetween(1, 10);
doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
docs.add(doc);
Map<String, Object> expectedDoc = new HashMap<>();
expectedDoc.put(Fields.INDEX, index);
expectedDoc.put(Fields.TYPE, type);
expectedDoc.put(Fields.ID, id);
expectedDoc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
expectedDocs.add(expectedDoc);
}
ParsedSimulateRequest actualRequest = new ParsedSimulateRequest.Parser().parseWithPipelineId(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, requestContent, false, store);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
for (Data data : actualRequest.getDocuments()) {
Map<String, Object> expectedDocument = expectedDocsIterator.next();
assertThat(data.getDocument(), equalTo(expectedDocument.get(Fields.SOURCE)));
assertThat(data.getIndex(), equalTo(expectedDocument.get(Fields.INDEX)));
assertThat(data.getType(), equalTo(expectedDocument.get(Fields.TYPE)));
assertThat(data.getId(), equalTo(expectedDocument.get(Fields.ID)));
}
assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(1));
}
public void testParseWithProvidedPipeline() throws Exception {
int numDocs = randomIntBetween(1, 10);
Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
List<Map<String, Object>> expectedDocs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
for (int i = 0; i < numDocs; i++) {
Map<String, Object> doc = new HashMap<>();
String index = randomAsciiOfLengthBetween(1, 10);
String type = randomAsciiOfLengthBetween(1, 10);
String id = randomAsciiOfLengthBetween(1, 10);
doc.put(Fields.INDEX, index);
doc.put(Fields.TYPE, type);
doc.put(Fields.ID, id);
String fieldName = randomAsciiOfLengthBetween(1, 10);
String fieldValue = randomAsciiOfLengthBetween(1, 10);
doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
docs.add(doc);
Map<String, Object> expectedDoc = new HashMap<>();
expectedDoc.put(Fields.INDEX, index);
expectedDoc.put(Fields.TYPE, type);
expectedDoc.put(Fields.ID, id);
expectedDoc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
expectedDocs.add(expectedDoc);
}
Map<String, Object> pipelineConfig = new HashMap<>();
List<Map<String, Object>> processors = new ArrayList<>();
int numProcessors = randomIntBetween(1, 10);
for (int i = 0; i < numProcessors; i++) {
processors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
pipelineConfig.put("processors", processors);
requestContent.put(Fields.PIPELINE, pipelineConfig);
ParsedSimulateRequest actualRequest = new ParsedSimulateRequest.Parser().parse(requestContent, false, store);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
for (Data data : actualRequest.getDocuments()) {
Map<String, Object> expectedDocument = expectedDocsIterator.next();
assertThat(data.getDocument(), equalTo(expectedDocument.get(Fields.SOURCE)));
assertThat(data.getIndex(), equalTo(expectedDocument.get(Fields.INDEX)));
assertThat(data.getType(), equalTo(expectedDocument.get(Fields.TYPE)));
assertThat(data.getId(), equalTo(expectedDocument.get(Fields.ID)));
}
assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(numProcessors));
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
public class SimulateDocumentSimpleResultTests extends ESTestCase {
public void testSerialization() throws IOException {
boolean isFailure = randomBoolean();
SimulateDocumentSimpleResult simulateDocumentSimpleResult;
if (isFailure) {
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test"));
} else {
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(data);
}
BytesStreamOutput out = new BytesStreamOutput();
simulateDocumentSimpleResult.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateDocumentSimpleResult otherSimulateDocumentSimpleResult = SimulateDocumentSimpleResult.readSimulateDocumentSimpleResult(streamInput);
assertThat(otherSimulateDocumentSimpleResult.getData(), equalTo(simulateDocumentSimpleResult.getData()));
if (isFailure) {
assertThat(otherSimulateDocumentSimpleResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) otherSimulateDocumentSimpleResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;
public class SimulateExecutionServiceTests extends ESTestCase {
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private Pipeline pipeline;
private Processor processor;
private Data data;
@Before
public void setup() {
threadPool = new ThreadPool(
Settings.builder()
.put("name", getClass().getName())
.build()
);
executionService = new SimulateExecutionService(threadPool);
processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor));
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
}
@After
public void destroy() {
threadPool.shutdown();
}
public void testExecuteVerboseItem() throws Exception {
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorId(), equalTo("processor[mock]-0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), not(sameInstance(data)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), equalTo(data));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(data)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(data));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}
public void testExecuteItem() throws Exception {
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
assertThat(simulateDocumentSimpleResult.getData(), equalTo(data));
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
}
public void testExecuteVerboseItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).doNothing().when(processor).execute(data);
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorId(), equalTo("processor[mock]-0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), instanceOf(RuntimeException.class));
RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(data)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(data));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
}
public void testExecuteItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).when(processor).execute(data);
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
verify(processor, times(1)).execute(data);
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
assertThat(simulateDocumentSimpleResult.getData(), nullValue());
assertThat(simulateDocumentSimpleResult.getFailure(), instanceOf(RuntimeException.class));
RuntimeException runtimeException = (RuntimeException) simulateDocumentSimpleResult.getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
public class SimulatePipelineResponseTests extends ESTestCase {
public void testSerialization() throws IOException {
boolean isVerbose = randomBoolean();
int numResults = randomIntBetween(1, 10);
List<SimulateDocumentResult> results = new ArrayList<>(numResults);
for (int i = 0; i < numResults; i++) {
boolean isFailure = randomBoolean();
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
if (isVerbose) {
int numProcessors = randomIntBetween(1, 10);
List<SimulateProcessorResult> processorResults = new ArrayList<>(numProcessors);
for (int j = 0; j < numProcessors; j++) {
String processorId = randomAsciiOfLengthBetween(1, 10);
SimulateProcessorResult processorResult;
if (isFailure) {
processorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test"));
} else {
processorResult = new SimulateProcessorResult(processorId, data);
}
processorResults.add(processorResult);
}
results.add(new SimulateDocumentVerboseResult(processorResults));
} else {
results.add(new SimulateDocumentSimpleResult(data));
SimulateDocumentSimpleResult simulateDocumentSimpleResult;
if (isFailure) {
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test"));
} else {
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(data);
}
results.add(simulateDocumentSimpleResult);
}
}
SimulatePipelineResponse response = new SimulatePipelineResponse(randomAsciiOfLengthBetween(1, 10), isVerbose, results);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulatePipelineResponse otherResponse = new SimulatePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(otherResponse.getPipelineId(), equalTo(response.getPipelineId()));
assertThat(otherResponse.getResults().size(), equalTo(response.getResults().size()));
Iterator<SimulateDocumentResult> expectedResultIterator = response.getResults().iterator();
for (SimulateDocumentResult result : otherResponse.getResults()) {
if (isVerbose) {
SimulateDocumentVerboseResult expectedSimulateDocumentVerboseResult = (SimulateDocumentVerboseResult) expectedResultIterator.next();
assertThat(result, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) result;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(expectedSimulateDocumentVerboseResult.getProcessorResults().size()));
Iterator<SimulateProcessorResult> expectedProcessorResultIterator = expectedSimulateDocumentVerboseResult.getProcessorResults().iterator();
for (SimulateProcessorResult simulateProcessorResult : simulateDocumentVerboseResult.getProcessorResults()) {
SimulateProcessorResult expectedProcessorResult = expectedProcessorResultIterator.next();
assertThat(simulateProcessorResult.getProcessorId(), equalTo(expectedProcessorResult.getProcessorId()));
assertThat(simulateProcessorResult.getData(), equalTo(expectedProcessorResult.getData()));
if (expectedProcessorResult.getFailure() == null) {
assertThat(simulateProcessorResult.getFailure(), nullValue());
} else {
assertThat(simulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) simulateProcessorResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
}
}
} else {
SimulateDocumentSimpleResult expectedSimulateDocumentSimpleResult = (SimulateDocumentSimpleResult) expectedResultIterator.next();
assertThat(result, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) result;
assertThat(simulateDocumentSimpleResult.getData(), equalTo(expectedSimulateDocumentSimpleResult.getData()));
if (expectedSimulateDocumentSimpleResult.getFailure() == null) {
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
} else {
assertThat(simulateDocumentSimpleResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) simulateDocumentSimpleResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
}
}
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class SimulateProcessorResultTests extends ESTestCase {
public void testSerialization() throws IOException {
String processorId = randomAsciiOfLengthBetween(1, 10);
boolean isFailure = randomBoolean();
SimulateProcessorResult simulateProcessorResult;
if (isFailure) {
simulateProcessorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test"));
} else {
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
simulateProcessorResult = new SimulateProcessorResult(processorId, data);
}
BytesStreamOutput out = new BytesStreamOutput();
simulateProcessorResult.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateProcessorResult otherSimulateProcessorResult = SimulateProcessorResult.readSimulateProcessorResultFrom(streamInput);
assertThat(otherSimulateProcessorResult.getProcessorId(), equalTo(simulateProcessorResult.getProcessorId()));
assertThat(otherSimulateProcessorResult.getData(), equalTo(simulateProcessorResult.getData()));
if (isFailure) {
assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));
IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure();
assertThat(e.getMessage(), equalTo("test"));
}
}
}

View File

@ -3,10 +3,10 @@
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html", "documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
"methods": [ "GET" ], "methods": [ "GET" ],
"url": { "url": {
"path": "/_ingest/pipeline/{ids}", "path": "/_ingest/pipeline/{id}",
"paths": [ "/_ingest/pipeline/{ids}" ], "paths": [ "/_ingest/pipeline/{id}" ],
"parts": { "parts": {
"ids": { "id": {
"type" : "string", "type" : "string",
"description" : "Comma separated list of pipeline ids. Wildcards supported", "description" : "Comma separated list of pipeline ids. Wildcards supported",
"required" : true "required" : true

View File

@ -0,0 +1,28 @@
{
"ingest.simulate": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest.html",
"methods": [ "GET", "POST" ],
"url": {
"path": "/_ingest/pipeline/_simulate",
"paths": [ "/_ingest/pipeline/_simulate", "/_ingest/pipeline/{id}/_simulate/" ],
"parts": {
"id": {
"type" : "string",
"description" : "Pipeline ID",
"required" : false
}
},
"params": {
"verbose": {
"type" : "boolean",
"description" : "Verbose mode. Display data output for each processor in executed pipeline",
"default" : false
}
}
},
"body": {
"description" : "The simulate definition",
"required" : true
}
}
}

View File

@ -32,7 +32,7 @@
- do: - do:
ingest.get_pipeline: ingest.get_pipeline:
ids: "my_pipeline" id: "my_pipeline"
- match: { my_pipeline._source.description: "_description" } - match: { my_pipeline._source.description: "_description" }
- match: { my_pipeline._version: 1 } - match: { my_pipeline._version: 1 }
@ -53,7 +53,7 @@
- do: - do:
catch: missing catch: missing
ingest.get_pipeline: ingest.get_pipeline:
ids: "my_pipeline" id: "my_pipeline"
--- ---
"Test invalid config": "Test invalid config":

View File

@ -0,0 +1,274 @@
---
"Test simulate with stored ingest pipeline":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"field2" : "_value"
}
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.simulate:
id: "my_pipeline"
body: >
{
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- is_true: docs.0.doc.modified
- match: { docs.0.doc._source.foo: "bar" }
- match: { docs.0.doc._source.field2: "_value" }
---
"Test simulate with provided pipeline definition":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"field2" : "_value"
}
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
---
"Test simulate with no provided pipeline or pipeline_id":
- do:
cluster.health:
wait_for_status: green
- do:
catch: request
ingest.simulate:
body: >
{
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { error: 3 }
- match: { status: 400 }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "required property [pipeline] is missing" }
---
"Test simulate with verbose flag":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"field2" : "_value"
}
}
},
{
"mutate" : {
"update" : {
"field3" : "third_val"
}
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.processor_id: "processor[mutate]-0" }
- is_true: docs.0.processor_results.0.doc.modified
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
- match: { docs.0.processor_results.0.doc._source.field2: "_value" }
- length: { docs.0.processor_results.1.doc._source: 3 }
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
- match: { docs.0.processor_results.1.doc._source.field2: "_value" }
- match: { docs.0.processor_results.1.doc._source..field3: "third_val" }
---
"Test simulate with exception thrown":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"uppercase" : ["foo"]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"not_foo": "bar"
}
},
{
"_index": "index",
"_type": "type",
"_id": "id2",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 2 }
- match: { docs.0.error.type: "null_pointer_exception" }
- is_true: docs.1.doc.modified
- match: { docs.1.doc._source.foo: "BAR" }
---
"Test verbose simulate with exception thrown":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"mutate" : {
"convert" : {
"foo": "integer"
}
}
},
{
"mutate" : {
"uppercase" : ["bar"]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar",
"bar": "hello"
}
},
{
"_index": "index",
"_type": "type",
"_id": "id2",
"_source": {
"foo": "5",
"bar": "hello"
}
}
]
}
- length: { docs: 2 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.error.type: "number_format_exception" }
- match: { docs.0.processor_results.1.doc._index: "index" }
- match: { docs.0.processor_results.1.doc._type: "type" }
- match: { docs.0.processor_results.1.doc._id: "id" }
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }