diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java similarity index 80% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java rename to plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 3c3a25f84af..27606090d29 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -24,26 +24,26 @@ import org.elasticsearch.common.Strings; import java.util.*; /** - * Represents the data and meta data (like id and type) of a single document that is going to be indexed. + * Represents a single document being captured before indexing and holds the source and meta data (like id, type and index). */ -public final class Data { +public final class IngestDocument { - private final String index; - private final String type; - private final String id; - private final Map document; + private final Map metaData; + private final Map source; private boolean modified = false; - public Data(String index, String type, String id, Map document) { - this.index = index; - this.type = type; - this.id = id; - this.document = document; + public IngestDocument(String index, String type, String id, Map source) { + this.metaData = new HashMap<>(); + this.metaData.put("_index", index); + this.metaData.put("_type", type); + this.metaData.put("_id", id); + this.source = source; } - public Data(Data other) { - this(other.index, other.type, other.id, new HashMap<>(other.document)); + public IngestDocument(IngestDocument other) { + this.metaData = new HashMap<>(other.metaData); + this.source = new HashMap<>(other.source); } /** @@ -116,7 +116,7 @@ public final class Data { } private Map getParent(String[] pathElements) { - Map innerMap = document; + Map innerMap = source; for (int i = 0; i < pathElements.length - 1; i++) { Object obj = innerMap.get(pathElements[i]); if (obj instanceof Map) { @@ -143,7 +143,7 @@ public final class Data { String[] pathElements = Strings.splitStringToArray(path, '.'); assert pathElements.length > 0; - Map inner = document; + Map inner = source; for (int i = 0; i < pathElements.length - 1; i++) { String pathElement = pathElements[i]; if (inner.containsKey(pathElement)) { @@ -169,16 +169,8 @@ public final class Data { modified = true; } - public String getIndex() { - return index; - } - - public String getType() { - return type; - } - - public String getId() { - return id; + public String getMetadata(MetaData metaData) { + return this.metaData.get(metaData.getName()); } /** @@ -186,8 +178,8 @@ public final class Data { * not be reflected to the modified flag. Modify the document instead using {@link #setPropertyValue(String, Object)} * and {@link #removeProperty(String)} */ - public Map getDocument() { - return document; + public Map getSource() { + return source; } public boolean isModified() { @@ -201,15 +193,35 @@ public final class Data { return false; } - Data other = (Data) obj; - return Objects.equals(document, other.document) && - Objects.equals(index, other.index) && - Objects.equals(type, other.type) && - Objects.equals(id, other.id); + IngestDocument other = (IngestDocument) obj; + return Objects.equals(source, other.source) && + Objects.equals(metaData, other.metaData); } @Override public int hashCode() { - return Objects.hash(index, type, id, document); + return Objects.hash(metaData, source); } + + public enum MetaData { + + INDEX("_index"), + TYPE("_type"), + ID("_id"), + ROUTING("_routing"), + PARENT("_parent"), + TIMESTAMP("_timestamp"), + TTL("_ttl"); + + private final String name; + + MetaData(String name) { + this.name = name; + } + + public String getName() { + return name; + } + } + } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java index 7b44f7d5a7f..12575c731ee 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -44,9 +44,9 @@ public final class Pipeline { /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ - public void execute(Data data) { + public void execute(IngestDocument ingestDocument) { for (Processor processor : processors) { - processor.execute(data); + processor.execute(ingestDocument); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java index fc268b2b128..6e7d276876c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.processor; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import java.io.Closeable; import java.io.IOException; @@ -36,7 +36,7 @@ public interface Processor { /** * Introspect and potentially modify the incoming data. */ - void execute(Data data); + void execute(IngestDocument ingestDocument); /** * Gets the type of a processor diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java index ead4e0a1b1c..6f2016fc5c5 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/date/DateProcessor.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.processor.date; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.Processor; import org.joda.time.DateTime; @@ -56,8 +56,8 @@ public final class DateProcessor implements Processor { } @Override - public void execute(Data data) { - String value = data.getPropertyValue(matchField, String.class); + public void execute(IngestDocument ingestDocument) { + String value = ingestDocument.getPropertyValue(matchField, String.class); // TODO(talevy): handle custom timestamp fields DateTime dateTime = null; @@ -75,7 +75,7 @@ public final class DateProcessor implements Processor { throw new IllegalArgumentException("unable to parse date [" + value + "]", lastException); } - data.setPropertyValue(targetField, ISODateTimeFormat.dateTime().print(dateTime)); + ingestDocument.setPropertyValue(targetField, ISODateTimeFormat.dateTime().print(dateTime)); } @Override diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java index 629a7b59837..400632e828c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java @@ -26,7 +26,7 @@ import com.maxmind.geoip2.model.CountryResponse; import com.maxmind.geoip2.record.*; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.network.NetworkAddress; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.processor.Processor; import java.io.IOException; @@ -60,8 +60,8 @@ public final class GeoIpProcessor implements Processor { } @Override - public void execute(Data data) { - String ip = data.getPropertyValue(sourceField, String.class); + public void execute(IngestDocument ingestDocument) { + String ip = ingestDocument.getPropertyValue(sourceField, String.class); final InetAddress ipAddress; try { ipAddress = InetAddress.getByName(ip); @@ -88,7 +88,7 @@ public final class GeoIpProcessor implements Processor { default: throw new IllegalStateException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType() + "]"); } - data.setPropertyValue(targetField, geoData); + ingestDocument.setPropertyValue(targetField, geoData); } @Override diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java index 58b999c8f78..31440ad9e53 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.processor.grok; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.Processor; @@ -45,13 +45,13 @@ public final class GrokProcessor implements Processor { } @Override - public void execute(Data data) { - Object field = data.getPropertyValue(matchField, Object.class); + public void execute(IngestDocument ingestDocument) { + Object field = ingestDocument.getPropertyValue(matchField, Object.class); // TODO(talevy): handle invalid field types if (field instanceof String) { Map matches = grok.captures((String) field); if (matches != null) { - matches.forEach((k, v) -> data.setPropertyValue(k, v)); + matches.forEach((k, v) -> ingestDocument.setPropertyValue(k, v)); } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java index 3ae1fb98cf2..ee121b61a4b 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.processor.mutate; import org.elasticsearch.common.Booleans; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.Processor; @@ -101,36 +101,36 @@ public final class MutateProcessor implements Processor { } @Override - public void execute(Data data) { + public void execute(IngestDocument ingestDocument) { if (update != null) { - doUpdate(data); + doUpdate(ingestDocument); } if (rename != null) { - doRename(data); + doRename(ingestDocument); } if (convert != null) { - doConvert(data); + doConvert(ingestDocument); } if (split != null) { - doSplit(data); + doSplit(ingestDocument); } if (gsub != null) { - doGsub(data); + doGsub(ingestDocument); } if (join != null) { - doJoin(data); + doJoin(ingestDocument); } if (remove != null) { - doRemove(data); + doRemove(ingestDocument); } if (trim != null) { - doTrim(data); + doTrim(ingestDocument); } if (uppercase != null) { - doUppercase(data); + doUppercase(ingestDocument); } if (lowercase != null) { - doLowercase(data); + doLowercase(ingestDocument); } } @@ -139,18 +139,18 @@ public final class MutateProcessor implements Processor { return TYPE; } - private void doUpdate(Data data) { + private void doUpdate(IngestDocument ingestDocument) { for(Map.Entry entry : update.entrySet()) { - data.setPropertyValue(entry.getKey(), entry.getValue()); + ingestDocument.setPropertyValue(entry.getKey(), entry.getValue()); } } - private void doRename(Data data) { + private void doRename(IngestDocument ingestDocument) { for(Map.Entry entry : rename.entrySet()) { - if (data.hasPropertyValue(entry.getKey())) { - Object oldVal = data.getPropertyValue(entry.getKey(), Object.class); - data.getDocument().remove(entry.getKey()); - data.setPropertyValue(entry.getValue(), oldVal); + if (ingestDocument.hasPropertyValue(entry.getKey())) { + Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class); + ingestDocument.getSource().remove(entry.getKey()); + ingestDocument.setPropertyValue(entry.getValue(), oldVal); } } } @@ -175,11 +175,11 @@ public final class MutateProcessor implements Processor { } @SuppressWarnings("unchecked") - private void doConvert(Data data) { + private void doConvert(IngestDocument ingestDocument) { for(Map.Entry entry : convert.entrySet()) { String toType = entry.getValue(); - Object oldVal = data.getPropertyValue(entry.getKey(), Object.class); + Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class); Object newVal; if (oldVal instanceof List) { @@ -194,91 +194,91 @@ public final class MutateProcessor implements Processor { newVal = parseValueAsType(oldVal, toType); } - data.setPropertyValue(entry.getKey(), newVal); + ingestDocument.setPropertyValue(entry.getKey(), newVal); } } - private void doSplit(Data data) { + private void doSplit(IngestDocument ingestDocument) { for(Map.Entry entry : split.entrySet()) { - Object oldVal = data.getPropertyValue(entry.getKey(), Object.class); + Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class); if (oldVal == null) { throw new IllegalArgumentException("Cannot split field. [" + entry.getKey() + "] is null."); } else if (oldVal instanceof String) { - data.setPropertyValue(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue()))); + ingestDocument.setPropertyValue(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue()))); } else { throw new IllegalArgumentException("Cannot split a field that is not a String type"); } } } - private void doGsub(Data data) { + private void doGsub(IngestDocument ingestDocument) { for (GsubExpression gsubExpression : gsub) { - String oldVal = data.getPropertyValue(gsubExpression.getFieldName(), String.class); + String oldVal = ingestDocument.getPropertyValue(gsubExpression.getFieldName(), String.class); if (oldVal == null) { throw new IllegalArgumentException("Field \"" + gsubExpression.getFieldName() + "\" is null, cannot match pattern."); } Matcher matcher = gsubExpression.getPattern().matcher(oldVal); String newVal = matcher.replaceAll(gsubExpression.getReplacement()); - data.setPropertyValue(gsubExpression.getFieldName(), newVal); + ingestDocument.setPropertyValue(gsubExpression.getFieldName(), newVal); } } @SuppressWarnings("unchecked") - private void doJoin(Data data) { + private void doJoin(IngestDocument ingestDocument) { for(Map.Entry entry : join.entrySet()) { - Object oldVal = data.getPropertyValue(entry.getKey(), Object.class); + Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class); if (oldVal instanceof List) { String joined = (String) ((List) oldVal) .stream() .map(Object::toString) .collect(Collectors.joining(entry.getValue())); - data.setPropertyValue(entry.getKey(), joined); + ingestDocument.setPropertyValue(entry.getKey(), joined); } else { throw new IllegalArgumentException("Cannot join field:" + entry.getKey() + " with type: " + oldVal.getClass()); } } } - private void doRemove(Data data) { + private void doRemove(IngestDocument ingestDocument) { for(String field : remove) { - data.getDocument().remove(field); + ingestDocument.getSource().remove(field); } } - private void doTrim(Data data) { + private void doTrim(IngestDocument ingestDocument) { for(String field : trim) { - Object val = data.getPropertyValue(field, Object.class); + Object val = ingestDocument.getPropertyValue(field, Object.class); if (val == null) { throw new IllegalArgumentException("Cannot trim field. [" + field + "] is null."); } else if (val instanceof String) { - data.setPropertyValue(field, ((String) val).trim()); + ingestDocument.setPropertyValue(field, ((String) val).trim()); } else { throw new IllegalArgumentException("Cannot trim field:" + field + " with type: " + val.getClass()); } } } - private void doUppercase(Data data) { + private void doUppercase(IngestDocument ingestDocument) { for(String field : uppercase) { - Object val = data.getPropertyValue(field, Object.class); + Object val = ingestDocument.getPropertyValue(field, Object.class); if (val == null) { throw new IllegalArgumentException("Cannot uppercase field. [" + field + "] is null."); } else if (val instanceof String) { - data.setPropertyValue(field, ((String) val).toUpperCase(Locale.ROOT)); + ingestDocument.setPropertyValue(field, ((String) val).toUpperCase(Locale.ROOT)); } else { throw new IllegalArgumentException("Cannot uppercase field:" + field + " with type: " + val.getClass()); } } } - private void doLowercase(Data data) { + private void doLowercase(IngestDocument ingestDocument) { for(String field : lowercase) { - Object val = data.getPropertyValue(field, Object.class); + Object val = ingestDocument.getPropertyValue(field, Object.class); if (val == null) { throw new IllegalArgumentException("Cannot lowercase field. [" + field + "] is null."); } else if (val instanceof String) { - data.setPropertyValue(field, ((String) val).toLowerCase(Locale.ROOT)); + ingestDocument.setPropertyValue(field, ((String) val).toLowerCase(Locale.ROOT)); } else { throw new IllegalArgumentException("Cannot lowercase field:" + field + " with type: " + val.getClass()); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index 18d656813ec..82e3a403fd1 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.threadpool.ThreadPool; @@ -40,7 +40,7 @@ public class PipelineExecutionService { this.threadPool = threadPool; } - public void execute(Data data, String pipelineId, Listener listener) { + public void execute(IngestDocument ingestDocument, String pipelineId, Listener listener) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId))); @@ -51,8 +51,8 @@ public class PipelineExecutionService { @Override public void run() { try { - pipeline.execute(data); - listener.executed(data); + pipeline.execute(ingestDocument); + listener.executed(ingestDocument); } catch (Exception e) { listener.failed(e); } @@ -62,7 +62,7 @@ public class PipelineExecutionService { public interface Listener { - void executed(Data data); + void executed(IngestDocument ingestDocument); void failed(Exception e); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index 3483544a8ce..37099df9b5f 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -29,7 +29,7 @@ import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; @@ -82,12 +82,12 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte } Map sourceAsMap = indexRequest.sourceAsMap(); - Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); - executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() { + IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); + executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() { @Override - public void executed(Data data) { - if (data.isModified()) { - indexRequest.source(data.getDocument()); + public void executed(IngestDocument ingestDocument) { + if (ingestDocument.isModified()) { + indexRequest.source(ingestDocument.getSource()); } indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true); chain.proceed(action, indexRequest, listener); @@ -115,12 +115,12 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte IndexRequest indexRequest = (IndexRequest) actionRequest; Map sourceAsMap = indexRequest.sourceAsMap(); - Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); - executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() { + IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); + executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() { @Override - public void executed(Data data) { - if (data.isModified()) { - indexRequest.source(data.getDocument()); + public void executed(IngestDocument ingestDocument) { + if (ingestDocument.isModified()) { + indexRequest.source(ingestDocument.getSource()); } processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java index cc9a8513e03..bb26161582a 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/TransportData.java @@ -25,24 +25,28 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import java.io.IOException; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; +import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX; +import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE; + public class TransportData implements Writeable, ToXContent { private static final TransportData PROTOTYPE = new TransportData(null); - private final Data data; + private final IngestDocument ingestDocument; - public TransportData(Data data) { - this.data = data; + public TransportData(IngestDocument ingestDocument) { + this.ingestDocument = ingestDocument; } - public Data get() { - return data; + public IngestDocument get() { + return ingestDocument; } public static TransportData readTransportDataFrom(StreamInput in) throws IOException { @@ -55,25 +59,25 @@ public class TransportData implements Writeable, ToXContent { String type = in.readString(); String id = in.readString(); Map doc = in.readMap(); - return new TransportData(new Data(index, type, id, doc)); + return new TransportData(new IngestDocument(index, type, id, doc)); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(data.getIndex()); - out.writeString(data.getType()); - out.writeString(data.getId()); - out.writeMap(data.getDocument()); + out.writeString(ingestDocument.getMetadata(INDEX)); + out.writeString(ingestDocument.getMetadata(TYPE)); + out.writeString(ingestDocument.getMetadata(ID)); + out.writeMap(ingestDocument.getSource()); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.DOCUMENT); - builder.field(Fields.MODIFIED, data.isModified()); - builder.field(Fields.INDEX, data.getIndex()); - builder.field(Fields.TYPE, data.getType()); - builder.field(Fields.ID, data.getId()); - builder.field(Fields.SOURCE, data.getDocument()); + builder.field(Fields.MODIFIED, ingestDocument.isModified()); + builder.field(Fields.INDEX, ingestDocument.getMetadata(INDEX)); + builder.field(Fields.TYPE, ingestDocument.getMetadata(TYPE)); + builder.field(Fields.ID, ingestDocument.getMetadata(ID)); + builder.field(Fields.SOURCE, ingestDocument.getSource()); builder.endObject(); return builder; } @@ -87,12 +91,12 @@ public class TransportData implements Writeable, ToXContent { return false; } TransportData that = (TransportData) o; - return Objects.equals(data, that.data); + return Objects.equals(ingestDocument, that.ingestDocument); } @Override public int hashCode() { - return Objects.hash(data); + return Objects.hash(ingestDocument); } static final class Fields { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java index 50897435217..4f55ef8424d 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequest.java @@ -18,7 +18,7 @@ */ package org.elasticsearch.plugin.ingest.transport.simulate; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.plugin.ingest.PipelineStore; @@ -29,11 +29,11 @@ import java.util.*; import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields; public class ParsedSimulateRequest { - private final List documents; + private final List documents; private final Pipeline pipeline; private final boolean verbose; - ParsedSimulateRequest(Pipeline pipeline, List documents, boolean verbose) { + ParsedSimulateRequest(Pipeline pipeline, List documents, boolean verbose) { this.pipeline = pipeline; this.documents = Collections.unmodifiableList(documents); this.verbose = verbose; @@ -43,7 +43,7 @@ public class ParsedSimulateRequest { return pipeline; } - public List getDocuments() { + public List getDocuments() { return documents; } @@ -55,18 +55,18 @@ public class ParsedSimulateRequest { private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory(); public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline"; - private List parseDocs(Map config) { + private List parseDocs(Map config) { List> docs = ConfigurationUtils.readList(config, Fields.DOCS); - List dataList = new ArrayList<>(); + List ingestDocumentList = new ArrayList<>(); for (Map dataMap : docs) { Map document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE); - Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX), + IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX), ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE), ConfigurationUtils.readStringProperty(dataMap, Fields.ID), document); - dataList.add(data); + ingestDocumentList.add(ingestDocument); } - return dataList; + return ingestDocumentList; } public ParsedSimulateRequest parseWithPipelineId(String pipelineId, Map config, boolean verbose, PipelineStore pipelineStore) { @@ -74,16 +74,16 @@ public class ParsedSimulateRequest { throw new IllegalArgumentException("param [pipeline] is null"); } Pipeline pipeline = pipelineStore.get(pipelineId); - List dataList = parseDocs(config); - return new ParsedSimulateRequest(pipeline, dataList, verbose); + List ingestDocumentList = parseDocs(config); + return new ParsedSimulateRequest(pipeline, ingestDocumentList, verbose); } public ParsedSimulateRequest parse(Map config, boolean verbose, PipelineStore pipelineStore) throws IOException { Map pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE); Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); - List dataList = parseDocs(config); - return new ParsedSimulateRequest(pipeline, dataList, verbose); + List ingestDocumentList = parseDocs(config); + return new ParsedSimulateRequest(pipeline, ingestDocumentList, verbose); } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResult.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResult.java index 1783b10f998..8cf08e3dc61 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResult.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResult.java @@ -22,20 +22,20 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.plugin.ingest.transport.TransportData; import java.io.IOException; public class SimulateDocumentSimpleResult implements SimulateDocumentResult { - private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult((Data)null); + private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult((IngestDocument)null); private TransportData data; private Exception failure; - public SimulateDocumentSimpleResult(Data data) { - this.data = new TransportData(data); + public SimulateDocumentSimpleResult(IngestDocument ingestDocument) { + this.data = new TransportData(ingestDocument); } private SimulateDocumentSimpleResult(TransportData data) { @@ -46,7 +46,7 @@ public class SimulateDocumentSimpleResult implements SimulateDocumentResult processorResultList = new ArrayList<>(); - Data currentData = new Data(data); + IngestDocument currentIngestDocument = new IngestDocument(ingestDocument); for (int i = 0; i < pipeline.getProcessors().size(); i++) { Processor processor = pipeline.getProcessors().get(i); String processorId = "processor[" + processor.getType() + "]-" + i; try { - processor.execute(currentData); - processorResultList.add(new SimulateProcessorResult(processorId, currentData)); + processor.execute(currentIngestDocument); + processorResultList.add(new SimulateProcessorResult(processorId, currentIngestDocument)); } catch (Exception e) { processorResultList.add(new SimulateProcessorResult(processorId, e)); } - currentData = new Data(currentData); + currentIngestDocument = new IngestDocument(currentIngestDocument); } return new SimulateDocumentVerboseResult(processorResultList); } @@ -73,11 +73,11 @@ public class SimulateExecutionService { @Override public void run() { List responses = new ArrayList<>(); - for (Data data : request.getDocuments()) { + for (IngestDocument ingestDocument : request.getDocuments()) { if (request.isVerbose()) { - responses.add(executeVerboseItem(request.getPipeline(), data)); + responses.add(executeVerboseItem(request.getPipeline(), ingestDocument)); } else { - responses.add(executeItem(request.getPipeline(), data)); + responses.add(executeItem(request.getPipeline(), ingestDocument)); } } listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java index 138f1ae553c..27d2065848c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResult.java @@ -25,22 +25,22 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.plugin.ingest.transport.TransportData; import java.io.IOException; public class SimulateProcessorResult implements Writeable, ToXContent { - private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult(null, (Data)null); + private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult(null, (IngestDocument)null); private String processorId; private TransportData data; private Exception failure; - public SimulateProcessorResult(String processorId, Data data) { + public SimulateProcessorResult(String processorId, IngestDocument ingestDocument) { this.processorId = processorId; - this.data = new TransportData(data); + this.data = new TransportData(ingestDocument); } private SimulateProcessorResult(String processorId, TransportData data) { @@ -53,7 +53,7 @@ public class SimulateProcessorResult implements Writeable a = (Map) data.getDocument().get("a"); + ingestDocument.setPropertyValue("a.b.c.d", "foo"); + assertThat(ingestDocument.getSource().get("a"), instanceOf(Map.class)); + Map a = (Map) ingestDocument.getSource().get("a"); assertThat(a.get("b"), instanceOf(Map.class)); Map b = (Map) a.get("b"); assertThat(b.get("c"), instanceOf(Map.class)); @@ -138,110 +138,110 @@ public class DataTests extends ESTestCase { assertThat(c.get("d"), instanceOf(String.class)); String d = (String) c.get("d"); assertThat(d, equalTo("foo")); - assertThat(data.isModified(), equalTo(true)); + assertThat(ingestDocument.isModified(), equalTo(true)); } public void testSetPropertyValueOnExistingField() { - data.setPropertyValue("foo", "newbar"); - assertThat(data.getDocument().get("foo"), equalTo("newbar")); + ingestDocument.setPropertyValue("foo", "newbar"); + assertThat(ingestDocument.getSource().get("foo"), equalTo("newbar")); } @SuppressWarnings("unchecked") public void testSetPropertyValueOnExistingParent() { - data.setPropertyValue("fizz.new", "bar"); - assertThat(data.getDocument().get("fizz"), instanceOf(Map.class)); - Map innerMap = (Map) data.getDocument().get("fizz"); + ingestDocument.setPropertyValue("fizz.new", "bar"); + assertThat(ingestDocument.getSource().get("fizz"), instanceOf(Map.class)); + Map innerMap = (Map) ingestDocument.getSource().get("fizz"); assertThat(innerMap.get("new"), instanceOf(String.class)); String value = (String) innerMap.get("new"); assertThat(value, equalTo("bar")); - assertThat(data.isModified(), equalTo(true)); + assertThat(ingestDocument.isModified(), equalTo(true)); } public void testSetPropertyValueOnExistingParentTypeMismatch() { try { - data.setPropertyValue("fizz.buzz.new", "bar"); + ingestDocument.setPropertyValue("fizz.buzz.new", "bar"); fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add field to parent [buzz] of type [java.lang.String], [java.util.Map] expected instead.")); - assertThat(data.isModified(), equalTo(false)); + assertThat(ingestDocument.isModified(), equalTo(false)); } } public void testSetPropertyValueOnExistingNullParent() { try { - data.setPropertyValue("fizz.foo_null.test", "bar"); + ingestDocument.setPropertyValue("fizz.foo_null.test", "bar"); fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add field to null parent, [java.util.Map] expected instead.")); - assertThat(data.isModified(), equalTo(false)); + assertThat(ingestDocument.isModified(), equalTo(false)); } } public void testSetPropertyValueNullName() { try { - data.setPropertyValue(null, "bar"); + ingestDocument.setPropertyValue(null, "bar"); fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add null or empty field")); - assertThat(data.isModified(), equalTo(false)); + assertThat(ingestDocument.isModified(), equalTo(false)); } } public void testSetPropertyValueEmptyName() { try { - data.setPropertyValue("", "bar"); + ingestDocument.setPropertyValue("", "bar"); fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add null or empty field")); - assertThat(data.isModified(), equalTo(false)); + assertThat(ingestDocument.isModified(), equalTo(false)); } } public void testRemoveProperty() { - data.removeProperty("foo"); - assertThat(data.isModified(), equalTo(true)); - assertThat(data.getDocument().size(), equalTo(2)); - assertThat(data.getDocument().containsKey("foo"), equalTo(false)); + ingestDocument.removeProperty("foo"); + assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.getSource().size(), equalTo(2)); + assertThat(ingestDocument.getSource().containsKey("foo"), equalTo(false)); } public void testRemoveInnerProperty() { - data.removeProperty("fizz.buzz"); - assertThat(data.getDocument().size(), equalTo(3)); - assertThat(data.getDocument().get("fizz"), instanceOf(Map.class)); + ingestDocument.removeProperty("fizz.buzz"); + assertThat(ingestDocument.getSource().size(), equalTo(3)); + assertThat(ingestDocument.getSource().get("fizz"), instanceOf(Map.class)); @SuppressWarnings("unchecked") - Map map = (Map)data.getDocument().get("fizz"); + Map map = (Map) ingestDocument.getSource().get("fizz"); assertThat(map.size(), equalTo(1)); assertThat(map.containsKey("buzz"), equalTo(false)); - data.removeProperty("fizz.foo_null"); + ingestDocument.removeProperty("fizz.foo_null"); assertThat(map.size(), equalTo(0)); - assertThat(data.getDocument().size(), equalTo(3)); - assertThat(data.getDocument().containsKey("fizz"), equalTo(true)); - assertThat(data.isModified(), equalTo(true)); + assertThat(ingestDocument.getSource().size(), equalTo(3)); + assertThat(ingestDocument.getSource().containsKey("fizz"), equalTo(true)); + assertThat(ingestDocument.isModified(), equalTo(true)); } public void testRemoveNonExistingProperty() { - data.removeProperty("does_not_exist"); - assertThat(data.isModified(), equalTo(false)); - assertThat(data.getDocument().size(), equalTo(3)); + ingestDocument.removeProperty("does_not_exist"); + assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testRemoveExistingParentTypeMismatch() { - data.removeProperty("foo.test"); - assertThat(data.isModified(), equalTo(false)); - assertThat(data.getDocument().size(), equalTo(3)); + ingestDocument.removeProperty("foo.test"); + assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testRemoveNullProperty() { - data.removeProperty(null); - assertThat(data.isModified(), equalTo(false)); - assertThat(data.getDocument().size(), equalTo(3)); + ingestDocument.removeProperty(null); + assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testRemoveEmptyProperty() { - data.removeProperty(""); - assertThat(data.isModified(), equalTo(false)); - assertThat(data.getDocument().size(), equalTo(3)); + ingestDocument.removeProperty(""); + assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testEqualsAndHashcode() throws Exception { @@ -250,7 +250,7 @@ public class DataTests extends ESTestCase { String id = randomAsciiOfLengthBetween(1, 10); String fieldName = randomAsciiOfLengthBetween(1, 10); String fieldValue = randomAsciiOfLengthBetween(1, 10); - Data data = new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue)); + IngestDocument ingestDocument = new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue)); boolean changed = false; String otherIndex; @@ -282,16 +282,16 @@ public class DataTests extends ESTestCase { document = Collections.singletonMap(fieldName, fieldValue); } - Data otherData = new Data(otherIndex, otherType, otherId, document); + IngestDocument otherIngestDocument = new IngestDocument(otherIndex, otherType, otherId, document); if (changed) { - assertThat(data, not(equalTo(otherData))); - assertThat(otherData, not(equalTo(data))); + assertThat(ingestDocument, not(equalTo(otherIngestDocument))); + assertThat(otherIngestDocument, not(equalTo(ingestDocument))); } else { - assertThat(data, equalTo(otherData)); - assertThat(otherData, equalTo(data)); - Data thirdData = new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue)); - assertThat(thirdData, equalTo(data)); - assertThat(data, equalTo(thirdData)); + assertThat(ingestDocument, equalTo(otherIngestDocument)); + assertThat(otherIngestDocument, equalTo(ingestDocument)); + IngestDocument thirdIngestDocument = new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue)); + assertThat(thirdIngestDocument, equalTo(ingestDocument)); + assertThat(ingestDocument, equalTo(thirdIngestDocument)); } } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 9b61ffa4fe3..a14ca5a31cf 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -106,8 +106,8 @@ public class IngestClientIT extends ESIntegTestCase { assertThat(response.getResults().size(), equalTo(1)); assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class)); SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0); - Data expectedData = new Data("index", "type", "id", Collections.singletonMap("foo", "bar")); - assertThat(simulateDocumentSimpleResult.getData(), equalTo(expectedData)); + IngestDocument expectedIngestDocument = new IngestDocument("index", "type", "id", Collections.singletonMap("foo", "bar")); + assertThat(simulateDocumentSimpleResult.getData(), equalTo(expectedIngestDocument)); assertThat(simulateDocumentSimpleResult.getFailure(), nullValue()); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/date/DateProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/date/DateProcessorTests.java index 2a9c444d691..8b8c1da25f1 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/date/DateProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/date/DateProcessorTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.processor.date; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -36,9 +36,9 @@ public class DateProcessorTests extends ESTestCase { "date_as_string", Collections.singletonList("yyyy dd MM hh:mm:ss"), "date_as_date"); Map document = new HashMap<>(); document.put("date_as_string", "2010 12 06 11:05:15"); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T11:05:15.000+02:00")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T11:05:15.000+02:00")); } public void testJodaPatternMultipleFormats() { @@ -51,27 +51,27 @@ public class DateProcessorTests extends ESTestCase { Map document = new HashMap<>(); document.put("date_as_string", "2010 12 06"); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); document = new HashMap<>(); document.put("date_as_string", "12/06/2010"); - data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); + ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); document = new HashMap<>(); document.put("date_as_string", "12-06-2010"); - data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); + ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); document = new HashMap<>(); document.put("date_as_string", "2010"); - data = new Data("index", "type", "id", document); + ingestDocument = new IngestDocument("index", "type", "id", document); try { - dateProcessor.execute(data); + dateProcessor.execute(ingestDocument); fail("processor should have failed due to not supported date format"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), containsString("unable to parse date [2010]")); @@ -83,9 +83,9 @@ public class DateProcessorTests extends ESTestCase { "date_as_string", Collections.singletonList("yyyy dd MMM"), "date_as_date"); Map document = new HashMap<>(); document.put("date_as_string", "2010 12 giugno"); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00")); } public void testJodaPatternDefaultYear() { @@ -93,9 +93,9 @@ public class DateProcessorTests extends ESTestCase { "date_as_string", Collections.singletonList("dd/MM"), "date_as_date"); Map document = new HashMap<>(); document.put("date_as_string", "12/06"); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo(DateTime.now().getYear() + "-06-12T00:00:00.000+02:00")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo(DateTime.now().getYear() + "-06-12T00:00:00.000+02:00")); } public void testTAI64N() { @@ -104,9 +104,9 @@ public class DateProcessorTests extends ESTestCase { Map document = new HashMap<>(); String dateAsString = (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024"; document.put("date_as_string", dateAsString); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2012-12-22T03:00:46.767+02:00")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2012-12-22T03:00:46.767+02:00")); } public void testUnixMs() { @@ -114,9 +114,9 @@ public class DateProcessorTests extends ESTestCase { "date_as_string", Collections.singletonList(DateParserFactory.UNIX_MS), "date_as_date"); Map document = new HashMap<>(); document.put("date_as_string", "1000500"); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z")); } public void testUnix() { @@ -124,8 +124,8 @@ public class DateProcessorTests extends ESTestCase { "date_as_string", Collections.singletonList(DateParserFactory.UNIX), "date_as_date"); Map document = new HashMap<>(); document.put("date_as_string", "1000.5"); - Data data = new Data("index", "type", "id", document); - dateProcessor.execute(data); - assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z")); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document); + dateProcessor.execute(ingestDocument); + assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z")); } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorTests.java index c93b4b78f1a..86de0a7862a 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.processor.geoip; import com.maxmind.geoip2.DatabaseReader; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; import java.io.InputStream; @@ -38,13 +38,13 @@ public class GeoIpProcessorTests extends ESTestCase { Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); - Data data = new Data("_index", "_type", "_id", document); - processor.execute(data); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document); + processor.execute(ingestDocument); - assertThat(data.getDocument().size(), equalTo(2)); - assertThat(data.getDocument().get("source_field"), equalTo("82.170.213.79")); + assertThat(ingestDocument.getSource().size(), equalTo(2)); + assertThat(ingestDocument.getSource().get("source_field"), equalTo("82.170.213.79")); @SuppressWarnings("unchecked") - Map geoData = (Map) data.getDocument().get("target_field"); + Map geoData = (Map) ingestDocument.getSource().get("target_field"); assertThat(geoData.size(), equalTo(10)); assertThat(geoData.get("ip"), equalTo("82.170.213.79")); assertThat(geoData.get("country_iso_code"), equalTo("NL")); @@ -64,13 +64,13 @@ public class GeoIpProcessorTests extends ESTestCase { Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); - Data data = new Data("_index", "_type", "_id", document); - processor.execute(data); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document); + processor.execute(ingestDocument); - assertThat(data.getDocument().size(), equalTo(2)); - assertThat(data.getDocument().get("source_field"), equalTo("82.170.213.79")); + assertThat(ingestDocument.getSource().size(), equalTo(2)); + assertThat(ingestDocument.getSource().get("source_field"), equalTo("82.170.213.79")); @SuppressWarnings("unchecked") - Map geoData = (Map) data.getDocument().get("target_field"); + Map geoData = (Map) ingestDocument.getSource().get("target_field"); assertThat(geoData.size(), equalTo(4)); assertThat(geoData.get("ip"), equalTo("82.170.213.79")); assertThat(geoData.get("country_iso_code"), equalTo("NL")); @@ -84,10 +84,10 @@ public class GeoIpProcessorTests extends ESTestCase { Map document = new HashMap<>(); document.put("source_field", "202.45.11.11"); - Data data = new Data("_index", "_type", "_id", document); - processor.execute(data); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document); + processor.execute(ingestDocument); @SuppressWarnings("unchecked") - Map geoData = (Map) data.getDocument().get("target_field"); + Map geoData = (Map) ingestDocument.getSource().get("target_field"); assertThat(geoData.size(), equalTo(0)); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java index d026b3d8fbc..9231a5db2be 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.processor.mutate; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.nullValue; public class MutateProcessorTests extends ESTestCase { - private Data data; + private IngestDocument ingestDocument; @Before public void setData() { @@ -49,35 +49,35 @@ public class MutateProcessorTests extends ESTestCase { fizz.put("buzz", "hello world"); document.put("fizz", fizz); - data = new Data("index", "type", "id", document); + ingestDocument = new IngestDocument("index", "type", "id", document); } public void testUpdate() throws IOException { Map update = new HashMap<>(); update.put("foo", 123); Processor processor = new MutateProcessor(update, null, null, null, null, null, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("foo", Integer.class), equalTo(123)); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("foo", Integer.class), equalTo(123)); } public void testRename() throws IOException { Map rename = new HashMap<>(); rename.put("foo", "bar"); Processor processor = new MutateProcessor(null, rename, null, null, null, null, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("bar", String.class), equalTo("bar")); - assertThat(data.hasPropertyValue("foo"), is(false)); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("bar", String.class), equalTo("bar")); + assertThat(ingestDocument.hasPropertyValue("foo"), is(false)); } public void testConvert() throws IOException { Map convert = new HashMap<>(); convert.put("num", "integer"); Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("num", Integer.class), equalTo(64)); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("num", Integer.class), equalTo(64)); } public void testConvertNullField() throws IOException { @@ -85,7 +85,7 @@ public class MutateProcessorTests extends ESTestCase { convert.put("null", "integer"); Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null); try { - processor.execute(data); + processor.execute(ingestDocument); fail("processor execute should have failed"); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Field \"null\" is null, cannot be converted to a/an integer")); @@ -96,18 +96,18 @@ public class MutateProcessorTests extends ESTestCase { Map convert = new HashMap<>(); convert.put("arr", "integer"); Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("arr", List.class), equalTo(Arrays.asList(1, 2, 3))); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("arr", List.class), equalTo(Arrays.asList(1, 2, 3))); } public void testSplit() throws IOException { Map split = new HashMap<>(); split.put("ip", "\\."); Processor processor = new MutateProcessor(null, null, null, split, null, null, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("ip", List.class), equalTo(Arrays.asList("127", "0", "0", "1"))); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("ip", List.class), equalTo(Arrays.asList("127", "0", "0", "1"))); } public void testSplitNullValue() throws IOException { @@ -115,7 +115,7 @@ public class MutateProcessorTests extends ESTestCase { split.put("not.found", "\\."); Processor processor = new MutateProcessor(null, null, null, split, null, null, null, null, null, null); try { - processor.execute(data); + processor.execute(ingestDocument); fail(); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Cannot split field. [not.found] is null.")); @@ -125,16 +125,16 @@ public class MutateProcessorTests extends ESTestCase { public void testGsub() throws IOException { List gsubExpressions = Collections.singletonList(new GsubExpression("ip", Pattern.compile("\\."), "-")); Processor processor = new MutateProcessor(null, null, null, null, gsubExpressions, null, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("ip", String.class), equalTo("127-0-0-1")); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("ip", String.class), equalTo("127-0-0-1")); } public void testGsub_NullValue() throws IOException { List gsubExpressions = Collections.singletonList(new GsubExpression("null_field", Pattern.compile("\\."), "-")); Processor processor = new MutateProcessor(null, null, null, null, gsubExpressions, null, null, null, null, null); try { - processor.execute(data); + processor.execute(ingestDocument); fail("processor execution should have failed"); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Field \"null_field\" is null, cannot match pattern.")); @@ -145,34 +145,34 @@ public class MutateProcessorTests extends ESTestCase { HashMap join = new HashMap<>(); join.put("arr", "-"); Processor processor = new MutateProcessor(null, null, null, null, null, join, null, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("arr", String.class), equalTo("1-2-3")); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("arr", String.class), equalTo("1-2-3")); } public void testRemove() throws IOException { List remove = Arrays.asList("foo", "ip"); Processor processor = new MutateProcessor(null, null, null, null, null, null, remove, null, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(5)); - assertThat(data.getPropertyValue("foo", Object.class), nullValue()); - assertThat(data.getPropertyValue("ip", Object.class), nullValue()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(5)); + assertThat(ingestDocument.getPropertyValue("foo", Object.class), nullValue()); + assertThat(ingestDocument.getPropertyValue("ip", Object.class), nullValue()); } public void testTrim() throws IOException { List trim = Arrays.asList("to_strip", "foo"); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, trim, null, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("foo", String.class), equalTo("bar")); - assertThat(data.getPropertyValue("to_strip", String.class), equalTo("clean")); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("bar")); + assertThat(ingestDocument.getPropertyValue("to_strip", String.class), equalTo("clean")); } public void testTrimNullValue() throws IOException { List trim = Collections.singletonList("not.found"); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, trim, null, null); try { - processor.execute(data); + processor.execute(ingestDocument); fail(); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Cannot trim field. [not.found] is null.")); @@ -182,16 +182,16 @@ public class MutateProcessorTests extends ESTestCase { public void testUppercase() throws IOException { List uppercase = Collections.singletonList("foo"); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("foo", String.class), equalTo("BAR")); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("BAR")); } public void testUppercaseNullValue() throws IOException { List uppercase = Collections.singletonList("not.found"); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null); try { - processor.execute(data); + processor.execute(ingestDocument); fail(); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Cannot uppercase field. [not.found] is null.")); @@ -201,16 +201,16 @@ public class MutateProcessorTests extends ESTestCase { public void testLowercase() throws IOException { List lowercase = Collections.singletonList("alpha"); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase); - processor.execute(data); - assertThat(data.getDocument().size(), equalTo(7)); - assertThat(data.getPropertyValue("alpha", String.class), equalTo("abcd")); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSource().size(), equalTo(7)); + assertThat(ingestDocument.getPropertyValue("alpha", String.class), equalTo("abcd")); } public void testLowercaseNullValue() throws IOException { List lowercase = Collections.singletonList("not.found"); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase); try { - processor.execute(data); + processor.execute(ingestDocument); fail(); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Cannot lowercase field. [not.found] is null.")); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index f970f24b237..3f9ec3517e6 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; @@ -59,25 +59,25 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecute_pipelineDoesNotExist() { when(store.get("_id")).thenReturn(null); - Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap()); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); - executionService.execute(data, "_id", listener); + executionService.execute(ingestDocument, "_id", listener); verify(listener).failed(any(IllegalArgumentException.class)); - verify(listener, times(0)).executed(data); + verify(listener, times(0)).executed(ingestDocument); } public void testExecute_success() throws Exception { Processor processor = mock(Processor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); - Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap()); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); - executionService.execute(data, "_id", listener); + executionService.execute(ingestDocument, "_id", listener); assertBusy(new Runnable() { @Override public void run() { - verify(processor).execute(data); - verify(listener).executed(data); + verify(processor).execute(ingestDocument); + verify(listener).executed(ingestDocument); verify(listener, times(0)).failed(any(Exception.class)); } }); @@ -86,15 +86,15 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecute_failure() throws Exception { Processor processor = mock(Processor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); - Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); - doThrow(new RuntimeException()).when(processor).execute(data); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap()); + doThrow(new RuntimeException()).when(processor).execute(ingestDocument); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); - executionService.execute(data, "_id", listener); + executionService.execute(ingestDocument, "_id", listener); assertBusy(new Runnable() { @Override public void run() { - verify(processor).execute(data); - verify(listener, times(0)).executed(data); + verify(processor).execute(ingestDocument); + verify(listener, times(0)).executed(ingestDocument); verify(listener).failed(any(RuntimeException.class)); } }); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index c0ff79b056f..cee6b1b1d78 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.mutate.MutateProcessor; @@ -80,7 +80,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verifyZeroInteractions(actionFilterChain); } @@ -93,7 +93,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verifyZeroInteractions(actionFilterChain); } @@ -121,16 +121,16 @@ public class IngestActionFilterTests extends ESTestCase { Answer answer = new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Data data = (Data) invocationOnMock.getArguments()[0]; + IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; - listener.executed(data); + listener.executed(ingestDocument); return null; } }; - doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verify(actionFilterChain).proceed("_action", indexRequest, actionListener); verifyZeroInteractions(actionListener); } @@ -151,10 +151,10 @@ public class IngestActionFilterTests extends ESTestCase { return null; } }; - doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verify(actionListener).onFailure(exception); verifyZeroInteractions(actionFilterChain); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java index 1cc3f6baada..165ea62e1f2 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/TransportDataTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -39,7 +39,7 @@ public class TransportDataTests extends ESTestCase { String id = randomAsciiOfLengthBetween(1, 10); String fieldName = randomAsciiOfLengthBetween(1, 10); String fieldValue = randomAsciiOfLengthBetween(1, 10); - TransportData transportData = new TransportData(new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue))); + TransportData transportData = new TransportData(new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue))); boolean changed = false; String otherIndex; @@ -71,23 +71,23 @@ public class TransportDataTests extends ESTestCase { document = Collections.singletonMap(fieldName, fieldValue); } - TransportData otherTransportData = new TransportData(new Data(otherIndex, otherType, otherId, document)); + TransportData otherTransportData = new TransportData(new IngestDocument(otherIndex, otherType, otherId, document)); if (changed) { assertThat(transportData, not(equalTo(otherTransportData))); assertThat(otherTransportData, not(equalTo(transportData))); } else { assertThat(transportData, equalTo(otherTransportData)); assertThat(otherTransportData, equalTo(transportData)); - TransportData thirdTransportData = new TransportData(new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue))); + TransportData thirdTransportData = new TransportData(new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue))); assertThat(thirdTransportData, equalTo(transportData)); assertThat(transportData, equalTo(thirdTransportData)); } } public void testSerialization() throws IOException { - Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), + IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10))); - TransportData transportData = new TransportData(data); + TransportData transportData = new TransportData(ingestDocument); BytesStreamOutput out = new BytesStreamOutput(); transportData.writeTo(out); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java index 7f44fc08b9b..29c4faa17f7 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/ParsedSimulateRequestParserTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.plugin.ingest.PipelineStore; @@ -29,6 +29,9 @@ import org.junit.Before; import java.io.IOException; import java.util.*; +import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; +import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX; +import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE; import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -80,12 +83,12 @@ public class ParsedSimulateRequestParserTests extends ESTestCase { assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); - for (Data data : actualRequest.getDocuments()) { + for (IngestDocument ingestDocument : actualRequest.getDocuments()) { Map expectedDocument = expectedDocsIterator.next(); - assertThat(data.getDocument(), equalTo(expectedDocument.get(Fields.SOURCE))); - assertThat(data.getIndex(), equalTo(expectedDocument.get(Fields.INDEX))); - assertThat(data.getType(), equalTo(expectedDocument.get(Fields.TYPE))); - assertThat(data.getId(), equalTo(expectedDocument.get(Fields.ID))); + assertThat(ingestDocument.getSource(), equalTo(expectedDocument.get(Fields.SOURCE))); + assertThat(ingestDocument.getMetadata(INDEX), equalTo(expectedDocument.get(Fields.INDEX))); + assertThat(ingestDocument.getMetadata(TYPE), equalTo(expectedDocument.get(Fields.TYPE))); + assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(Fields.ID))); } assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID)); @@ -133,12 +136,12 @@ public class ParsedSimulateRequestParserTests extends ESTestCase { assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); - for (Data data : actualRequest.getDocuments()) { + for (IngestDocument ingestDocument : actualRequest.getDocuments()) { Map expectedDocument = expectedDocsIterator.next(); - assertThat(data.getDocument(), equalTo(expectedDocument.get(Fields.SOURCE))); - assertThat(data.getIndex(), equalTo(expectedDocument.get(Fields.INDEX))); - assertThat(data.getType(), equalTo(expectedDocument.get(Fields.TYPE))); - assertThat(data.getId(), equalTo(expectedDocument.get(Fields.ID))); + assertThat(ingestDocument.getSource(), equalTo(expectedDocument.get(Fields.SOURCE))); + assertThat(ingestDocument.getMetadata(INDEX), equalTo(expectedDocument.get(Fields.INDEX))); + assertThat(ingestDocument.getMetadata(TYPE), equalTo(expectedDocument.get(Fields.TYPE))); + assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(Fields.ID))); } assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID)); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResultTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResultTests.java index e0b1c1c0f88..183aed0a5a4 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResultTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateDocumentSimpleResultTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -38,9 +38,9 @@ public class SimulateDocumentSimpleResultTests extends ESTestCase { if (isFailure) { simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test")); } else { - Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), + IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10))); - simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(data); + simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(ingestDocument); } BytesStreamOutput out = new BytesStreamOutput(); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java index bd2e6397a74..901ecb29dca 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; @@ -40,7 +40,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { private SimulateExecutionService executionService; private Pipeline pipeline; private Processor processor; - private Data data; + private IngestDocument ingestDocument; @Before public void setup() { @@ -53,7 +53,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { processor = mock(Processor.class); when(processor.getType()).thenReturn("mock"); pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor)); - data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); + ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); } @After @@ -62,35 +62,35 @@ public class SimulateExecutionServiceTests extends ESTestCase { } public void testExecuteVerboseItem() throws Exception { - SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data); - verify(processor, times(2)).execute(data); + SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, ingestDocument); + verify(processor, times(2)).execute(ingestDocument); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorId(), equalTo("processor[mock]-0")); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), not(sameInstance(data))); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), equalTo(data)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), not(sameInstance(ingestDocument))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), equalTo(ingestDocument)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue()); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1")); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(data))); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(data)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(ingestDocument))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(ingestDocument)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } public void testExecuteItem() throws Exception { - SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data); - verify(processor, times(2)).execute(data); + SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, ingestDocument); + verify(processor, times(2)).execute(ingestDocument); assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class)); SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse; - assertThat(simulateDocumentSimpleResult.getData(), equalTo(data)); + assertThat(simulateDocumentSimpleResult.getData(), equalTo(ingestDocument)); assertThat(simulateDocumentSimpleResult.getFailure(), nullValue()); } public void testExecuteVerboseItemWithFailure() throws Exception { Exception e = new RuntimeException("processor failed"); - doThrow(e).doNothing().when(processor).execute(data); - SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data); - verify(processor, times(2)).execute(data); + doThrow(e).doNothing().when(processor).execute(ingestDocument); + SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, ingestDocument); + verify(processor, times(2)).execute(ingestDocument); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2)); @@ -100,8 +100,8 @@ public class SimulateExecutionServiceTests extends ESTestCase { RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(); assertThat(runtimeException.getMessage(), equalTo("processor failed")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1")); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(data))); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(data)); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(ingestDocument))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(ingestDocument)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue()); runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(); assertThat(runtimeException.getMessage(), equalTo("processor failed")); @@ -109,9 +109,9 @@ public class SimulateExecutionServiceTests extends ESTestCase { public void testExecuteItemWithFailure() throws Exception { Exception e = new RuntimeException("processor failed"); - doThrow(e).when(processor).execute(data); - SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data); - verify(processor, times(1)).execute(data); + doThrow(e).when(processor).execute(ingestDocument); + SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, ingestDocument); + verify(processor, times(1)).execute(ingestDocument); assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class)); SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse; assertThat(simulateDocumentSimpleResult.getData(), nullValue()); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponseTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponseTests.java index 0a325ca3268..ab7803c719d 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponseTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineResponseTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -42,7 +42,7 @@ public class SimulatePipelineResponseTests extends ESTestCase { List results = new ArrayList<>(numResults); for (int i = 0; i < numResults; i++) { boolean isFailure = randomBoolean(); - Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), + IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10))); if (isVerbose) { int numProcessors = randomIntBetween(1, 10); @@ -53,18 +53,18 @@ public class SimulatePipelineResponseTests extends ESTestCase { if (isFailure) { processorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test")); } else { - processorResult = new SimulateProcessorResult(processorId, data); + processorResult = new SimulateProcessorResult(processorId, ingestDocument); } processorResults.add(processorResult); } results.add(new SimulateDocumentVerboseResult(processorResults)); } else { - results.add(new SimulateDocumentSimpleResult(data)); + results.add(new SimulateDocumentSimpleResult(ingestDocument)); SimulateDocumentSimpleResult simulateDocumentSimpleResult; if (isFailure) { simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test")); } else { - simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(data); + simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(ingestDocument); } results.add(simulateDocumentSimpleResult); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResultTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResultTests.java index 55024219347..d347c6749e3 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResultTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateProcessorResultTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -39,9 +39,9 @@ public class SimulateProcessorResultTests extends ESTestCase { if (isFailure) { simulateProcessorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test")); } else { - Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), + IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10))); - simulateProcessorResult = new SimulateProcessorResult(processorId, data); + simulateProcessorResult = new SimulateProcessorResult(processorId, ingestDocument); } BytesStreamOutput out = new BytesStreamOutput();