Merge pull request #15810 from talevy/fix/simulate_verbose_on_failure

[Ingest] Add on_failure support for verbose _simulate execution
This commit is contained in:
Tal Levy 2016-01-15 15:03:56 -08:00
commit aa35f510e8
66 changed files with 631 additions and 214 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
@ -39,20 +40,38 @@ class SimulateExecutionService {
this.threadPool = threadPool;
}
void executeVerboseDocument(Processor processor, IngestDocument ingestDocument, List<SimulateProcessorResult> processorResultList) throws Exception {
if (processor instanceof CompoundProcessor) {
CompoundProcessor cp = (CompoundProcessor) processor;
try {
for (Processor p : cp.getProcessors()) {
executeVerboseDocument(p, ingestDocument, processorResultList);
}
} catch (Exception e) {
for (Processor p : cp.getOnFailureProcessors()) {
executeVerboseDocument(p, ingestDocument, processorResultList);
}
}
} else {
try {
processor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
throw e;
}
}
}
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
Processor processor = pipeline.getProcessors().get(i);
String processorId = "processor[" + processor.getType() + "]-" + i;
CompoundProcessor pipelineProcessor = new CompoundProcessor(pipeline.getProcessors(), pipeline.getOnFailureProcessors());
try {
processor.execute(currentIngestDocument);
processorResultList.add(new SimulateProcessorResult(processorId, currentIngestDocument));
executeVerboseDocument(pipelineProcessor, currentIngestDocument, processorResultList);
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(processorId, e));
}
currentIngestDocument = new IngestDocument(currentIngestDocument);
return new SimulateDocumentSimpleResult(e);
}
return new SimulateDocumentVerboseResult(processorResultList);
} else {

View File

@ -24,19 +24,17 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.core.IngestDocument;
import java.io.IOException;
import java.util.Collections;
public class SimulateProcessorResult implements Writeable<SimulateProcessorResult>, ToXContent {
private final String processorId;
private final String processorTag;
private final WriteableIngestDocument ingestDocument;
private final Exception failure;
public SimulateProcessorResult(StreamInput in) throws IOException {
this.processorId = in.readString();
this.processorTag = in.readString();
if (in.readBoolean()) {
this.failure = in.readThrowable();
this.ingestDocument = null;
@ -46,14 +44,14 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
}
}
public SimulateProcessorResult(String processorId, IngestDocument ingestDocument) {
this.processorId = processorId;
public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) {
this.processorTag = processorTag;
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
this.failure = null;
}
public SimulateProcessorResult(String processorId, Exception failure) {
this.processorId = processorId;
public SimulateProcessorResult(String processorTag, Exception failure) {
this.processorTag = processorTag;
this.failure = failure;
this.ingestDocument = null;
}
@ -65,8 +63,8 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
return ingestDocument.getIngestDocument();
}
public String getProcessorId() {
return processorId;
public String getProcessorTag() {
return processorTag;
}
public Exception getFailure() {
@ -80,7 +78,7 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(processorId);
out.writeString(processorTag);
if (failure == null) {
out.writeBoolean(false);
ingestDocument.writeTo(out);
@ -93,7 +91,9 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("processor_id", processorId);
if (processorTag != null) {
builder.field("processor_tag", processorTag);
}
if (failure == null) {
ingestDocument.toXContent(builder, params);
} else {

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.core;
import java.util.Map;
/**
* A processor implementation may modify the data belonging to a document.
* Whether changes are made and what exactly is modified is up to the implementation.
*/
public abstract class AbstractProcessorFactory<P extends Processor> implements Processor.Factory<P> {
static final String PROCESSOR_TAG_KEY = "processor_tag";
@Override
public P create(Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(config, PROCESSOR_TAG_KEY);
return doCreate(tag, config);
}
protected abstract P doCreate(String tag, Map<String, Object> config) throws Exception;
}

View File

@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@ -42,6 +43,7 @@ public class CompoundProcessor implements Processor {
}
public CompoundProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
super();
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
}
@ -56,7 +58,12 @@ public class CompoundProcessor implements Processor {
@Override
public String getType() {
return "compound[" + processors.stream().map(Processor::getType).collect(Collectors.joining(",")) + "]";
return "compound";
}
@Override
public String getTag() {
return "compound-processor-" + Objects.hash(processors, onFailureProcessors);
}
@Override

View File

@ -94,17 +94,17 @@ public final class Pipeline {
}
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(config, fieldName);
List<Processor> onFailureProcessors = new ArrayList<>();
if (onFailureProcessorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : onFailureProcessorConfigs) {
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readOptionalList(config, fieldName);
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
onFailureProcessors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
}
}
}
return onFailureProcessors;
return processors;
}
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
@ -122,7 +122,5 @@ public final class Pipeline {
}
throw new IllegalArgumentException("No processor type exists with name [" + type + "]");
}
}
}

View File

