renamed Data to IngestDocument
moved all metadata related fields to a single metadata map removed specific metadata getters with a generic getMetadata()
This commit is contained in:
parent
36655b688c
commit
ecc8158b89
|
@ -24,26 +24,26 @@ import org.elasticsearch.common.Strings;
|
|||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Represents the data and meta data (like id and type) of a single document that is going to be indexed.
|
||||
* Represents a single document being captured before indexing and holds the source and meta data (like id, type and index).
|
||||
*/
|
||||
public final class Data {
|
||||
public final class IngestDocument {
|
||||
|
||||
private final String index;
|
||||
private final String type;
|
||||
private final String id;
|
||||
private final Map<String, Object> document;
|
||||
private final Map<String, String> metaData;
|
||||
private final Map<String, Object> source;
|
||||
|
||||
private boolean modified = false;
|
||||
|
||||
public Data(String index, String type, String id, Map<String, Object> document) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.document = document;
|
||||
public IngestDocument(String index, String type, String id, Map<String, Object> source) {
|
||||
this.metaData = new HashMap<>();
|
||||
this.metaData.put("_index", index);
|
||||
this.metaData.put("_type", type);
|
||||
this.metaData.put("_id", id);
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
public Data(Data other) {
|
||||
this(other.index, other.type, other.id, new HashMap<>(other.document));
|
||||
public IngestDocument(IngestDocument other) {
|
||||
this.metaData = new HashMap<>(other.metaData);
|
||||
this.source = new HashMap<>(other.source);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,7 +116,7 @@ public final class Data {
|
|||
}
|
||||
|
||||
private Map<String, Object> getParent(String[] pathElements) {
|
||||
Map<String, Object> innerMap = document;
|
||||
Map<String, Object> innerMap = source;
|
||||
for (int i = 0; i < pathElements.length - 1; i++) {
|
||||
Object obj = innerMap.get(pathElements[i]);
|
||||
if (obj instanceof Map) {
|
||||
|
@ -143,7 +143,7 @@ public final class Data {
|
|||
String[] pathElements = Strings.splitStringToArray(path, '.');
|
||||
assert pathElements.length > 0;
|
||||
|
||||
Map<String, Object> inner = document;
|
||||
Map<String, Object> inner = source;
|
||||
for (int i = 0; i < pathElements.length - 1; i++) {
|
||||
String pathElement = pathElements[i];
|
||||
if (inner.containsKey(pathElement)) {
|
||||
|
@ -169,16 +169,8 @@ public final class Data {
|
|||
modified = true;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
public String getMetadata(MetaData metaData) {
|
||||
return this.metaData.get(metaData.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -186,8 +178,8 @@ public final class Data {
|
|||
* not be reflected to the modified flag. Modify the document instead using {@link #setPropertyValue(String, Object)}
|
||||
* and {@link #removeProperty(String)}
|
||||
*/
|
||||
public Map<String, Object> getDocument() {
|
||||
return document;
|
||||
public Map<String, Object> getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public boolean isModified() {
|
||||
|
@ -201,15 +193,35 @@ public final class Data {
|
|||
return false;
|
||||
}
|
||||
|
||||
Data other = (Data) obj;
|
||||
return Objects.equals(document, other.document) &&
|
||||
Objects.equals(index, other.index) &&
|
||||
Objects.equals(type, other.type) &&
|
||||
Objects.equals(id, other.id);
|
||||
IngestDocument other = (IngestDocument) obj;
|
||||
return Objects.equals(source, other.source) &&
|
||||
Objects.equals(metaData, other.metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(index, type, id, document);
|
||||
return Objects.hash(metaData, source);
|
||||
}
|
||||
|
||||
public enum MetaData {
|
||||
|
||||
INDEX("_index"),
|
||||
TYPE("_type"),
|
||||
ID("_id"),
|
||||
ROUTING("_routing"),
|
||||
PARENT("_parent"),
|
||||
TIMESTAMP("_timestamp"),
|
||||
TTL("_ttl");
|
||||
|
||||
private final String name;
|
||||
|
||||
MetaData(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -44,9 +44,9 @@ public final class Pipeline {
|
|||
/**
|
||||
* Modifies the data of a document to be indexed based on the processor this pipeline holds
|
||||
*/
|
||||
public void execute(Data data) {
|
||||
public void execute(IngestDocument ingestDocument) {
|
||||
for (Processor processor : processors) {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -36,7 +36,7 @@ public interface Processor {
|
|||
/**
|
||||
* Introspect and potentially modify the incoming data.
|
||||
*/
|
||||
void execute(Data data);
|
||||
void execute(IngestDocument ingestDocument);
|
||||
|
||||
/**
|
||||
* Gets the type of a processor
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.date;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -56,8 +56,8 @@ public final class DateProcessor implements Processor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
String value = data.getPropertyValue(matchField, String.class);
|
||||
public void execute(IngestDocument ingestDocument) {
|
||||
String value = ingestDocument.getPropertyValue(matchField, String.class);
|
||||
// TODO(talevy): handle custom timestamp fields
|
||||
|
||||
DateTime dateTime = null;
|
||||
|
@ -75,7 +75,7 @@ public final class DateProcessor implements Processor {
|
|||
throw new IllegalArgumentException("unable to parse date [" + value + "]", lastException);
|
||||
}
|
||||
|
||||
data.setPropertyValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
|
||||
ingestDocument.setPropertyValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,7 +26,7 @@ import com.maxmind.geoip2.model.CountryResponse;
|
|||
import com.maxmind.geoip2.record.*;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -60,8 +60,8 @@ public final class GeoIpProcessor implements Processor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
String ip = data.getPropertyValue(sourceField, String.class);
|
||||
public void execute(IngestDocument ingestDocument) {
|
||||
String ip = ingestDocument.getPropertyValue(sourceField, String.class);
|
||||
final InetAddress ipAddress;
|
||||
try {
|
||||
ipAddress = InetAddress.getByName(ip);
|
||||
|
@ -88,7 +88,7 @@ public final class GeoIpProcessor implements Processor {
|
|||
default:
|
||||
throw new IllegalStateException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType() + "]");
|
||||
}
|
||||
data.setPropertyValue(targetField, geoData);
|
||||
ingestDocument.setPropertyValue(targetField, geoData);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.grok;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
|
@ -45,13 +45,13 @@ public final class GrokProcessor implements Processor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
Object field = data.getPropertyValue(matchField, Object.class);
|
||||
public void execute(IngestDocument ingestDocument) {
|
||||
Object field = ingestDocument.getPropertyValue(matchField, Object.class);
|
||||
// TODO(talevy): handle invalid field types
|
||||
if (field instanceof String) {
|
||||
Map<String, Object> matches = grok.captures((String) field);
|
||||
if (matches != null) {
|
||||
matches.forEach((k, v) -> data.setPropertyValue(k, v));
|
||||
matches.forEach((k, v) -> ingestDocument.setPropertyValue(k, v));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
|
@ -101,36 +101,36 @@ public final class MutateProcessor implements Processor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
public void execute(IngestDocument ingestDocument) {
|
||||
if (update != null) {
|
||||
doUpdate(data);
|
||||
doUpdate(ingestDocument);
|
||||
}
|
||||
if (rename != null) {
|
||||
doRename(data);
|
||||
doRename(ingestDocument);
|
||||
}
|
||||
if (convert != null) {
|
||||
doConvert(data);
|
||||
doConvert(ingestDocument);
|
||||
}
|
||||
if (split != null) {
|
||||
doSplit(data);
|
||||
doSplit(ingestDocument);
|
||||
}
|
||||
if (gsub != null) {
|
||||
doGsub(data);
|
||||
doGsub(ingestDocument);
|
||||
}
|
||||
if (join != null) {
|
||||
doJoin(data);
|
||||
doJoin(ingestDocument);
|
||||
}
|
||||
if (remove != null) {
|
||||
doRemove(data);
|
||||
doRemove(ingestDocument);
|
||||
}
|
||||
if (trim != null) {
|
||||
doTrim(data);
|
||||
doTrim(ingestDocument);
|
||||
}
|
||||
if (uppercase != null) {
|
||||
doUppercase(data);
|
||||
doUppercase(ingestDocument);
|
||||
}
|
||||
if (lowercase != null) {
|
||||
doLowercase(data);
|
||||
doLowercase(ingestDocument);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,18 +139,18 @@ public final class MutateProcessor implements Processor {
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
private void doUpdate(Data data) {
|
||||
private void doUpdate(IngestDocument ingestDocument) {
|
||||
for(Map.Entry<String, Object> entry : update.entrySet()) {
|
||||
data.setPropertyValue(entry.getKey(), entry.getValue());
|
||||
ingestDocument.setPropertyValue(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private void doRename(Data data) {
|
||||
private void doRename(IngestDocument ingestDocument) {
|
||||
for(Map.Entry<String, String> entry : rename.entrySet()) {
|
||||
if (data.hasPropertyValue(entry.getKey())) {
|
||||
Object oldVal = data.getPropertyValue(entry.getKey(), Object.class);
|
||||
data.getDocument().remove(entry.getKey());
|
||||
data.setPropertyValue(entry.getValue(), oldVal);
|
||||
if (ingestDocument.hasPropertyValue(entry.getKey())) {
|
||||
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
|
||||
ingestDocument.getSource().remove(entry.getKey());
|
||||
ingestDocument.setPropertyValue(entry.getValue(), oldVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -175,11 +175,11 @@ public final class MutateProcessor implements Processor {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doConvert(Data data) {
|
||||
private void doConvert(IngestDocument ingestDocument) {
|
||||
for(Map.Entry<String, String> entry : convert.entrySet()) {
|
||||
String toType = entry.getValue();
|
||||
|
||||
Object oldVal = data.getPropertyValue(entry.getKey(), Object.class);
|
||||
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
|
||||
Object newVal;
|
||||
|
||||
if (oldVal instanceof List) {
|
||||
|
@ -194,91 +194,91 @@ public final class MutateProcessor implements Processor {
|
|||
newVal = parseValueAsType(oldVal, toType);
|
||||
}
|
||||
|
||||
data.setPropertyValue(entry.getKey(), newVal);
|
||||
ingestDocument.setPropertyValue(entry.getKey(), newVal);
|
||||
}
|
||||
}
|
||||
|
||||
private void doSplit(Data data) {
|
||||
private void doSplit(IngestDocument ingestDocument) {
|
||||
for(Map.Entry<String, String> entry : split.entrySet()) {
|
||||
Object oldVal = data.getPropertyValue(entry.getKey(), Object.class);
|
||||
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
|
||||
if (oldVal == null) {
|
||||
throw new IllegalArgumentException("Cannot split field. [" + entry.getKey() + "] is null.");
|
||||
} else if (oldVal instanceof String) {
|
||||
data.setPropertyValue(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue())));
|
||||
ingestDocument.setPropertyValue(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue())));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot split a field that is not a String type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doGsub(Data data) {
|
||||
private void doGsub(IngestDocument ingestDocument) {
|
||||
for (GsubExpression gsubExpression : gsub) {
|
||||
String oldVal = data.getPropertyValue(gsubExpression.getFieldName(), String.class);
|
||||
String oldVal = ingestDocument.getPropertyValue(gsubExpression.getFieldName(), String.class);
|
||||
if (oldVal == null) {
|
||||
throw new IllegalArgumentException("Field \"" + gsubExpression.getFieldName() + "\" is null, cannot match pattern.");
|
||||
}
|
||||
Matcher matcher = gsubExpression.getPattern().matcher(oldVal);
|
||||
String newVal = matcher.replaceAll(gsubExpression.getReplacement());
|
||||
data.setPropertyValue(gsubExpression.getFieldName(), newVal);
|
||||
ingestDocument.setPropertyValue(gsubExpression.getFieldName(), newVal);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doJoin(Data data) {
|
||||
private void doJoin(IngestDocument ingestDocument) {
|
||||
for(Map.Entry<String, String> entry : join.entrySet()) {
|
||||
Object oldVal = data.getPropertyValue(entry.getKey(), Object.class);
|
||||
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
|
||||
if (oldVal instanceof List) {
|
||||
String joined = (String) ((List) oldVal)
|
||||
.stream()
|
||||
.map(Object::toString)
|
||||
.collect(Collectors.joining(entry.getValue()));
|
||||
|
||||
data.setPropertyValue(entry.getKey(), joined);
|
||||
ingestDocument.setPropertyValue(entry.getKey(), joined);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot join field:" + entry.getKey() + " with type: " + oldVal.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doRemove(Data data) {
|
||||
private void doRemove(IngestDocument ingestDocument) {
|
||||
for(String field : remove) {
|
||||
data.getDocument().remove(field);
|
||||
ingestDocument.getSource().remove(field);
|
||||
}
|
||||
}
|
||||
|
||||
private void doTrim(Data data) {
|
||||
private void doTrim(IngestDocument ingestDocument) {
|
||||
for(String field : trim) {
|
||||
Object val = data.getPropertyValue(field, Object.class);
|
||||
Object val = ingestDocument.getPropertyValue(field, Object.class);
|
||||
if (val == null) {
|
||||
throw new IllegalArgumentException("Cannot trim field. [" + field + "] is null.");
|
||||
} else if (val instanceof String) {
|
||||
data.setPropertyValue(field, ((String) val).trim());
|
||||
ingestDocument.setPropertyValue(field, ((String) val).trim());
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot trim field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doUppercase(Data data) {
|
||||
private void doUppercase(IngestDocument ingestDocument) {
|
||||
for(String field : uppercase) {
|
||||
Object val = data.getPropertyValue(field, Object.class);
|
||||
Object val = ingestDocument.getPropertyValue(field, Object.class);
|
||||
if (val == null) {
|
||||
throw new IllegalArgumentException("Cannot uppercase field. [" + field + "] is null.");
|
||||
} else if (val instanceof String) {
|
||||
data.setPropertyValue(field, ((String) val).toUpperCase(Locale.ROOT));
|
||||
ingestDocument.setPropertyValue(field, ((String) val).toUpperCase(Locale.ROOT));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot uppercase field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doLowercase(Data data) {
|
||||
private void doLowercase(IngestDocument ingestDocument) {
|
||||
for(String field : lowercase) {
|
||||
Object val = data.getPropertyValue(field, Object.class);
|
||||
Object val = ingestDocument.getPropertyValue(field, Object.class);
|
||||
if (val == null) {
|
||||
throw new IllegalArgumentException("Cannot lowercase field. [" + field + "] is null.");
|
||||
} else if (val instanceof String) {
|
||||
data.setPropertyValue(field, ((String) val).toLowerCase(Locale.ROOT));
|
||||
ingestDocument.setPropertyValue(field, ((String) val).toLowerCase(Locale.ROOT));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot lowercase field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -40,7 +40,7 @@ public class PipelineExecutionService {
|
|||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public void execute(Data data, String pipelineId, Listener listener) {
|
||||
public void execute(IngestDocument ingestDocument, String pipelineId, Listener listener) {
|
||||
Pipeline pipeline = store.get(pipelineId);
|
||||
if (pipeline == null) {
|
||||
listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId)));
|
||||
|
@ -51,8 +51,8 @@ public class PipelineExecutionService {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
pipeline.execute(data);
|
||||
listener.executed(data);
|
||||
pipeline.execute(ingestDocument);
|
||||
listener.executed(ingestDocument);
|
||||
} catch (Exception e) {
|
||||
listener.failed(e);
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ public class PipelineExecutionService {
|
|||
|
||||
public interface Listener {
|
||||
|
||||
void executed(Data data);
|
||||
void executed(IngestDocument ingestDocument);
|
||||
|
||||
void failed(Exception e);
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.action.support.ActionFilterChain;
|
|||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||
|
||||
|
@ -82,12 +82,12 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
}
|
||||
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() {
|
||||
IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() {
|
||||
@Override
|
||||
public void executed(Data data) {
|
||||
if (data.isModified()) {
|
||||
indexRequest.source(data.getDocument());
|
||||
public void executed(IngestDocument ingestDocument) {
|
||||
if (ingestDocument.isModified()) {
|
||||
indexRequest.source(ingestDocument.getSource());
|
||||
}
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
|
||||
chain.proceed(action, indexRequest, listener);
|
||||
|
@ -115,12 +115,12 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() {
|
||||
IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() {
|
||||
@Override
|
||||
public void executed(Data data) {
|
||||
if (data.isModified()) {
|
||||
indexRequest.source(data.getDocument());
|
||||
public void executed(IngestDocument ingestDocument) {
|
||||
if (ingestDocument.isModified()) {
|
||||
indexRequest.source(ingestDocument.getSource());
|
||||
}
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests);
|
||||
}
|
||||
|
|
|
@ -25,24 +25,28 @@ import org.elasticsearch.common.io.stream.Writeable;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX;
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
|
||||
|
||||
public class TransportData implements Writeable<TransportData>, ToXContent {
|
||||
|
||||
private static final TransportData PROTOTYPE = new TransportData(null);
|
||||
|
||||
private final Data data;
|
||||
private final IngestDocument ingestDocument;
|
||||
|
||||
public TransportData(Data data) {
|
||||
this.data = data;
|
||||
public TransportData(IngestDocument ingestDocument) {
|
||||
this.ingestDocument = ingestDocument;
|
||||
}
|
||||
|
||||
public Data get() {
|
||||
return data;
|
||||
public IngestDocument get() {
|
||||
return ingestDocument;
|
||||
}
|
||||
|
||||
public static TransportData readTransportDataFrom(StreamInput in) throws IOException {
|
||||
|
@ -55,25 +59,25 @@ public class TransportData implements Writeable<TransportData>, ToXContent {
|
|||
String type = in.readString();
|
||||
String id = in.readString();
|
||||
Map<String, Object> doc = in.readMap();
|
||||
return new TransportData(new Data(index, type, id, doc));
|
||||
return new TransportData(new IngestDocument(index, type, id, doc));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(data.getIndex());
|
||||
out.writeString(data.getType());
|
||||
out.writeString(data.getId());
|
||||
out.writeMap(data.getDocument());
|
||||
out.writeString(ingestDocument.getMetadata(INDEX));
|
||||
out.writeString(ingestDocument.getMetadata(TYPE));
|
||||
out.writeString(ingestDocument.getMetadata(ID));
|
||||
out.writeMap(ingestDocument.getSource());
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Fields.DOCUMENT);
|
||||
builder.field(Fields.MODIFIED, data.isModified());
|
||||
builder.field(Fields.INDEX, data.getIndex());
|
||||
builder.field(Fields.TYPE, data.getType());
|
||||
builder.field(Fields.ID, data.getId());
|
||||
builder.field(Fields.SOURCE, data.getDocument());
|
||||
builder.field(Fields.MODIFIED, ingestDocument.isModified());
|
||||
builder.field(Fields.INDEX, ingestDocument.getMetadata(INDEX));
|
||||
builder.field(Fields.TYPE, ingestDocument.getMetadata(TYPE));
|
||||
builder.field(Fields.ID, ingestDocument.getMetadata(ID));
|
||||
builder.field(Fields.SOURCE, ingestDocument.getSource());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -87,12 +91,12 @@ public class TransportData implements Writeable<TransportData>, ToXContent {
|
|||
return false;
|
||||
}
|
||||
TransportData that = (TransportData) o;
|
||||
return Objects.equals(data, that.data);
|
||||
return Objects.equals(ingestDocument, that.ingestDocument);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(data);
|
||||
return Objects.hash(ingestDocument);
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
|
@ -29,11 +29,11 @@ import java.util.*;
|
|||
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
|
||||
|
||||
public class ParsedSimulateRequest {
|
||||
private final List<Data> documents;
|
||||
private final List<IngestDocument> documents;
|
||||
private final Pipeline pipeline;
|
||||
private final boolean verbose;
|
||||
|
||||
ParsedSimulateRequest(Pipeline pipeline, List<Data> documents, boolean verbose) {
|
||||
ParsedSimulateRequest(Pipeline pipeline, List<IngestDocument> documents, boolean verbose) {
|
||||
this.pipeline = pipeline;
|
||||
this.documents = Collections.unmodifiableList(documents);
|
||||
this.verbose = verbose;
|
||||
|
@ -43,7 +43,7 @@ public class ParsedSimulateRequest {
|
|||
return pipeline;
|
||||
}
|
||||
|
||||
public List<Data> getDocuments() {
|
||||
public List<IngestDocument> getDocuments() {
|
||||
return documents;
|
||||
}
|
||||
|
||||
|
@ -55,18 +55,18 @@ public class ParsedSimulateRequest {
|
|||
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
|
||||
public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
|
||||
|
||||
private List<Data> parseDocs(Map<String, Object> config) {
|
||||
private List<IngestDocument> parseDocs(Map<String, Object> config) {
|
||||
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS);
|
||||
List<Data> dataList = new ArrayList<>();
|
||||
List<IngestDocument> ingestDocumentList = new ArrayList<>();
|
||||
for (Map<String, Object> dataMap : docs) {
|
||||
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
|
||||
Data data = new Data(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
|
||||
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
|
||||
ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE),
|
||||
ConfigurationUtils.readStringProperty(dataMap, Fields.ID),
|
||||
document);
|
||||
dataList.add(data);
|
||||
ingestDocumentList.add(ingestDocument);
|
||||
}
|
||||
return dataList;
|
||||
return ingestDocumentList;
|
||||
}
|
||||
|
||||
public ParsedSimulateRequest parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
|
||||
|
@ -74,16 +74,16 @@ public class ParsedSimulateRequest {
|
|||
throw new IllegalArgumentException("param [pipeline] is null");
|
||||
}
|
||||
Pipeline pipeline = pipelineStore.get(pipelineId);
|
||||
List<Data> dataList = parseDocs(config);
|
||||
return new ParsedSimulateRequest(pipeline, dataList, verbose);
|
||||
List<IngestDocument> ingestDocumentList = parseDocs(config);
|
||||
return new ParsedSimulateRequest(pipeline, ingestDocumentList, verbose);
|
||||
|
||||
}
|
||||
|
||||
public ParsedSimulateRequest parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
|
||||
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
|
||||
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
|
||||
List<Data> dataList = parseDocs(config);
|
||||
return new ParsedSimulateRequest(pipeline, dataList, verbose);
|
||||
List<IngestDocument> ingestDocumentList = parseDocs(config);
|
||||
return new ParsedSimulateRequest(pipeline, ingestDocumentList, verbose);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,20 +22,20 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.plugin.ingest.transport.TransportData;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SimulateDocumentSimpleResult implements SimulateDocumentResult<SimulateDocumentSimpleResult> {
|
||||
|
||||
private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult((Data)null);
|
||||
private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult((IngestDocument)null);
|
||||
|
||||
private TransportData data;
|
||||
private Exception failure;
|
||||
|
||||
public SimulateDocumentSimpleResult(Data data) {
|
||||
this.data = new TransportData(data);
|
||||
public SimulateDocumentSimpleResult(IngestDocument ingestDocument) {
|
||||
this.data = new TransportData(ingestDocument);
|
||||
}
|
||||
|
||||
private SimulateDocumentSimpleResult(TransportData data) {
|
||||
|
@ -46,7 +46,7 @@ public class SimulateDocumentSimpleResult implements SimulateDocumentResult<Simu
|
|||
this.failure = failure;
|
||||
}
|
||||
|
||||
public Data getData() {
|
||||
public IngestDocument getData() {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -40,30 +40,30 @@ public class SimulateExecutionService {
|
|||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
SimulateDocumentResult executeItem(Pipeline pipeline, Data data) {
|
||||
SimulateDocumentResult executeItem(Pipeline pipeline, IngestDocument ingestDocument) {
|
||||
try {
|
||||
pipeline.execute(data);
|
||||
return new SimulateDocumentSimpleResult(data);
|
||||
pipeline.execute(ingestDocument);
|
||||
return new SimulateDocumentSimpleResult(ingestDocument);
|
||||
} catch (Exception e) {
|
||||
return new SimulateDocumentSimpleResult(e);
|
||||
}
|
||||
}
|
||||
|
||||
SimulateDocumentVerboseResult executeVerboseItem(Pipeline pipeline, Data data) {
|
||||
SimulateDocumentVerboseResult executeVerboseItem(Pipeline pipeline, IngestDocument ingestDocument) {
|
||||
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
|
||||
Data currentData = new Data(data);
|
||||
IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
|
||||
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
|
||||
Processor processor = pipeline.getProcessors().get(i);
|
||||
String processorId = "processor[" + processor.getType() + "]-" + i;
|
||||
|
||||
try {
|
||||
processor.execute(currentData);
|
||||
processorResultList.add(new SimulateProcessorResult(processorId, currentData));
|
||||
processor.execute(currentIngestDocument);
|
||||
processorResultList.add(new SimulateProcessorResult(processorId, currentIngestDocument));
|
||||
} catch (Exception e) {
|
||||
processorResultList.add(new SimulateProcessorResult(processorId, e));
|
||||
}
|
||||
|
||||
currentData = new Data(currentData);
|
||||
currentIngestDocument = new IngestDocument(currentIngestDocument);
|
||||
}
|
||||
return new SimulateDocumentVerboseResult(processorResultList);
|
||||
}
|
||||
|
@ -73,11 +73,11 @@ public class SimulateExecutionService {
|
|||
@Override
|
||||
public void run() {
|
||||
List<SimulateDocumentResult> responses = new ArrayList<>();
|
||||
for (Data data : request.getDocuments()) {
|
||||
for (IngestDocument ingestDocument : request.getDocuments()) {
|
||||
if (request.isVerbose()) {
|
||||
responses.add(executeVerboseItem(request.getPipeline(), data));
|
||||
responses.add(executeVerboseItem(request.getPipeline(), ingestDocument));
|
||||
} else {
|
||||
responses.add(executeItem(request.getPipeline(), data));
|
||||
responses.add(executeItem(request.getPipeline(), ingestDocument));
|
||||
}
|
||||
}
|
||||
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
|
||||
|
|
|
@ -25,22 +25,22 @@ import org.elasticsearch.common.io.stream.Writeable;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.plugin.ingest.transport.TransportData;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SimulateProcessorResult implements Writeable<SimulateProcessorResult>, ToXContent {
|
||||
|
||||
private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult(null, (Data)null);
|
||||
private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult(null, (IngestDocument)null);
|
||||
|
||||
private String processorId;
|
||||
private TransportData data;
|
||||
private Exception failure;
|
||||
|
||||
public SimulateProcessorResult(String processorId, Data data) {
|
||||
public SimulateProcessorResult(String processorId, IngestDocument ingestDocument) {
|
||||
this.processorId = processorId;
|
||||
this.data = new TransportData(data);
|
||||
this.data = new TransportData(ingestDocument);
|
||||
}
|
||||
|
||||
private SimulateProcessorResult(String processorId, TransportData data) {
|
||||
|
@ -53,7 +53,7 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
|
|||
this.failure = failure;
|
||||
}
|
||||
|
||||
public Data getData() {
|
||||
public IngestDocument getData() {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ import static org.hamcrest.Matchers.*;
|
|||
|
||||
public class DataTests extends ESTestCase {
|
||||
|
||||
private Data data;
|
||||
private IngestDocument ingestDocument;
|
||||
|
||||
@Before
|
||||
public void setData() {
|
||||
|
@ -41,28 +41,28 @@ public class DataTests extends ESTestCase {
|
|||
innerObject.put("buzz", "hello world");
|
||||
innerObject.put("foo_null", null);
|
||||
document.put("fizz", innerObject);
|
||||
data = new Data("index", "type", "id", document);
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
}
|
||||
|
||||
public void testSimpleGetPropertyValue() {
|
||||
assertThat(data.getPropertyValue("foo", String.class), equalTo("bar"));
|
||||
assertThat(data.getPropertyValue("int", Integer.class), equalTo(123));
|
||||
assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("bar"));
|
||||
assertThat(ingestDocument.getPropertyValue("int", Integer.class), equalTo(123));
|
||||
}
|
||||
|
||||
public void testGetPropertyValueNullValue() {
|
||||
assertThat(data.getPropertyValue("fizz.foo_null", Object.class), nullValue());
|
||||
assertThat(ingestDocument.getPropertyValue("fizz.foo_null", Object.class), nullValue());
|
||||
}
|
||||
|
||||
public void testSimpleGetPropertyValueTypeMismatch() {
|
||||
try {
|
||||
data.getPropertyValue("int", String.class);
|
||||
ingestDocument.getPropertyValue("int", String.class);
|
||||
fail("getProperty should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("field [int] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
|
||||
}
|
||||
|
||||
try {
|
||||
data.getPropertyValue("foo", Integer.class);
|
||||
ingestDocument.getPropertyValue("foo", Integer.class);
|
||||
fail("getProperty should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("field [foo] of type [java.lang.String] cannot be cast to [java.lang.Integer]"));
|
||||
|
@ -70,67 +70,67 @@ public class DataTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testNestedGetPropertyValue() {
|
||||
assertThat(data.getPropertyValue("fizz.buzz", String.class), equalTo("hello world"));
|
||||
assertThat(ingestDocument.getPropertyValue("fizz.buzz", String.class), equalTo("hello world"));
|
||||
}
|
||||
|
||||
public void testGetPropertyValueNotFound() {
|
||||
assertThat(data.getPropertyValue("not.here", String.class), nullValue());
|
||||
assertThat(ingestDocument.getPropertyValue("not.here", String.class), nullValue());
|
||||
}
|
||||
|
||||
public void testGetPropertyValueNull() {
|
||||
assertNull(data.getPropertyValue(null, String.class));
|
||||
assertNull(ingestDocument.getPropertyValue(null, String.class));
|
||||
}
|
||||
|
||||
public void testGetPropertyValueEmpty() {
|
||||
assertNull(data.getPropertyValue("", String.class));
|
||||
assertNull(ingestDocument.getPropertyValue("", String.class));
|
||||
}
|
||||
|
||||
public void testHasProperty() {
|
||||
assertTrue(data.hasPropertyValue("fizz"));
|
||||
assertTrue(ingestDocument.hasPropertyValue("fizz"));
|
||||
}
|
||||
|
||||
public void testHasPropertyValueNested() {
|
||||
assertTrue(data.hasPropertyValue("fizz.buzz"));
|
||||
assertTrue(ingestDocument.hasPropertyValue("fizz.buzz"));
|
||||
}
|
||||
|
||||
public void testHasPropertyValueNotFound() {
|
||||
assertFalse(data.hasPropertyValue("doesnotexist"));
|
||||
assertFalse(ingestDocument.hasPropertyValue("doesnotexist"));
|
||||
}
|
||||
|
||||
public void testHasPropertyValueNestedNotFound() {
|
||||
assertFalse(data.hasPropertyValue("fizz.doesnotexist"));
|
||||
assertFalse(ingestDocument.hasPropertyValue("fizz.doesnotexist"));
|
||||
}
|
||||
|
||||
public void testHasPropertyValueNull() {
|
||||
assertFalse(data.hasPropertyValue(null));
|
||||
assertFalse(ingestDocument.hasPropertyValue(null));
|
||||
}
|
||||
|
||||
public void testHasPropertyValueNullValue() {
|
||||
assertTrue(data.hasPropertyValue("fizz.foo_null"));
|
||||
assertTrue(ingestDocument.hasPropertyValue("fizz.foo_null"));
|
||||
}
|
||||
|
||||
public void testHasPropertyValueEmpty() {
|
||||
assertFalse(data.hasPropertyValue(""));
|
||||
assertFalse(ingestDocument.hasPropertyValue(""));
|
||||
}
|
||||
|
||||
public void testSimpleSetPropertyValue() {
|
||||
data.setPropertyValue("new_field", "foo");
|
||||
assertThat(data.getDocument().get("new_field"), equalTo("foo"));
|
||||
assertThat(data.isModified(), equalTo(true));
|
||||
ingestDocument.setPropertyValue("new_field", "foo");
|
||||
assertThat(ingestDocument.getSource().get("new_field"), equalTo("foo"));
|
||||
assertThat(ingestDocument.isModified(), equalTo(true));
|
||||
}
|
||||
|
||||
public void testSetPropertyValueNullValue() {
|
||||
data.setPropertyValue("new_field", null);
|
||||
assertThat(data.getDocument().containsKey("new_field"), equalTo(true));
|
||||
assertThat(data.getDocument().get("new_field"), nullValue());
|
||||
assertThat(data.isModified(), equalTo(true));
|
||||
ingestDocument.setPropertyValue("new_field", null);
|
||||
assertThat(ingestDocument.getSource().containsKey("new_field"), equalTo(true));
|
||||
assertThat(ingestDocument.getSource().get("new_field"), nullValue());
|
||||
assertThat(ingestDocument.isModified(), equalTo(true));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNestedSetPropertyValue() {
|
||||
data.setPropertyValue("a.b.c.d", "foo");
|
||||
assertThat(data.getDocument().get("a"), instanceOf(Map.class));
|
||||
Map<String, Object> a = (Map<String, Object>) data.getDocument().get("a");
|
||||
ingestDocument.setPropertyValue("a.b.c.d", "foo");
|
||||
assertThat(ingestDocument.getSource().get("a"), instanceOf(Map.class));
|
||||
Map<String, Object> a = (Map<String, Object>) ingestDocument.getSource().get("a");
|
||||
assertThat(a.get("b"), instanceOf(Map.class));
|
||||
Map<String, Object> b = (Map<String, Object>) a.get("b");
|
||||
assertThat(b.get("c"), instanceOf(Map.class));
|
||||
|
@ -138,110 +138,110 @@ public class DataTests extends ESTestCase {
|
|||
assertThat(c.get("d"), instanceOf(String.class));
|
||||
String d = (String) c.get("d");
|
||||
assertThat(d, equalTo("foo"));
|
||||
assertThat(data.isModified(), equalTo(true));
|
||||
assertThat(ingestDocument.isModified(), equalTo(true));
|
||||
}
|
||||
|
||||
public void testSetPropertyValueOnExistingField() {
|
||||
data.setPropertyValue("foo", "newbar");
|
||||
assertThat(data.getDocument().get("foo"), equalTo("newbar"));
|
||||
ingestDocument.setPropertyValue("foo", "newbar");
|
||||
assertThat(ingestDocument.getSource().get("foo"), equalTo("newbar"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSetPropertyValueOnExistingParent() {
|
||||
data.setPropertyValue("fizz.new", "bar");
|
||||
assertThat(data.getDocument().get("fizz"), instanceOf(Map.class));
|
||||
Map<String, Object> innerMap = (Map<String, Object>) data.getDocument().get("fizz");
|
||||
ingestDocument.setPropertyValue("fizz.new", "bar");
|
||||
assertThat(ingestDocument.getSource().get("fizz"), instanceOf(Map.class));
|
||||
Map<String, Object> innerMap = (Map<String, Object>) ingestDocument.getSource().get("fizz");
|
||||
assertThat(innerMap.get("new"), instanceOf(String.class));
|
||||
String value = (String) innerMap.get("new");
|
||||
assertThat(value, equalTo("bar"));
|
||||
assertThat(data.isModified(), equalTo(true));
|
||||
assertThat(ingestDocument.isModified(), equalTo(true));
|
||||
}
|
||||
|
||||
public void testSetPropertyValueOnExistingParentTypeMismatch() {
|
||||
try {
|
||||
data.setPropertyValue("fizz.buzz.new", "bar");
|
||||
ingestDocument.setPropertyValue("fizz.buzz.new", "bar");
|
||||
fail("add field should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("cannot add field to parent [buzz] of type [java.lang.String], [java.util.Map] expected instead."));
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSetPropertyValueOnExistingNullParent() {
|
||||
try {
|
||||
data.setPropertyValue("fizz.foo_null.test", "bar");
|
||||
ingestDocument.setPropertyValue("fizz.foo_null.test", "bar");
|
||||
fail("add field should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("cannot add field to null parent, [java.util.Map] expected instead."));
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSetPropertyValueNullName() {
|
||||
try {
|
||||
data.setPropertyValue(null, "bar");
|
||||
ingestDocument.setPropertyValue(null, "bar");
|
||||
fail("add field should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("cannot add null or empty field"));
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSetPropertyValueEmptyName() {
|
||||
try {
|
||||
data.setPropertyValue("", "bar");
|
||||
ingestDocument.setPropertyValue("", "bar");
|
||||
fail("add field should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("cannot add null or empty field"));
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
}
|
||||
}
|
||||
|
||||
public void testRemoveProperty() {
|
||||
data.removeProperty("foo");
|
||||
assertThat(data.isModified(), equalTo(true));
|
||||
assertThat(data.getDocument().size(), equalTo(2));
|
||||
assertThat(data.getDocument().containsKey("foo"), equalTo(false));
|
||||
ingestDocument.removeProperty("foo");
|
||||
assertThat(ingestDocument.isModified(), equalTo(true));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(2));
|
||||
assertThat(ingestDocument.getSource().containsKey("foo"), equalTo(false));
|
||||
}
|
||||
|
||||
public void testRemoveInnerProperty() {
|
||||
data.removeProperty("fizz.buzz");
|
||||
assertThat(data.getDocument().size(), equalTo(3));
|
||||
assertThat(data.getDocument().get("fizz"), instanceOf(Map.class));
|
||||
ingestDocument.removeProperty("fizz.buzz");
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(3));
|
||||
assertThat(ingestDocument.getSource().get("fizz"), instanceOf(Map.class));
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>)data.getDocument().get("fizz");
|
||||
Map<String, Object> map = (Map<String, Object>) ingestDocument.getSource().get("fizz");
|
||||
assertThat(map.size(), equalTo(1));
|
||||
assertThat(map.containsKey("buzz"), equalTo(false));
|
||||
|
||||
data.removeProperty("fizz.foo_null");
|
||||
ingestDocument.removeProperty("fizz.foo_null");
|
||||
assertThat(map.size(), equalTo(0));
|
||||
assertThat(data.getDocument().size(), equalTo(3));
|
||||
assertThat(data.getDocument().containsKey("fizz"), equalTo(true));
|
||||
assertThat(data.isModified(), equalTo(true));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(3));
|
||||
assertThat(ingestDocument.getSource().containsKey("fizz"), equalTo(true));
|
||||
assertThat(ingestDocument.isModified(), equalTo(true));
|
||||
}
|
||||
|
||||
public void testRemoveNonExistingProperty() {
|
||||
data.removeProperty("does_not_exist");
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(data.getDocument().size(), equalTo(3));
|
||||
ingestDocument.removeProperty("does_not_exist");
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(3));
|
||||
}
|
||||
|
||||
public void testRemoveExistingParentTypeMismatch() {
|
||||
data.removeProperty("foo.test");
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(data.getDocument().size(), equalTo(3));
|
||||
ingestDocument.removeProperty("foo.test");
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(3));
|
||||
}
|
||||
|
||||
public void testRemoveNullProperty() {
|
||||
data.removeProperty(null);
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(data.getDocument().size(), equalTo(3));
|
||||
ingestDocument.removeProperty(null);
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(3));
|
||||
}
|
||||
|
||||
public void testRemoveEmptyProperty() {
|
||||
data.removeProperty("");
|
||||
assertThat(data.isModified(), equalTo(false));
|
||||
assertThat(data.getDocument().size(), equalTo(3));
|
||||
ingestDocument.removeProperty("");
|
||||
assertThat(ingestDocument.isModified(), equalTo(false));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(3));
|
||||
}
|
||||
|
||||
public void testEqualsAndHashcode() throws Exception {
|
||||
|
@ -250,7 +250,7 @@ public class DataTests extends ESTestCase {
|
|||
String id = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldName = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldValue = randomAsciiOfLengthBetween(1, 10);
|
||||
Data data = new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue));
|
||||
IngestDocument ingestDocument = new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue));
|
||||
|
||||
boolean changed = false;
|
||||
String otherIndex;
|
||||
|
@ -282,16 +282,16 @@ public class DataTests extends ESTestCase {
|
|||
document = Collections.singletonMap(fieldName, fieldValue);
|
||||
}
|
||||
|
||||
Data otherData = new Data(otherIndex, otherType, otherId, document);
|
||||
IngestDocument otherIngestDocument = new IngestDocument(otherIndex, otherType, otherId, document);
|
||||
if (changed) {
|
||||
assertThat(data, not(equalTo(otherData)));
|
||||
assertThat(otherData, not(equalTo(data)));
|
||||
assertThat(ingestDocument, not(equalTo(otherIngestDocument)));
|
||||
assertThat(otherIngestDocument, not(equalTo(ingestDocument)));
|
||||
} else {
|
||||
assertThat(data, equalTo(otherData));
|
||||
assertThat(otherData, equalTo(data));
|
||||
Data thirdData = new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue));
|
||||
assertThat(thirdData, equalTo(data));
|
||||
assertThat(data, equalTo(thirdData));
|
||||
assertThat(ingestDocument, equalTo(otherIngestDocument));
|
||||
assertThat(otherIngestDocument, equalTo(ingestDocument));
|
||||
IngestDocument thirdIngestDocument = new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue));
|
||||
assertThat(thirdIngestDocument, equalTo(ingestDocument));
|
||||
assertThat(ingestDocument, equalTo(thirdIngestDocument));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,8 +106,8 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
assertThat(response.getResults().size(), equalTo(1));
|
||||
assertThat(response.getResults().get(0), instanceOf(SimulateDocumentSimpleResult.class));
|
||||
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
|
||||
Data expectedData = new Data("index", "type", "id", Collections.singletonMap("foo", "bar"));
|
||||
assertThat(simulateDocumentSimpleResult.getData(), equalTo(expectedData));
|
||||
IngestDocument expectedIngestDocument = new IngestDocument("index", "type", "id", Collections.singletonMap("foo", "bar"));
|
||||
assertThat(simulateDocumentSimpleResult.getData(), equalTo(expectedIngestDocument));
|
||||
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.date;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
@ -36,9 +36,9 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList("yyyy dd MM hh:mm:ss"), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "2010 12 06 11:05:15");
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T11:05:15.000+02:00"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T11:05:15.000+02:00"));
|
||||
}
|
||||
|
||||
public void testJodaPatternMultipleFormats() {
|
||||
|
@ -51,27 +51,27 @@ public class DateProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "2010 12 06");
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
|
||||
document = new HashMap<>();
|
||||
document.put("date_as_string", "12/06/2010");
|
||||
data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
|
||||
document = new HashMap<>();
|
||||
document.put("date_as_string", "12-06-2010");
|
||||
data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
|
||||
document = new HashMap<>();
|
||||
document.put("date_as_string", "2010");
|
||||
data = new Data("index", "type", "id", document);
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
try {
|
||||
dateProcessor.execute(data);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
fail("processor should have failed due to not supported date format");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("unable to parse date [2010]"));
|
||||
|
@ -83,9 +83,9 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList("yyyy dd MMM"), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "2010 12 giugno");
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
}
|
||||
|
||||
public void testJodaPatternDefaultYear() {
|
||||
|
@ -93,9 +93,9 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList("dd/MM"), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "12/06");
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo(DateTime.now().getYear() + "-06-12T00:00:00.000+02:00"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo(DateTime.now().getYear() + "-06-12T00:00:00.000+02:00"));
|
||||
}
|
||||
|
||||
public void testTAI64N() {
|
||||
|
@ -104,9 +104,9 @@ public class DateProcessorTests extends ESTestCase {
|
|||
Map<String, Object> document = new HashMap<>();
|
||||
String dateAsString = (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024";
|
||||
document.put("date_as_string", dateAsString);
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("2012-12-22T03:00:46.767+02:00"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("2012-12-22T03:00:46.767+02:00"));
|
||||
}
|
||||
|
||||
public void testUnixMs() {
|
||||
|
@ -114,9 +114,9 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList(DateParserFactory.UNIX_MS), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "1000500");
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z"));
|
||||
}
|
||||
|
||||
public void testUnix() {
|
||||
|
@ -124,8 +124,8 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList(DateParserFactory.UNIX), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "1000.5");
|
||||
Data data = new Data("index", "type", "id", document);
|
||||
dateProcessor.execute(data);
|
||||
assertThat(data.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z"));
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getPropertyValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.ingest.processor.geoip;
|
||||
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
@ -38,13 +38,13 @@ public class GeoIpProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("source_field", "82.170.213.79");
|
||||
Data data = new Data("_index", "_type", "_id", document);
|
||||
processor.execute(data);
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document);
|
||||
processor.execute(ingestDocument);
|
||||
|
||||
assertThat(data.getDocument().size(), equalTo(2));
|
||||
assertThat(data.getDocument().get("source_field"), equalTo("82.170.213.79"));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(2));
|
||||
assertThat(ingestDocument.getSource().get("source_field"), equalTo("82.170.213.79"));
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> geoData = (Map<String, Object>) data.getDocument().get("target_field");
|
||||
Map<String, Object> geoData = (Map<String, Object>) ingestDocument.getSource().get("target_field");
|
||||
assertThat(geoData.size(), equalTo(10));
|
||||
assertThat(geoData.get("ip"), equalTo("82.170.213.79"));
|
||||
assertThat(geoData.get("country_iso_code"), equalTo("NL"));
|
||||
|
@ -64,13 +64,13 @@ public class GeoIpProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("source_field", "82.170.213.79");
|
||||
Data data = new Data("_index", "_type", "_id", document);
|
||||
processor.execute(data);
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document);
|
||||
processor.execute(ingestDocument);
|
||||
|
||||
assertThat(data.getDocument().size(), equalTo(2));
|
||||
assertThat(data.getDocument().get("source_field"), equalTo("82.170.213.79"));
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(2));
|
||||
assertThat(ingestDocument.getSource().get("source_field"), equalTo("82.170.213.79"));
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> geoData = (Map<String, Object>) data.getDocument().get("target_field");
|
||||
Map<String, Object> geoData = (Map<String, Object>) ingestDocument.getSource().get("target_field");
|
||||
assertThat(geoData.size(), equalTo(4));
|
||||
assertThat(geoData.get("ip"), equalTo("82.170.213.79"));
|
||||
assertThat(geoData.get("country_iso_code"), equalTo("NL"));
|
||||
|
@ -84,10 +84,10 @@ public class GeoIpProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("source_field", "202.45.11.11");
|
||||
Data data = new Data("_index", "_type", "_id", document);
|
||||
processor.execute(data);
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document);
|
||||
processor.execute(ingestDocument);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> geoData = (Map<String, Object>) data.getDocument().get("target_field");
|
||||
Map<String, Object> geoData = (Map<String, Object>) ingestDocument.getSource().get("target_field");
|
||||
assertThat(geoData.size(), equalTo(0));
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
|
||||
|
||||
public class MutateProcessorTests extends ESTestCase {
|
||||
private Data data;
|
||||
private IngestDocument ingestDocument;
|
||||
|
||||
@Before
|
||||
public void setData() {
|
||||
|
@ -49,35 +49,35 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
fizz.put("buzz", "hello world");
|
||||
document.put("fizz", fizz);
|
||||
|
||||
data = new Data("index", "type", "id", document);
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
}
|
||||
|
||||
public void testUpdate() throws IOException {
|
||||
Map<String, Object> update = new HashMap<>();
|
||||
update.put("foo", 123);
|
||||
Processor processor = new MutateProcessor(update, null, null, null, null, null, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("foo", Integer.class), equalTo(123));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("foo", Integer.class), equalTo(123));
|
||||
}
|
||||
|
||||
public void testRename() throws IOException {
|
||||
Map<String, String> rename = new HashMap<>();
|
||||
rename.put("foo", "bar");
|
||||
Processor processor = new MutateProcessor(null, rename, null, null, null, null, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("bar", String.class), equalTo("bar"));
|
||||
assertThat(data.hasPropertyValue("foo"), is(false));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("bar", String.class), equalTo("bar"));
|
||||
assertThat(ingestDocument.hasPropertyValue("foo"), is(false));
|
||||
}
|
||||
|
||||
public void testConvert() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("num", "integer");
|
||||
Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("num", Integer.class), equalTo(64));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("num", Integer.class), equalTo(64));
|
||||
}
|
||||
|
||||
public void testConvertNullField() throws IOException {
|
||||
|
@ -85,7 +85,7 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
convert.put("null", "integer");
|
||||
Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null);
|
||||
try {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
fail("processor execute should have failed");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Field \"null\" is null, cannot be converted to a/an integer"));
|
||||
|
@ -96,18 +96,18 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("arr", "integer");
|
||||
Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("arr", List.class), equalTo(Arrays.asList(1, 2, 3)));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("arr", List.class), equalTo(Arrays.asList(1, 2, 3)));
|
||||
}
|
||||
|
||||
public void testSplit() throws IOException {
|
||||
Map<String, String> split = new HashMap<>();
|
||||
split.put("ip", "\\.");
|
||||
Processor processor = new MutateProcessor(null, null, null, split, null, null, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("ip", List.class), equalTo(Arrays.asList("127", "0", "0", "1")));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("ip", List.class), equalTo(Arrays.asList("127", "0", "0", "1")));
|
||||
}
|
||||
|
||||
public void testSplitNullValue() throws IOException {
|
||||
|
@ -115,7 +115,7 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
split.put("not.found", "\\.");
|
||||
Processor processor = new MutateProcessor(null, null, null, split, null, null, null, null, null, null);
|
||||
try {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Cannot split field. [not.found] is null."));
|
||||
|
@ -125,16 +125,16 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
public void testGsub() throws IOException {
|
||||
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression("ip", Pattern.compile("\\."), "-"));
|
||||
Processor processor = new MutateProcessor(null, null, null, null, gsubExpressions, null, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("ip", String.class), equalTo("127-0-0-1"));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("ip", String.class), equalTo("127-0-0-1"));
|
||||
}
|
||||
|
||||
public void testGsub_NullValue() throws IOException {
|
||||
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression("null_field", Pattern.compile("\\."), "-"));
|
||||
Processor processor = new MutateProcessor(null, null, null, null, gsubExpressions, null, null, null, null, null);
|
||||
try {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
fail("processor execution should have failed");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Field \"null_field\" is null, cannot match pattern."));
|
||||
|
@ -145,34 +145,34 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
HashMap<String, String> join = new HashMap<>();
|
||||
join.put("arr", "-");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, join, null, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("arr", String.class), equalTo("1-2-3"));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("arr", String.class), equalTo("1-2-3"));
|
||||
}
|
||||
|
||||
public void testRemove() throws IOException {
|
||||
List<String> remove = Arrays.asList("foo", "ip");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, remove, null, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(5));
|
||||
assertThat(data.getPropertyValue("foo", Object.class), nullValue());
|
||||
assertThat(data.getPropertyValue("ip", Object.class), nullValue());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(5));
|
||||
assertThat(ingestDocument.getPropertyValue("foo", Object.class), nullValue());
|
||||
assertThat(ingestDocument.getPropertyValue("ip", Object.class), nullValue());
|
||||
}
|
||||
|
||||
public void testTrim() throws IOException {
|
||||
List<String> trim = Arrays.asList("to_strip", "foo");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, trim, null, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("foo", String.class), equalTo("bar"));
|
||||
assertThat(data.getPropertyValue("to_strip", String.class), equalTo("clean"));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("bar"));
|
||||
assertThat(ingestDocument.getPropertyValue("to_strip", String.class), equalTo("clean"));
|
||||
}
|
||||
|
||||
public void testTrimNullValue() throws IOException {
|
||||
List<String> trim = Collections.singletonList("not.found");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, trim, null, null);
|
||||
try {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Cannot trim field. [not.found] is null."));
|
||||
|
@ -182,16 +182,16 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
public void testUppercase() throws IOException {
|
||||
List<String> uppercase = Collections.singletonList("foo");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("foo", String.class), equalTo("BAR"));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("BAR"));
|
||||
}
|
||||
|
||||
public void testUppercaseNullValue() throws IOException {
|
||||
List<String> uppercase = Collections.singletonList("not.found");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
|
||||
try {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Cannot uppercase field. [not.found] is null."));
|
||||
|
@ -201,16 +201,16 @@ public class MutateProcessorTests extends ESTestCase {
|
|||
public void testLowercase() throws IOException {
|
||||
List<String> lowercase = Collections.singletonList("alpha");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase);
|
||||
processor.execute(data);
|
||||
assertThat(data.getDocument().size(), equalTo(7));
|
||||
assertThat(data.getPropertyValue("alpha", String.class), equalTo("abcd"));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(7));
|
||||
assertThat(ingestDocument.getPropertyValue("alpha", String.class), equalTo("abcd"));
|
||||
}
|
||||
|
||||
public void testLowercaseNullValue() throws IOException {
|
||||
List<String> lowercase = Collections.singletonList("not.found");
|
||||
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase);
|
||||
try {
|
||||
processor.execute(data);
|
||||
processor.execute(ingestDocument);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Cannot lowercase field. [not.found] is null."));
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -59,25 +59,25 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
|
||||
public void testExecute_pipelineDoesNotExist() {
|
||||
when(store.get("_id")).thenReturn(null);
|
||||
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap());
|
||||
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
|
||||
executionService.execute(data, "_id", listener);
|
||||
executionService.execute(ingestDocument, "_id", listener);
|
||||
verify(listener).failed(any(IllegalArgumentException.class));
|
||||
verify(listener, times(0)).executed(data);
|
||||
verify(listener, times(0)).executed(ingestDocument);
|
||||
}
|
||||
|
||||
public void testExecute_success() throws Exception {
|
||||
Processor processor = mock(Processor.class);
|
||||
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
|
||||
|
||||
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap());
|
||||
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
|
||||
executionService.execute(data, "_id", listener);
|
||||
executionService.execute(ingestDocument, "_id", listener);
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
verify(processor).execute(data);
|
||||
verify(listener).executed(data);
|
||||
verify(processor).execute(ingestDocument);
|
||||
verify(listener).executed(ingestDocument);
|
||||
verify(listener, times(0)).failed(any(Exception.class));
|
||||
}
|
||||
});
|
||||
|
@ -86,15 +86,15 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
public void testExecute_failure() throws Exception {
|
||||
Processor processor = mock(Processor.class);
|
||||
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
|
||||
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
||||
doThrow(new RuntimeException()).when(processor).execute(data);
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap());
|
||||
doThrow(new RuntimeException()).when(processor).execute(ingestDocument);
|
||||
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
|
||||
executionService.execute(data, "_id", listener);
|
||||
executionService.execute(ingestDocument, "_id", listener);
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
verify(processor).execute(data);
|
||||
verify(listener, times(0)).executed(data);
|
||||
verify(processor).execute(ingestDocument);
|
||||
verify(listener, times(0)).executed(ingestDocument);
|
||||
verify(listener).failed(any(RuntimeException.class));
|
||||
}
|
||||
});
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
|
||||
|
@ -80,7 +80,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
|
@ -121,16 +121,16 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
Answer answer = new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
Data data = (Data) invocationOnMock.getArguments()[0];
|
||||
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
||||
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
|
||||
listener.executed(data);
|
||||
listener.executed(ingestDocument);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
@ -151,10 +151,10 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
return null;
|
||||
}
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(actionListener).onFailure(exception);
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport;
|
|||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -39,7 +39,7 @@ public class TransportDataTests extends ESTestCase {
|
|||
String id = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldName = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldValue = randomAsciiOfLengthBetween(1, 10);
|
||||
TransportData transportData = new TransportData(new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
|
||||
TransportData transportData = new TransportData(new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
|
||||
|
||||
boolean changed = false;
|
||||
String otherIndex;
|
||||
|
@ -71,23 +71,23 @@ public class TransportDataTests extends ESTestCase {
|
|||
document = Collections.singletonMap(fieldName, fieldValue);
|
||||
}
|
||||
|
||||
TransportData otherTransportData = new TransportData(new Data(otherIndex, otherType, otherId, document));
|
||||
TransportData otherTransportData = new TransportData(new IngestDocument(otherIndex, otherType, otherId, document));
|
||||
if (changed) {
|
||||
assertThat(transportData, not(equalTo(otherTransportData)));
|
||||
assertThat(otherTransportData, not(equalTo(transportData)));
|
||||
} else {
|
||||
assertThat(transportData, equalTo(otherTransportData));
|
||||
assertThat(otherTransportData, equalTo(transportData));
|
||||
TransportData thirdTransportData = new TransportData(new Data(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
|
||||
TransportData thirdTransportData = new TransportData(new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
|
||||
assertThat(thirdTransportData, equalTo(transportData));
|
||||
assertThat(transportData, equalTo(thirdTransportData));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
TransportData transportData = new TransportData(data);
|
||||
TransportData transportData = new TransportData(ingestDocument);
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
transportData.writeTo(out);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
|
@ -29,6 +29,9 @@ import org.junit.Before;
|
|||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX;
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
|
||||
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
@ -80,12 +83,12 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
|
|||
assertThat(actualRequest.isVerbose(), equalTo(false));
|
||||
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
|
||||
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
|
||||
for (Data data : actualRequest.getDocuments()) {
|
||||
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
|
||||
Map<String, Object> expectedDocument = expectedDocsIterator.next();
|
||||
assertThat(data.getDocument(), equalTo(expectedDocument.get(Fields.SOURCE)));
|
||||
assertThat(data.getIndex(), equalTo(expectedDocument.get(Fields.INDEX)));
|
||||
assertThat(data.getType(), equalTo(expectedDocument.get(Fields.TYPE)));
|
||||
assertThat(data.getId(), equalTo(expectedDocument.get(Fields.ID)));
|
||||
assertThat(ingestDocument.getSource(), equalTo(expectedDocument.get(Fields.SOURCE)));
|
||||
assertThat(ingestDocument.getMetadata(INDEX), equalTo(expectedDocument.get(Fields.INDEX)));
|
||||
assertThat(ingestDocument.getMetadata(TYPE), equalTo(expectedDocument.get(Fields.TYPE)));
|
||||
assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(Fields.ID)));
|
||||
}
|
||||
|
||||
assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID));
|
||||
|
@ -133,12 +136,12 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
|
|||
assertThat(actualRequest.isVerbose(), equalTo(false));
|
||||
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
|
||||
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
|
||||
for (Data data : actualRequest.getDocuments()) {
|
||||
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
|
||||
Map<String, Object> expectedDocument = expectedDocsIterator.next();
|
||||
assertThat(data.getDocument(), equalTo(expectedDocument.get(Fields.SOURCE)));
|
||||
assertThat(data.getIndex(), equalTo(expectedDocument.get(Fields.INDEX)));
|
||||
assertThat(data.getType(), equalTo(expectedDocument.get(Fields.TYPE)));
|
||||
assertThat(data.getId(), equalTo(expectedDocument.get(Fields.ID)));
|
||||
assertThat(ingestDocument.getSource(), equalTo(expectedDocument.get(Fields.SOURCE)));
|
||||
assertThat(ingestDocument.getMetadata(INDEX), equalTo(expectedDocument.get(Fields.INDEX)));
|
||||
assertThat(ingestDocument.getMetadata(TYPE), equalTo(expectedDocument.get(Fields.TYPE)));
|
||||
assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(Fields.ID)));
|
||||
}
|
||||
|
||||
assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID));
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -38,9 +38,9 @@ public class SimulateDocumentSimpleResultTests extends ESTestCase {
|
|||
if (isFailure) {
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test"));
|
||||
} else {
|
||||
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(data);
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(ingestDocument);
|
||||
}
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -40,7 +40,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
private SimulateExecutionService executionService;
|
||||
private Pipeline pipeline;
|
||||
private Processor processor;
|
||||
private Data data;
|
||||
private IngestDocument ingestDocument;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
|
@ -53,7 +53,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
processor = mock(Processor.class);
|
||||
when(processor.getType()).thenReturn("mock");
|
||||
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor));
|
||||
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||
ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -62,35 +62,35 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteVerboseItem() throws Exception {
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
||||
verify(processor, times(2)).execute(data);
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, ingestDocument);
|
||||
verify(processor, times(2)).execute(ingestDocument);
|
||||
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
|
||||
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorId(), equalTo("processor[mock]-0"));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), not(sameInstance(data)));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), equalTo(data));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), not(sameInstance(ingestDocument)));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getData(), equalTo(ingestDocument));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1"));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(data)));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(data));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(ingestDocument)));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(ingestDocument));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
|
||||
}
|
||||
|
||||
public void testExecuteItem() throws Exception {
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
|
||||
verify(processor, times(2)).execute(data);
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, ingestDocument);
|
||||
verify(processor, times(2)).execute(ingestDocument);
|
||||
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
|
||||
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
|
||||
assertThat(simulateDocumentSimpleResult.getData(), equalTo(data));
|
||||
assertThat(simulateDocumentSimpleResult.getData(), equalTo(ingestDocument));
|
||||
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
|
||||
}
|
||||
|
||||
public void testExecuteVerboseItemWithFailure() throws Exception {
|
||||
Exception e = new RuntimeException("processor failed");
|
||||
doThrow(e).doNothing().when(processor).execute(data);
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
|
||||
verify(processor, times(2)).execute(data);
|
||||
doThrow(e).doNothing().when(processor).execute(ingestDocument);
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, ingestDocument);
|
||||
verify(processor, times(2)).execute(ingestDocument);
|
||||
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
|
||||
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
|
||||
|
@ -100,8 +100,8 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
|
||||
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorId(), equalTo("processor[mock]-1"));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(data)));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(data));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), not(sameInstance(ingestDocument)));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getData(), equalTo(ingestDocument));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
|
||||
runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure();
|
||||
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
|
||||
|
@ -109,9 +109,9 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
|
||||
public void testExecuteItemWithFailure() throws Exception {
|
||||
Exception e = new RuntimeException("processor failed");
|
||||
doThrow(e).when(processor).execute(data);
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
|
||||
verify(processor, times(1)).execute(data);
|
||||
doThrow(e).when(processor).execute(ingestDocument);
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, ingestDocument);
|
||||
verify(processor, times(1)).execute(ingestDocument);
|
||||
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
|
||||
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
|
||||
assertThat(simulateDocumentSimpleResult.getData(), nullValue());
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -42,7 +42,7 @@ public class SimulatePipelineResponseTests extends ESTestCase {
|
|||
List<SimulateDocumentResult> results = new ArrayList<>(numResults);
|
||||
for (int i = 0; i < numResults; i++) {
|
||||
boolean isFailure = randomBoolean();
|
||||
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
if (isVerbose) {
|
||||
int numProcessors = randomIntBetween(1, 10);
|
||||
|
@ -53,18 +53,18 @@ public class SimulatePipelineResponseTests extends ESTestCase {
|
|||
if (isFailure) {
|
||||
processorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test"));
|
||||
} else {
|
||||
processorResult = new SimulateProcessorResult(processorId, data);
|
||||
processorResult = new SimulateProcessorResult(processorId, ingestDocument);
|
||||
}
|
||||
processorResults.add(processorResult);
|
||||
}
|
||||
results.add(new SimulateDocumentVerboseResult(processorResults));
|
||||
} else {
|
||||
results.add(new SimulateDocumentSimpleResult(data));
|
||||
results.add(new SimulateDocumentSimpleResult(ingestDocument));
|
||||
SimulateDocumentSimpleResult simulateDocumentSimpleResult;
|
||||
if (isFailure) {
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test"));
|
||||
} else {
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(data);
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(ingestDocument);
|
||||
}
|
||||
results.add(simulateDocumentSimpleResult);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -39,9 +39,9 @@ public class SimulateProcessorResultTests extends ESTestCase {
|
|||
if (isFailure) {
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test"));
|
||||
} else {
|
||||
Data data = new Data(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorId, data);
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorId, ingestDocument);
|
||||
}
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
|
|
Loading…
Reference in New Issue