@ -38,6 +38,11 @@ public interface Processor {
*/
String getType();
/**
* Gets the tag of a processor.
*/
String getTag();
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
@ -50,6 +55,5 @@ public interface Processor {
* verify if all configurations settings have been used.
*/
P create(Map<String, Object> config) throws Exception;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
@ -31,9 +32,11 @@ import java.util.Map;
*/
public abstract class AbstractStringProcessor implements Processor {
private final String processorTag;
private final String field;
protected AbstractStringProcessor(String field) {
protected AbstractStringProcessor(String processorTag, String field) {
this.processorTag = processorTag;
this.field = field;
}
@ -50,15 +53,21 @@ public abstract class AbstractStringProcessor implements Processor {
document.setFieldValue(field, process(val));
}
@Override
public String getTag() {
return processorTag;
}
protected abstract String process(String value);
public static abstract class Factory<T extends AbstractStringProcessor> implements Processor.Factory<T> {
public static abstract class Factory<T extends AbstractStringProcessor> extends AbstractProcessorFactory<T> {
@Override
public T create(Map<String, Object> config) throws Exception {
public T doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
return newProcessor(field);
return newProcessor(processorTag, field);
}
protected abstract T newProcessor(String field);
protected abstract T newProcessor(String processorTag, String field);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
@ -36,10 +37,12 @@ public class AppendProcessor implements Processor {
public static final String TYPE = "append";
private final String processorTag;
private final TemplateService.Template field;
private final ValueSource value;
AppendProcessor(TemplateService.Template field, ValueSource value) {
AppendProcessor(String processorTag, TemplateService.Template field, ValueSource value) {
this.processorTag = processorTag;
this.field = field;
this.value = value;
}
@ -62,7 +65,12 @@ public class AppendProcessor implements Processor {
return TYPE;
}
public static final class Factory implements Processor.Factory<AppendProcessor> {
@Override
public String getTag() {
return processorTag;
}
public static final class Factory extends AbstractProcessorFactory<AppendProcessor> {
private final TemplateService templateService;
@ -71,10 +79,10 @@ public class AppendProcessor implements Processor {
}
@Override
public AppendProcessor create(Map<String, Object> config) throws Exception {
public AppendProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Object value = ConfigurationUtils.readObject(config, "value");
return new AppendProcessor(templateService.compile(field), ValueSource.wrap(value, templateService));
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
@ -90,10 +91,12 @@ public class ConvertProcessor implements Processor {
public static final String TYPE = "convert";
private final String processorTag;
private final String field;
private final Type convertType;
ConvertProcessor(String field, Type convertType) {
ConvertProcessor(String processorTag, String field, Type convertType) {
this.processorTag = processorTag;
this.field = field;
this.convertType = convertType;
}
@ -132,12 +135,17 @@ public class ConvertProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<ConvertProcessor> {
@Override
public ConvertProcessor create(Map<String, Object> config) throws Exception {
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<ConvertProcessor> {
@Override
public ConvertProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Type convertType = Type.fromString(ConfigurationUtils.readStringProperty(config, "type"));
return new ConvertProcessor(field, convertType);
return new ConvertProcessor(processorTag, field, convertType);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
@ -39,6 +40,7 @@ public final class DateProcessor implements Processor {
public static final String TYPE = "date";
static final String DEFAULT_TARGET_FIELD = "@timestamp";
private final String processorTag;
private final DateTimeZone timezone;
private final Locale locale;
private final String matchField;
@ -46,7 +48,8 @@ public final class DateProcessor implements Processor {
private final List<String> matchFormats;
private final List<Function<String, DateTime>> dateParsers;
DateProcessor(DateTimeZone timezone, Locale locale, String matchField, List<String> matchFormats, String targetField) {
DateProcessor(String processorTag, DateTimeZone timezone, Locale locale, String matchField, List<String> matchFormats, String targetField) {
this.processorTag = processorTag;
this.timezone = timezone;
this.locale = locale;
this.matchField = matchField;
@ -93,6 +96,11 @@ public final class DateProcessor implements Processor {
return TYPE;
}
@Override
public String getTag() {
return processorTag;
}
DateTimeZone getTimezone() {
return timezone;
}
@ -113,10 +121,10 @@ public final class DateProcessor implements Processor {
return matchFormats;
}
public static class Factory implements Processor.Factory<DateProcessor> {
public static class Factory extends AbstractProcessorFactory<DateProcessor> {
@SuppressWarnings("unchecked")
public DateProcessor create(Map<String, Object> config) throws Exception {
public DateProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "match_field");
String targetField = ConfigurationUtils.readStringProperty(config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(config, "timezone");
@ -131,7 +139,7 @@ public final class DateProcessor implements Processor {
}
}
List<String> matchFormats = ConfigurationUtils.readList(config, "match_formats");
return new DateProcessor(timezone, locale, matchField, matchFormats, targetField);
return new DateProcessor(processorTag, timezone, locale, matchField, matchFormats, targetField);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
@ -37,9 +38,11 @@ public class DeDotProcessor implements Processor {
public static final String TYPE = "dedot";
static final String DEFAULT_SEPARATOR = "_";
private final String processorTag;
private final String separator;
public DeDotProcessor(String separator) {
public DeDotProcessor(String processorTag, String separator) {
this.processorTag = processorTag;
this.separator = separator;
}
@ -57,6 +60,11 @@ public class DeDotProcessor implements Processor {
return TYPE;
}
@Override
public String getTag() {
return processorTag;
}
/**
* Recursively iterates through Maps and Lists in search of map entries with
* keys containing dots. The dots in these fields are replaced with {@link #separator}.
@ -87,15 +95,15 @@ public class DeDotProcessor implements Processor {
}
}
public static class Factory implements Processor.Factory<DeDotProcessor> {
public static class Factory extends AbstractProcessorFactory<DeDotProcessor> {
@Override
public DeDotProcessor create(Map<String, Object> config) throws Exception {
public DeDotProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String separator = ConfigurationUtils.readOptionalStringProperty(config, "separator");
if (separator == null) {
separator = DEFAULT_SEPARATOR;
}
return new DeDotProcessor(separator);
return new DeDotProcessor(processorTag, separator);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
@ -34,9 +35,11 @@ public class FailProcessor implements Processor {
public static final String TYPE = "fail";
private final String processorTag;
private final TemplateService.Template message;
FailProcessor(TemplateService.Template message) {
FailProcessor(String processorTag, TemplateService.Template message) {
this.processorTag = processorTag;
this.message = message;
}
@ -54,7 +57,12 @@ public class FailProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<FailProcessor> {
@Override
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<FailProcessor> {
private final TemplateService templateService;
@ -63,9 +71,9 @@ public class FailProcessor implements Processor {
}
@Override
public FailProcessor create(Map<String, Object> config) throws Exception {
public FailProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String message = ConfigurationUtils.readStringProperty(config, "message");
return new FailProcessor(templateService.compile(message));
return new FailProcessor(processorTag, templateService.compile(message));
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
@ -35,11 +36,13 @@ public class GsubProcessor implements Processor {
public static final String TYPE = "gsub";
private final String processorTag;
private final String field;
private final Pattern pattern;
private final String replacement;
GsubProcessor(String field, Pattern pattern, String replacement) {
GsubProcessor(String processorTag, String field, Pattern pattern, String replacement) {
this.processorTag = processorTag;
this.field = field;
this.pattern = pattern;
this.replacement = replacement;
@ -74,14 +77,19 @@ public class GsubProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<GsubProcessor> {
@Override
public GsubProcessor create(Map<String, Object> config) throws Exception {
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<GsubProcessor> {
@Override
public GsubProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String pattern = ConfigurationUtils.readStringProperty(config, "pattern");
String replacement = ConfigurationUtils.readStringProperty(config, "replacement");
Pattern searchPattern = Pattern.compile(pattern);
return new GsubProcessor(field, searchPattern, replacement);
return new GsubProcessor(processorTag, field, searchPattern, replacement);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
@ -35,10 +36,12 @@ public class JoinProcessor implements Processor {
public static final String TYPE = "join";
private final String processorTag;
private final String field;
private final String separator;
JoinProcessor(String field, String separator) {
JoinProcessor(String processorTag, String field, String separator) {
this.processorTag = processorTag;
this.field = field;
this.separator = separator;
}
@ -68,12 +71,17 @@ public class JoinProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<JoinProcessor> {
@Override
public JoinProcessor create(Map<String, Object> config) throws Exception {
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<JoinProcessor> {
@Override
public JoinProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String separator = ConfigurationUtils.readStringProperty(config, "separator");
return new JoinProcessor(field, separator);
return new JoinProcessor(processorTag, field, separator);
}
}
}

View File

@ -30,8 +30,8 @@ public class LowercaseProcessor extends AbstractStringProcessor {
public static final String TYPE = "lowercase";
LowercaseProcessor(String field) {
super(field);
LowercaseProcessor(String processorId, String field) {
super(processorId, field);
}
@Override
@ -46,8 +46,8 @@ public class LowercaseProcessor extends AbstractStringProcessor {
public static class Factory extends AbstractStringProcessor.Factory<LowercaseProcessor> {
@Override
protected LowercaseProcessor newProcessor(String field) {
return new LowercaseProcessor(field);
protected LowercaseProcessor newProcessor(String processorTag, String field) {
return new LowercaseProcessor(processorTag, field);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
@ -33,9 +34,11 @@ public class RemoveProcessor implements Processor {
public static final String TYPE = "remove";
private final String processorTag;
private final TemplateService.Template field;
RemoveProcessor(TemplateService.Template field) {
RemoveProcessor(String processorTag, TemplateService.Template field) {
this.processorTag = processorTag;
this.field = field;
}
@ -53,7 +56,11 @@ public class RemoveProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<RemoveProcessor> {
@Override
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<RemoveProcessor> {
private final TemplateService templateService;
@ -62,9 +69,9 @@ public class RemoveProcessor implements Processor {
}
@Override
public RemoveProcessor create(Map<String, Object> config) throws Exception {
public RemoveProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
return new RemoveProcessor(templateService.compile(field));
return new RemoveProcessor(processorTag, templateService.compile(field));
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
@ -32,10 +33,12 @@ public class RenameProcessor implements Processor {
public static final String TYPE = "rename";
private final String processorTag;
private final String oldFieldName;
private final String newFieldName;
RenameProcessor(String oldFieldName, String newFieldName) {
RenameProcessor(String processorTag, String oldFieldName, String newFieldName) {
this.processorTag = processorTag;
this.oldFieldName = oldFieldName;
this.newFieldName = newFieldName;
}
@ -73,12 +76,17 @@ public class RenameProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<RenameProcessor> {
@Override
public RenameProcessor create(Map<String, Object> config) throws Exception {
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<RenameProcessor> {
@Override
public RenameProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
String newField = ConfigurationUtils.readStringProperty(config, "to");
return new RenameProcessor(field, newField);
return new RenameProcessor(processorTag, field, newField);
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
@ -35,10 +36,12 @@ public class SetProcessor implements Processor {
public static final String TYPE = "set";
private final String processorTag;
private final TemplateService.Template field;
private final ValueSource value;
SetProcessor(TemplateService.Template field, ValueSource value) {
SetProcessor(String processorTag, TemplateService.Template field, ValueSource value) {
this.processorTag = processorTag;
this.field = field;
this.value = value;
}
@ -61,7 +64,12 @@ public class SetProcessor implements Processor {
return TYPE;
}
public static final class Factory implements Processor.Factory<SetProcessor> {
@Override
public String getTag() {
return processorTag;
}
public static final class Factory extends AbstractProcessorFactory<SetProcessor> {
private final TemplateService templateService;
@ -70,10 +78,10 @@ public class SetProcessor implements Processor {
}
@Override
public SetProcessor create(Map<String, Object> config) throws Exception {
public SetProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Object value = ConfigurationUtils.readObject(config, "value");
return new SetProcessor(templateService.compile(field), ValueSource.wrap(value, templateService));
return new SetProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.Processor;
@ -35,10 +36,12 @@ public class SplitProcessor implements Processor {
public static final String TYPE = "split";
private final String processorTag;
private final String field;
private final String separator;
SplitProcessor(String field, String separator) {
SplitProcessor(String processorTag, String field, String separator) {
this.processorTag = processorTag;
this.field = field;
this.separator = separator;
}
@ -65,11 +68,16 @@ public class SplitProcessor implements Processor {
return TYPE;
}
public static class Factory implements Processor.Factory<SplitProcessor> {
@Override
public SplitProcessor create(Map<String, Object> config) throws Exception {
public String getTag() {
return processorTag;
}
public static class Factory extends AbstractProcessorFactory<SplitProcessor> {
@Override
public SplitProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
return new SplitProcessor(field, ConfigurationUtils.readStringProperty(config, "separator"));
return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(config, "separator"));
}
}
}

View File

@ -27,8 +27,8 @@ public class TrimProcessor extends AbstractStringProcessor {
public static final String TYPE = "trim";
TrimProcessor(String field) {
super(field);
TrimProcessor(String processorId, String field) {
super(processorId, field);
}
@Override
@ -43,8 +43,8 @@ public class TrimProcessor extends AbstractStringProcessor {
public static class Factory extends AbstractStringProcessor.Factory<TrimProcessor> {
@Override
protected TrimProcessor newProcessor(String field) {
return new TrimProcessor(field);
protected TrimProcessor newProcessor(String processorTag, String field) {
return new TrimProcessor(processorTag, field);
}
}
}

View File

@ -29,8 +29,8 @@ public class UppercaseProcessor extends AbstractStringProcessor {
public static final String TYPE = "uppercase";
UppercaseProcessor(String field) {
super(field);
UppercaseProcessor(String processorTag, String field) {
super(processorTag, field);
}
@Override
@ -45,8 +45,8 @@ public class UppercaseProcessor extends AbstractStringProcessor {
public static class Factory extends AbstractStringProcessor.Factory<UppercaseProcessor> {
@Override
protected UppercaseProcessor newProcessor(String field) {
return new UppercaseProcessor(field);
protected UppercaseProcessor newProcessor(String processorTag, String field) {
return new UppercaseProcessor(processorTag, field);
}
}
}

View File

@ -170,6 +170,11 @@ public class IngestActionFilterTests extends ESTestCase {
public String getType() {
return null;
}
@Override
public String getTag() {
return null;
}
};
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.core.IngestDocument;

View File

@ -19,14 +19,11 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulateExecutionService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.test.ESTestCase;
@ -34,6 +31,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
@ -44,6 +46,8 @@ public class SimulateExecutionServiceTests extends ESTestCase {
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private Pipeline pipeline;
private Processor processor;
private IngestDocument ingestDocument;
@Before
@ -54,6 +58,8 @@ public class SimulateExecutionServiceTests extends ESTestCase {
.build()
);
executionService = new SimulateExecutionService(threadPool);
processor = new TestProcessor("id", "mock", ingestDocument -> {});
pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}
@ -62,21 +68,89 @@ public class SimulateExecutionServiceTests extends ESTestCase {
threadPool.shutdown();
}
public void testExecuteVerboseDocumentSimple() throws Exception {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
executionService.executeVerboseDocument(processor, ingestDocument, processorResultList);
SimulateProcessorResult result = new SimulateProcessorResult("id", ingestDocument);
assertThat(processorResultList.size(), equalTo(1));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag()));
assertThat(processorResultList.get(0).getIngestDocument(), equalTo(result.getIngestDocument()));
assertThat(processorResultList.get(0).getFailure(), nullValue());
}
public void testExecuteVerboseDocumentSimpleException() throws Exception {
RuntimeException exception = new RuntimeException("mock_exception");
TestProcessor processor = new TestProcessor("id", "mock", ingestDocument -> { throw exception; });
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
try {
executionService.executeVerboseDocument(processor, ingestDocument, processorResultList);
fail("should throw exception");
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("mock_exception"));
}
SimulateProcessorResult result = new SimulateProcessorResult("id", exception);
assertThat(processorResultList.size(), equalTo(1));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag()));
assertThat(processorResultList.get(0).getFailure(), equalTo(result.getFailure()));
}
public void testExecuteVerboseDocumentCompoundSuccess() throws Exception {
TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { });
TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { });
Processor compoundProcessor = new CompoundProcessor(processor1, processor2);
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(processorResultList.size(), equalTo(2));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1"));
assertThat(processorResultList.get(0).getIngestDocument(), equalTo(ingestDocument));
assertThat(processorResultList.get(0).getFailure(), nullValue());
assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2"));
assertThat(processorResultList.get(1).getIngestDocument(), equalTo(ingestDocument));
assertThat(processorResultList.get(1).getFailure(), nullValue());
}
public void testExecuteVerboseDocumentCompoundOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { });
TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { throw new RuntimeException("p2_exception"); });
TestProcessor onFailureProcessor1 = new TestProcessor("fail_p1", "mock", ingestDocument -> { });
TestProcessor onFailureProcessor2 = new TestProcessor("fail_p2", "mock", ingestDocument -> { throw new RuntimeException("fail_p2_exception"); });
TestProcessor onFailureProcessor3 = new TestProcessor("fail_p3", "mock", ingestDocument -> { });
CompoundProcessor onFailureCompoundProcessor = new CompoundProcessor(Collections.singletonList(onFailureProcessor2), Collections.singletonList(onFailureProcessor3));
Processor compoundProcessor = new CompoundProcessor(Arrays.asList(processor1, processor2), Arrays.asList(onFailureProcessor1, onFailureCompoundProcessor));
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor1.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor2.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor3.getInvokedCounter(), equalTo(1));
assertThat(processorResultList.size(), equalTo(5));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1"));
assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2"));
assertThat(processorResultList.get(2).getProcessorTag(), equalTo("fail_p1"));
assertThat(processorResultList.get(3).getProcessorTag(), equalTo("fail_p2"));
assertThat(processorResultList.get(4).getProcessorTag(), equalTo("fail_p3"));
}
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("mock", ingestDocument -> {});
TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorId(), equalTo("processor[mock]-0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
@ -86,7 +160,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("mock", ingestDocument -> {});
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
assertThat(processor.getInvokedCounter(), equalTo(2));
@ -97,21 +171,21 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItemWithFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2));
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorId(), equalTo("processor[mock]-0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), instanceOf(RuntimeException.class));
RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;

View File

@ -19,11 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.core.IngestDocument;
@ -94,7 +89,7 @@ public class SimulatePipelineResponseTests extends ESTestCase {
Iterator<SimulateProcessorResult> expectedProcessorResultIterator = expectedSimulateDocumentVerboseResult.getProcessorResults().iterator();
for (SimulateProcessorResult simulateProcessorResult : simulateDocumentVerboseResult.getProcessorResults()) {
SimulateProcessorResult expectedProcessorResult = expectedProcessorResultIterator.next();
assertThat(simulateProcessorResult.getProcessorId(), equalTo(expectedProcessorResult.getProcessorId()));
assertThat(simulateProcessorResult.getProcessorTag(), equalTo(expectedProcessorResult.getProcessorTag()));
assertThat(simulateProcessorResult.getIngestDocument(), equalTo(expectedProcessorResult.getIngestDocument()));
if (expectedProcessorResult.getFailure() == null) {
assertThat(simulateProcessorResult.getFailure(), nullValue());

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.core.IngestDocument;
@ -48,7 +47,7 @@ public class SimulateProcessorResultTests extends ESTestCase {
simulateProcessorResult.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput);
assertThat(otherSimulateProcessorResult.getProcessorId(), equalTo(simulateProcessorResult.getProcessorId()));
assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag()));
assertThat(otherSimulateProcessorResult.getIngestDocument(), equalTo(simulateProcessorResult.getIngestDocument()));
if (isFailure) {
assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));

View File

@ -226,8 +226,8 @@ public class IngestClientIT extends ESIntegTestCase {
}
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor("test", (templateService) -> config ->
new TestProcessor("test", ingestDocument -> {
nodeModule.registerProcessor("test", templateService -> config ->
new TestProcessor("id", "test", ingestDocument -> {
ingestDocument.setFieldValue("processed", true);
if (ingestDocument.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");

View File

@ -71,7 +71,7 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
@ -87,8 +87,8 @@ public class CompoundProcessorTests extends ESTestCase {
}
public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("second", ingestDocument -> {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));

View File

@ -24,26 +24,33 @@ import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class PipelineFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> processorConfig0 = new HashMap<>();
Map<String, Object> processorConfig1 = new HashMap<>();
processorConfig0.put(AbstractProcessorFactory.PROCESSOR_TAG_KEY, "first-processor");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().size(), equalTo(2));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor"));
assertThat(pipeline.getProcessors().get(0).getTag(), equalTo("first-processor"));
assertThat(pipeline.getProcessors().get(1).getType(), equalTo("test-processor"));
assertThat(pipeline.getProcessors().get(1).getTag(), nullValue());
}
public void testCreateWithPipelineOnFailure() throws Exception {
@ -91,6 +98,6 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound[test-processor]"));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound"));
}
}

View File

@ -49,9 +49,12 @@ public class AppendProcessorFactoryTests extends ESTestCase {
value = Arrays.asList("value1", "value2", "value3");
}
config.put("value", value);
AppendProcessor setProcessor = factory.create(config);
assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo(value));
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
AppendProcessor appendProcessor = factory.create(config);
assertThat(appendProcessor.getTag(), equalTo(processorTag));
assertThat(appendProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(appendProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo(value));
}
public void testCreateNoFieldPresent() throws Exception {

View File

@ -158,7 +158,7 @@ public class AppendProcessorTests extends ESTestCase {
private static Processor createAppendProcessor(String fieldName, Object fieldValue) {
TemplateService templateService = TestTemplateService.instance();
return new AppendProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
return new AppendProcessor(randomAsciiOfLength(10), templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
}
private enum Scalar {

View File

@ -36,7 +36,10 @@ public class ConvertProcessorFactoryTests extends ESTestCase {
ConvertProcessor.Type type = randomFrom(ConvertProcessor.Type.values());
config.put("field", "field1");
config.put("type", type.toString());
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
ConvertProcessor convertProcessor = factory.create(config);
assertThat(convertProcessor.getTag(), equalTo(processorTag));
assertThat(convertProcessor.getField(), equalTo("field1"));
assertThat(convertProcessor.getConvertType(), equalTo(type));
}

View File

@ -41,7 +41,7 @@ public class ConvertProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int randomInt = randomInt();
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, randomInt);
Processor processor = new ConvertProcessor(fieldName, Type.INTEGER);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.INTEGER);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, Integer.class), equalTo(randomInt));
}
@ -57,7 +57,7 @@ public class ConvertProcessorTests extends ESTestCase {
expectedList.add(randomInt);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new ConvertProcessor(fieldName, Type.INTEGER);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.INTEGER);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(expectedList));
}
@ -68,7 +68,7 @@ public class ConvertProcessorTests extends ESTestCase {
String value = "string-" + randomAsciiOfLengthBetween(1, 10);
ingestDocument.setFieldValue(fieldName, value);
Processor processor = new ConvertProcessor(fieldName, Type.INTEGER);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.INTEGER);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -84,7 +84,7 @@ public class ConvertProcessorTests extends ESTestCase {
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, randomFloat);
expectedResult.put(fieldName, randomFloat);
Processor processor = new ConvertProcessor(fieldName, Type.FLOAT);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.FLOAT);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, Float.class), equalTo(randomFloat));
}
@ -100,7 +100,7 @@ public class ConvertProcessorTests extends ESTestCase {
expectedList.add(randomFloat);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new ConvertProcessor(fieldName, Type.FLOAT);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.FLOAT);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(expectedList));
}
@ -111,7 +111,7 @@ public class ConvertProcessorTests extends ESTestCase {
String value = "string-" + randomAsciiOfLengthBetween(1, 10);
ingestDocument.setFieldValue(fieldName, value);
Processor processor = new ConvertProcessor(fieldName, Type.FLOAT);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.FLOAT);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -122,8 +122,6 @@ public class ConvertProcessorTests extends ESTestCase {
public void testConvertBoolean() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, Boolean> expectedResult = new HashMap<>();
boolean randomBoolean = randomBoolean();
String booleanString = Boolean.toString(randomBoolean);
if (randomBoolean) {
@ -131,7 +129,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, booleanString);
Processor processor = new ConvertProcessor(fieldName, Type.BOOLEAN);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.BOOLEAN);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, Boolean.class), equalTo(randomBoolean));
}
@ -151,7 +149,7 @@ public class ConvertProcessorTests extends ESTestCase {
expectedList.add(randomBoolean);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new ConvertProcessor(fieldName, Type.BOOLEAN);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.BOOLEAN);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(expectedList));
}
@ -168,7 +166,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
ingestDocument.setFieldValue(fieldName, fieldValue);
Processor processor = new ConvertProcessor(fieldName, Type.BOOLEAN);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.BOOLEAN);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -202,7 +200,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new ConvertProcessor(fieldName, Type.STRING);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.STRING);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(expectedFieldValue));
}
@ -238,7 +236,7 @@ public class ConvertProcessorTests extends ESTestCase {
expectedList.add(randomValueString);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new ConvertProcessor(fieldName, Type.STRING);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, Type.STRING);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(expectedList));
}
@ -247,7 +245,7 @@ public class ConvertProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Type type = randomFrom(Type.values());
Processor processor = new ConvertProcessor(fieldName, type);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), fieldName, type);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -259,7 +257,7 @@ public class ConvertProcessorTests extends ESTestCase {
public void testConvertNullField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null));
Type type = randomFrom(Type.values());
Processor processor = new ConvertProcessor("field", type);
Processor processor = new ConvertProcessor(randomAsciiOfLength(10), "field", type);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");

View File

@ -41,8 +41,10 @@ public class DateProcessorFactoryTests extends ESTestCase {
String sourceField = randomAsciiOfLengthBetween(1, 10);
config.put("match_field", sourceField);
config.put("match_formats", Collections.singletonList("dd/MM/yyyyy"));
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
DateProcessor processor = factory.create(config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getMatchField(), equalTo(sourceField));
assertThat(processor.getTargetField(), equalTo(DateProcessor.DEFAULT_TARGET_FIELD));
assertThat(processor.getMatchFormats(), equalTo(Collections.singletonList("dd/MM/yyyyy")));

View File

@ -38,7 +38,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
public class DateProcessorTests extends ESTestCase {
public void testJodaPattern() {
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.forID("Europe/Amsterdam"), Locale.ENGLISH,
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.forID("Europe/Amsterdam"), Locale.ENGLISH,
"date_as_string", Collections.singletonList("yyyy dd MM hh:mm:ss"), "date_as_date");
Map<String, Object> document = new HashMap<>();
document.put("date_as_string", "2010 12 06 11:05:15");
@ -52,7 +52,7 @@ public class DateProcessorTests extends ESTestCase {
matchFormats.add("yyyy dd MM");
matchFormats.add("dd/MM/yyyy");
matchFormats.add("dd-MM-yyyy");
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.forID("Europe/Amsterdam"), Locale.ENGLISH,
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.forID("Europe/Amsterdam"), Locale.ENGLISH,
"date_as_string", matchFormats, "date_as_date");
Map<String, Object> document = new HashMap<>();
@ -85,7 +85,7 @@ public class DateProcessorTests extends ESTestCase {
}
public void testJodaPatternLocale() {
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.forID("Europe/Amsterdam"), Locale.ITALIAN,
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.forID("Europe/Amsterdam"), Locale.ITALIAN,
"date_as_string", Collections.singletonList("yyyy dd MMM"), "date_as_date");
Map<String, Object> document = new HashMap<>();
document.put("date_as_string", "2010 12 giugno");
@ -95,7 +95,7 @@ public class DateProcessorTests extends ESTestCase {
}
public void testJodaPatternDefaultYear() {
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.forID("Europe/Amsterdam"), Locale.ENGLISH,
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.forID("Europe/Amsterdam"), Locale.ENGLISH,
"date_as_string", Collections.singletonList("dd/MM"), "date_as_date");
Map<String, Object> document = new HashMap<>();
document.put("date_as_string", "12/06");
@ -105,7 +105,7 @@ public class DateProcessorTests extends ESTestCase {
}
public void testTAI64N() {
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.forOffsetHours(2), randomLocale(random()),
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.forOffsetHours(2), randomLocale(random()),
"date_as_string", Collections.singletonList(DateFormat.Tai64n.toString()), "date_as_date");
Map<String, Object> document = new HashMap<>();
String dateAsString = (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024";
@ -116,7 +116,7 @@ public class DateProcessorTests extends ESTestCase {
}
public void testUnixMs() {
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.UTC, randomLocale(random()),
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.UTC, randomLocale(random()),
"date_as_string", Collections.singletonList(DateFormat.UnixMs.toString()), "date_as_date");
Map<String, Object> document = new HashMap<>();
document.put("date_as_string", "1000500");
@ -126,7 +126,7 @@ public class DateProcessorTests extends ESTestCase {
}
public void testUnix() {
DateProcessor dateProcessor = new DateProcessor(DateTimeZone.UTC, randomLocale(random()),
DateProcessor dateProcessor = new DateProcessor(randomAsciiOfLength(10), DateTimeZone.UTC, randomLocale(random()),
"date_as_string", Collections.singletonList(DateFormat.Unix.toString()), "date_as_date");
Map<String, Object> document = new HashMap<>();
document.put("date_as_string", "1000.5");

View File

@ -39,8 +39,11 @@ public class DeDotProcessorFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("separator", "_");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
DeDotProcessor deDotProcessor = factory.create(config);
assertThat(deDotProcessor.getSeparator(), equalTo("_"));
assertThat(deDotProcessor.getTag(), equalTo(processorTag));
}
public void testCreateMissingSeparatorField() throws Exception {

View File

@ -37,7 +37,7 @@ public class DeDotProcessorTests extends ESTestCase {
source.put("a.b", "hello world!");
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
String separator = randomUnicodeOfCodepointLengthBetween(1, 10);
Processor processor = new DeDotProcessor(separator);
Processor processor = new DeDotProcessor(randomAsciiOfLength(10), separator);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("a" + separator + "b" ), equalTo("hello world!"));
}
@ -48,7 +48,7 @@ public class DeDotProcessorTests extends ESTestCase {
subField.put("b.c", "hello world!");
source.put("a", subField);
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
Processor processor = new DeDotProcessor("_");
Processor processor = new DeDotProcessor(randomAsciiOfLength(10), "_");
processor.execute(ingestDocument);
IngestDocument expectedDocument = new IngestDocument(
@ -63,7 +63,7 @@ public class DeDotProcessorTests extends ESTestCase {
subField.put("b.c", "hello world!");
source.put("a", Arrays.asList(subField));
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
Processor processor = new DeDotProcessor("_");
Processor processor = new DeDotProcessor(randomAsciiOfLength(10), "_");
processor.execute(ingestDocument);
IngestDocument expectedDocument = new IngestDocument(

View File

@ -42,7 +42,10 @@ public class FailProcessorFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("message", "error");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
FailProcessor failProcessor = factory.create(config);
assertThat(failProcessor.getTag(), equalTo(processorTag));
assertThat(failProcessor.getMessage().execute(Collections.emptyMap()), equalTo("error"));
}

View File

@ -32,7 +32,7 @@ public class FailProcessorTests extends ESTestCase {
public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String message = randomAsciiOfLength(10);
Processor processor = new FailProcessor(new TestTemplateService.MockTemplate(message));
Processor processor = new FailProcessor(randomAsciiOfLength(10), new TestTemplateService.MockTemplate(message));
try {
processor.execute(ingestDocument);
fail("fail processor should throw an exception");

View File

@ -35,7 +35,10 @@ public class GsubProcessorFactoryTests extends ESTestCase {
config.put("field", "field1");
config.put("pattern", "\\.");
config.put("replacement", "-");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
GsubProcessor gsubProcessor = factory.create(config);
assertThat(gsubProcessor.getTag(), equalTo(processorTag));
assertThat(gsubProcessor.getField(), equalTo("field1"));
assertThat(gsubProcessor.getPattern().toString(), equalTo("\\."));
assertThat(gsubProcessor.getReplacement(), equalTo("-"));

View File

@ -36,7 +36,7 @@ public class GsubProcessorTests extends ESTestCase {
public void testGsub() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "127.0.0.1");
Processor processor = new GsubProcessor(fieldName, Pattern.compile("\\."), "-");
Processor processor = new GsubProcessor(randomAsciiOfLength(10), fieldName, Pattern.compile("\\."), "-");
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo("127-0-0-1"));
}
@ -45,7 +45,7 @@ public class GsubProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, 123);
Processor processor = new GsubProcessor(fieldName, Pattern.compile("\\."), "-");
Processor processor = new GsubProcessor(randomAsciiOfLength(10), fieldName, Pattern.compile("\\."), "-");
try {
processor.execute(ingestDocument);
fail("processor execution should have failed");
@ -57,7 +57,7 @@ public class GsubProcessorTests extends ESTestCase {
public void testGsubFieldNotFound() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new GsubProcessor(fieldName, Pattern.compile("\\."), "-");
Processor processor = new GsubProcessor(randomAsciiOfLength(10), fieldName, Pattern.compile("\\."), "-");
try {
processor.execute(ingestDocument);
fail("processor execution should have failed");
@ -68,7 +68,7 @@ public class GsubProcessorTests extends ESTestCase {
public void testGsubNullValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null));
Processor processor = new GsubProcessor("field", Pattern.compile("\\."), "-");
Processor processor = new GsubProcessor(randomAsciiOfLength(10), "field", Pattern.compile("\\."), "-");
try {
processor.execute(ingestDocument);
fail("processor execution should have failed");

View File

@ -33,7 +33,10 @@ public class JoinProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("separator", "-");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
JoinProcessor joinProcessor = factory.create(config);
assertThat(joinProcessor.getTag(), equalTo(processorTag));
assertThat(joinProcessor.getField(), equalTo("field1"));
assertThat(joinProcessor.getSeparator(), equalTo("-"));
}

View File

@ -51,7 +51,7 @@ public class JoinProcessorTests extends ESTestCase {
}
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new JoinProcessor(fieldName, separator);
Processor processor = new JoinProcessor(randomAsciiOfLength(10), fieldName, separator);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(expectedResult));
}
@ -71,7 +71,7 @@ public class JoinProcessorTests extends ESTestCase {
}
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = new JoinProcessor(fieldName, separator);
Processor processor = new JoinProcessor(randomAsciiOfLength(10), fieldName, separator);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(expectedResult));
}
@ -80,7 +80,7 @@ public class JoinProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, randomAsciiOfLengthBetween(1, 10));
Processor processor = new JoinProcessor(fieldName, "-");
Processor processor = new JoinProcessor(randomAsciiOfLength(10), fieldName, "-");
try {
processor.execute(ingestDocument);
} catch(IllegalArgumentException e) {
@ -91,7 +91,7 @@ public class JoinProcessorTests extends ESTestCase {
public void testJoinNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new JoinProcessor(fieldName, "-");
Processor processor = new JoinProcessor(randomAsciiOfLength(10), fieldName, "-");
try {
processor.execute(ingestDocument);
} catch(IllegalArgumentException e) {
@ -101,7 +101,7 @@ public class JoinProcessorTests extends ESTestCase {
public void testJoinNullValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null));
Processor processor = new JoinProcessor("field", "-");
Processor processor = new JoinProcessor(randomAsciiOfLength(10), "field", "-");
try {
processor.execute(ingestDocument);
} catch(IllegalArgumentException e) {

View File

@ -32,7 +32,10 @@ public class LowercaseProcessorFactoryTests extends ESTestCase {
LowercaseProcessor.Factory factory = new LowercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
LowercaseProcessor uppercaseProcessor = factory.create(config);
assertThat(uppercaseProcessor.getTag(), equalTo(processorTag));
assertThat(uppercaseProcessor.getField(), equalTo("field1"));
}

View File

@ -24,7 +24,7 @@ import java.util.Locale;
public class LowercaseProcessorTests extends AbstractStringProcessorTestCase {
@Override
protected AbstractStringProcessor newProcessor(String field) {
return new LowercaseProcessor(field);
return new LowercaseProcessor(randomAsciiOfLength(10), field);
}
@Override

View File

@ -41,7 +41,10 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
RemoveProcessor removeProcessor = factory.create(config);
assertThat(removeProcessor.getTag(), equalTo(processorTag));
assertThat(removeProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
}

View File

@ -35,7 +35,7 @@ public class RemoveProcessorTests extends ESTestCase {
public void testRemoveFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String field = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Processor processor = new RemoveProcessor(new TestTemplateService.MockTemplate(field));
Processor processor = new RemoveProcessor(randomAsciiOfLength(10), new TestTemplateService.MockTemplate(field));
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(field), equalTo(false));
}
@ -43,7 +43,7 @@ public class RemoveProcessorTests extends ESTestCase {
public void testRemoveNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new RemoveProcessor(new TestTemplateService.MockTemplate(fieldName));
Processor processor = new RemoveProcessor(randomAsciiOfLength(10), new TestTemplateService.MockTemplate(fieldName));
try {
processor.execute(ingestDocument);
fail("remove field should have failed");

View File

@ -33,7 +33,10 @@ public class RenameProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "old_field");
config.put("to", "new_field");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
RenameProcessor renameProcessor = factory.create(config);
assertThat(renameProcessor.getTag(), equalTo(processorTag));
assertThat(renameProcessor.getOldFieldName(), equalTo("old_field"));
assertThat(renameProcessor.getNewFieldName(), equalTo("new_field"));
}

View File

@ -44,7 +44,7 @@ public class RenameProcessorTests extends ESTestCase {
do {
newFieldName = RandomDocumentPicks.randomFieldName(random());
} while (RandomDocumentPicks.canAddField(newFieldName, ingestDocument) == false || newFieldName.equals(fieldName));
Processor processor = new RenameProcessor(fieldName, newFieldName);
Processor processor = new RenameProcessor(randomAsciiOfLength(10), fieldName, newFieldName);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(newFieldName, Object.class), equalTo(fieldValue));
}
@ -62,7 +62,7 @@ public class RenameProcessorTests extends ESTestCase {
document.put("one", one);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Processor processor = new RenameProcessor("list.0", "item");
Processor processor = new RenameProcessor(randomAsciiOfLength(10), "list.0", "item");
processor.execute(ingestDocument);
Object actualObject = ingestDocument.getSourceAndMetadata().get("list");
assertThat(actualObject, instanceOf(List.class));
@ -75,7 +75,7 @@ public class RenameProcessorTests extends ESTestCase {
assertThat(actualObject, instanceOf(String.class));
assertThat(actualObject, equalTo("item1"));
processor = new RenameProcessor("list.0", "list.3");
processor = new RenameProcessor(randomAsciiOfLength(10), "list.0", "list.3");
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -90,7 +90,7 @@ public class RenameProcessorTests extends ESTestCase {
public void testRenameNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new RenameProcessor(fieldName, RandomDocumentPicks.randomFieldName(random()));
Processor processor = new RenameProcessor(randomAsciiOfLength(10), fieldName, RandomDocumentPicks.randomFieldName(random()));
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -102,7 +102,7 @@ public class RenameProcessorTests extends ESTestCase {
public void testRenameNewFieldAlreadyExists() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Processor processor = new RenameProcessor(RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument), fieldName);
Processor processor = new RenameProcessor(randomAsciiOfLength(10), RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument), fieldName);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -116,7 +116,7 @@ public class RenameProcessorTests extends ESTestCase {
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, null);
String newFieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new RenameProcessor(fieldName, newFieldName);
Processor processor = new RenameProcessor(randomAsciiOfLength(10), fieldName, newFieldName);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(false));
assertThat(ingestDocument.hasField(newFieldName), equalTo(true));
@ -136,7 +136,7 @@ public class RenameProcessorTests extends ESTestCase {
source.put("list", Collections.singletonList("item"));
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
Processor processor = new RenameProcessor("list", "new_field");
Processor processor = new RenameProcessor(randomAsciiOfLength(10), "list", "new_field");
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
@ -160,7 +160,7 @@ public class RenameProcessorTests extends ESTestCase {
source.put("list", Collections.singletonList("item"));
IngestDocument ingestDocument = new IngestDocument(source, Collections.emptyMap());
Processor processor = new RenameProcessor("list", "new_field");
Processor processor = new RenameProcessor(randomAsciiOfLength(10), "list", "new_field");
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");

View File

@ -42,7 +42,10 @@ public class SetProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("value", "value1");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
SetProcessor setProcessor = factory.create(config);
assertThat(setProcessor.getTag(), equalTo(processorTag));
assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo("value1"));
}

View File

@ -78,6 +78,6 @@ public class SetProcessorTests extends ESTestCase {
private static Processor createSetProcessor(String fieldName, Object fieldValue) {
TemplateService templateService = TestTemplateService.instance();
return new SetProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
return new SetProcessor(randomAsciiOfLength(10), templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
}
}

View File

@ -33,7 +33,10 @@ public class SplitProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("separator", "\\.");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
SplitProcessor splitProcessor = factory.create(config);
assertThat(splitProcessor.getTag(), equalTo(processorTag));
assertThat(splitProcessor.getField(), equalTo("field1"));
assertThat(splitProcessor.getSeparator(), equalTo("\\."));
}

View File

@ -37,7 +37,7 @@ public class SplitProcessorTests extends ESTestCase {
public void testSplit() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "127.0.0.1");
Processor processor = new SplitProcessor(fieldName, "\\.");
Processor processor = new SplitProcessor(randomAsciiOfLength(10), fieldName, "\\.");
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, List.class), equalTo(Arrays.asList("127", "0", "0", "1")));
}
@ -45,7 +45,7 @@ public class SplitProcessorTests extends ESTestCase {
public void testSplitFieldNotFound() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new SplitProcessor(fieldName, "\\.");
Processor processor = new SplitProcessor(randomAsciiOfLength(10), fieldName, "\\.");
try {
processor.execute(ingestDocument);
fail("split processor should have failed");
@ -56,7 +56,7 @@ public class SplitProcessorTests extends ESTestCase {
public void testSplitNullValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null));
Processor processor = new SplitProcessor("field", "\\.");
Processor processor = new SplitProcessor(randomAsciiOfLength(10), "field", "\\.");
try {
processor.execute(ingestDocument);
fail("split processor should have failed");
@ -69,7 +69,7 @@ public class SplitProcessorTests extends ESTestCase {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, randomInt());
Processor processor = new SplitProcessor(fieldName, "\\.");
Processor processor = new SplitProcessor(randomAsciiOfLength(10), fieldName, "\\.");
try {
processor.execute(ingestDocument);
fail("split processor should have failed");

View File

@ -32,7 +32,10 @@ public class TrimProcessorFactoryTests extends ESTestCase {
TrimProcessor.Factory factory = new TrimProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
TrimProcessor uppercaseProcessor = factory.create(config);
assertThat(uppercaseProcessor.getTag(), equalTo(processorTag));
assertThat(uppercaseProcessor.getField(), equalTo("field1"));
}

View File

@ -23,7 +23,7 @@ public class TrimProcessorTests extends AbstractStringProcessorTestCase {
@Override
protected AbstractStringProcessor newProcessor(String field) {
return new TrimProcessor(field);
return new TrimProcessor(randomAsciiOfLength(10), field);
}
@Override

View File

@ -32,7 +32,10 @@ public class UppercaseProcessorFactoryTests extends ESTestCase {
UppercaseProcessor.Factory factory = new UppercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
UppercaseProcessor uppercaseProcessor = factory.create(config);
assertThat(uppercaseProcessor.getTag(), equalTo(processorTag));
assertThat(uppercaseProcessor.getField(), equalTo("field1"));
}

View File

@ -25,7 +25,7 @@ public class UppercaseProcessorTests extends AbstractStringProcessorTestCase {
@Override
protected AbstractStringProcessor newProcessor(String field) {
return new UppercaseProcessor(field);
return new UppercaseProcessor(randomAsciiOfLength(10), field);
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.grok;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
@ -30,10 +31,12 @@ public final class GrokProcessor implements Processor {
public static final String TYPE = "grok";
private final String processorTag;
private final String matchField;
private final Grok grok;
public GrokProcessor(Grok grok, String matchField) {
public GrokProcessor(String processorTag, Grok grok, String matchField) {
this.processorTag = processorTag;
this.matchField = matchField;
this.grok = grok;
}
@ -54,6 +57,11 @@ public final class GrokProcessor implements Processor {
return TYPE;
}
@Override
public String getTag() {
return processorTag;
}
String getMatchField() {
return matchField;
}
@ -62,7 +70,7 @@ public final class GrokProcessor implements Processor {
return grok;
}
public final static class Factory implements Processor.Factory<GrokProcessor> {
public final static class Factory extends AbstractProcessorFactory<GrokProcessor> {
private final Map<String, String> builtinPatterns;
@ -70,7 +78,8 @@ public final class GrokProcessor implements Processor {
this.builtinPatterns = builtinPatterns;
}
public GrokProcessor create(Map<String, Object> config) throws Exception {
@Override
public GrokProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "field");
String matchPattern = ConfigurationUtils.readStringProperty(config, "pattern");
Map<String, String> customPatternBank = ConfigurationUtils.readOptionalMap(config, "pattern_definitions");
@ -80,7 +89,7 @@ public final class GrokProcessor implements Processor {
}
Grok grok = new Grok(patternBank, matchPattern);
return new GrokProcessor(grok, matchField);
return new GrokProcessor(processorTag, grok, matchField);
}
}

View File

@ -36,7 +36,10 @@ public class GrokProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("pattern", "(?<foo>\\w+)");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
GrokProcessor processor = factory.create(config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getMatchField(), equalTo("_field"));
assertThat(processor.getGrok(), notNullValue());
}

View File

@ -38,7 +38,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, "1");
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
GrokProcessor processor = new GrokProcessor(grok, fieldName);
GrokProcessor processor = new GrokProcessor(randomAsciiOfLength(10), grok, fieldName);
processor.execute(doc);
assertThat(doc.getFieldValue("one", String.class), equalTo("1"));
}
@ -48,7 +48,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, "23");
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
GrokProcessor processor = new GrokProcessor(grok, fieldName);
GrokProcessor processor = new GrokProcessor(randomAsciiOfLength(10), grok, fieldName);
try {
processor.execute(doc);
fail();
@ -63,7 +63,7 @@ public class GrokProcessorTests extends ESTestCase {
originalDoc.setFieldValue(fieldName, fieldName);
IngestDocument doc = new IngestDocument(originalDoc);
Grok grok = new Grok(Collections.emptyMap(), fieldName);
GrokProcessor processor = new GrokProcessor(grok, fieldName);
GrokProcessor processor = new GrokProcessor(randomAsciiOfLength(10), grok, fieldName);
processor.execute(doc);
assertThat(doc, equalTo(originalDoc));
}
@ -73,7 +73,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, 1);
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
GrokProcessor processor = new GrokProcessor(grok, fieldName);
GrokProcessor processor = new GrokProcessor(randomAsciiOfLength(10), grok, fieldName);
try {
processor.execute(doc);
fail();
@ -86,7 +86,7 @@ public class GrokProcessorTests extends ESTestCase {
String fieldName = "foo.bar";
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
GrokProcessor processor = new GrokProcessor(grok, fieldName);
GrokProcessor processor = new GrokProcessor(randomAsciiOfLength(10), grok, fieldName);
try {
processor.execute(doc);
fail();

View File

@ -32,6 +32,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
@ -56,12 +57,14 @@ public final class GeoIpProcessor implements Processor {
public static final String TYPE = "geoip";
private final String processorTag;
private final String sourceField;
private final String targetField;
private final DatabaseReader dbReader;
private final Set<Field> fields;
GeoIpProcessor(String sourceField, DatabaseReader dbReader, String targetField, Set<Field> fields) throws IOException {
GeoIpProcessor(String processorTag, String sourceField, DatabaseReader dbReader, String targetField, Set<Field> fields) throws IOException {
this.processorTag = processorTag;
this.sourceField = sourceField;
this.targetField = targetField;
this.dbReader = dbReader;
@ -100,6 +103,11 @@ public final class GeoIpProcessor implements Processor {
return TYPE;
}
@Override
public String getTag() {
return processorTag;
}
String getSourceField() {
return sourceField;
}
@ -215,7 +223,7 @@ public final class GeoIpProcessor implements Processor {
return geoData;
}
public static final class Factory implements Processor.Factory<GeoIpProcessor>, Closeable {
public static final class Factory extends AbstractProcessorFactory<GeoIpProcessor> implements Closeable {
static final Set<Field> DEFAULT_FIELDS = EnumSet.of(
Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION
@ -227,7 +235,8 @@ public final class GeoIpProcessor implements Processor {
this.databaseReaders = databaseReaders;
}
public GeoIpProcessor create(Map<String, Object> config) throws Exception {
@Override
public GeoIpProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
String ipField = readStringProperty(config, "source_field");
String targetField = readStringProperty(config, "target_field", "geoip");
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb");
@ -251,7 +260,7 @@ public final class GeoIpProcessor implements Processor {
if (databaseReader == null) {
throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist");
}
return new GeoIpProcessor(ipField, databaseReader, targetField, fields);
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, fields);
}
@Override

View File

@ -60,7 +60,11 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
String processorTag = randomAsciiOfLength(10);
config.put("processor_tag", processorTag);
GeoIpProcessor processor = factory.create(config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getSourceField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City"));

View File

@ -37,7 +37,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCity() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
GeoIpProcessor processor = new GeoIpProcessor(randomAsciiOfLength(10), "source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "82.170.213.79");
@ -62,7 +62,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCountry() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-Country.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
GeoIpProcessor processor = new GeoIpProcessor(randomAsciiOfLength(10), "source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "82.170.213.79");
@ -81,7 +81,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testAddressIsNotInTheDatabase() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
GeoIpProcessor processor = new GeoIpProcessor(randomAsciiOfLength(10), "source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "202.45.11.11");
@ -95,7 +95,7 @@ public class GeoIpProcessorTests extends ESTestCase {
/** Don't silently do DNS lookups or anything trappy on bogus data */
public void testInvalid() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
GeoIpProcessor processor = new GeoIpProcessor(randomAsciiOfLength(10), "source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "www.google.com");

View File

@ -69,6 +69,39 @@
}
- length: { docs: 1 }
---
"Test simulate with provided invalid pipeline definition":
- do:
catch: request
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"set" : {
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { error: 3 }
- match: { status: 400 }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "required property [field] is missing" }
---
"Test simulate without index type and id":
- do:
@ -173,6 +206,7 @@
"processors": [
{
"set" : {
"processor_tag" : "processor[set]-0",
"field" : "field2",
"value" : "_value"
}
@ -198,7 +232,7 @@
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.processor_id: "processor[set]-0" }
- match: { docs.0.processor_results.0.processor_tag: "processor[set]-0" }
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
- match: { docs.0.processor_results.0.doc._source.field2: "_value" }
@ -297,15 +331,10 @@
]
}
- length: { docs: 2 }
- length: { docs.0.processor_results: 2 }
- length: { docs.0.processor_results: 1 }
- match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.doc._index: "index" }
- match: { docs.0.processor_results.1.doc._type: "type" }
- match: { docs.0.processor_results.1.doc._id: "id" }
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
- match: { docs.0.processor_results.1.doc._source.bar: "HELLO" }
- length: { docs.0.processor_results.1.doc._ingest: 1 }
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
- length: { docs.1.processor_results: 2 }
- match: { docs.1.processor_results.0.doc._index: "index" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- length: { docs.1.processor_results.0.doc._ingest: 1 }
@ -315,3 +344,78 @@
- length: { docs.1.processor_results.1.doc._ingest: 1 }
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
---
"Test verbose simulate with on_failure":
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline" : {
"description": "_description",
"processors": [
{
"set" : {
"processor_tag" : "setstatus-1",
"field" : "status",
"value" : 200
}
},
{
"rename" : {
"processor_tag" : "rename-1",
"field" : "foofield",
"to" : "field1",
"on_failure" : [
{
"set" : {
"processor_tag" : "set on_failure rename",
"field" : "foofield",
"value" : "exists"
}
},
{
"rename" : {
"field" : "foofield2",
"to" : "field1",
"on_failure" : [
{
"set" : {
"field" : "foofield2",
"value" : "ran"
}
}
]
}
}
]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"field1": "123.42 400 <foo>"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 5 }
- match: { docs.0.processor_results.0.processor_tag: "setstatus-1" }
- match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.0.doc._source.status: 200 }
- match: { docs.0.processor_results.1.processor_tag: "rename-1" }
- match: { docs.0.processor_results.1.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.error.reason: "field [foofield] doesn't exist" }
- match: { docs.0.processor_results.2.processor_tag: "set on_failure rename" }
- is_false: docs.0.processor_results.3.processor_tag
- is_false: docs.0.processor_results.4.processor_tag
- match: { docs.0.processor_results.4.doc._source.foofield: "exists" }
- match: { docs.0.processor_results.4.doc._source.foofield2: "ran" }
- match: { docs.0.processor_results.4.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.4.doc._source.status: 200 }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
@ -33,16 +34,18 @@ import java.util.function.Consumer;
public class TestProcessor implements Processor {
private final String type;
private final String tag;
private final Consumer<IngestDocument> ingestDocumentConsumer;
private final AtomicInteger invokedCounter = new AtomicInteger();
public TestProcessor(Consumer<IngestDocument> ingestDocumentConsumer) {
this("test-processor", ingestDocumentConsumer);
this(null, "test-processor", ingestDocumentConsumer);
}
public TestProcessor(String type, Consumer<IngestDocument> ingestDocumentConsumer) {
public TestProcessor(String tag, String type, Consumer<IngestDocument> ingestDocumentConsumer) {
this.ingestDocumentConsumer = ingestDocumentConsumer;
this.type = type;
this.tag = tag;
}
@Override
@ -56,14 +59,19 @@ public class TestProcessor implements Processor {
return type;
}
@Override
public String getTag() {
return tag;
}
public int getInvokedCounter() {
return invokedCounter.get();
}
public static final class Factory implements Processor.Factory<TestProcessor> {
public static final class Factory extends AbstractProcessorFactory<TestProcessor> {
@Override
public TestProcessor create(Map<String, Object> config) throws Exception {
return new TestProcessor(ingestDocument -> {});
public TestProcessor doCreate(String processorId, Map<String, Object> config) throws Exception {
return new TestProcessor(processorId, "test-processor", ingestDocument -> {});
}
}
}