Add support for transient metadata to IngestDocument
IngestDocument now holds an additional map of transient metadata. The only field that gets added automatically is `timestamp`, which contains the timestamp of ingestion in ISO8601 format. In the future it will be possible to add or modify these fields; they will not get indexed, but they will be available via templates to all of the processors. Transient metadata will be visualized by the simulate api, although it will never get indexed. Moved WriteableIngestDocument to the simulate package, as it's only used by simulate and is now modelled for that specific use case. Also took the chance to remove an IngestDocument constructor used only for testing (accepting only a subset of es metadata fields). While doing that, introduced some more randomizations to some existing processor tests. Closes #15036
This commit is contained in:
parent
5bc1e46113
commit
b0d7d604ff
|
@ -21,6 +21,8 @@ package org.elasticsearch.ingest;
|
|||
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
@ -28,38 +30,54 @@ import java.util.*;
|
|||
*/
|
||||
public final class IngestDocument {
|
||||
|
||||
private final Map<String, String> metaData;
|
||||
static final String TIMESTAMP = "timestamp";
|
||||
|
||||
private final Map<String, String> esMetadata;
|
||||
private final Map<String, Object> source;
|
||||
private final Map<String, String> ingestMetadata;
|
||||
|
||||
private boolean sourceModified = false;
|
||||
|
||||
public IngestDocument(String index, String type, String id, Map<String, Object> source) {
|
||||
this(index, type, id, null, null, null, null, source);
|
||||
}
|
||||
|
||||
public IngestDocument(String index, String type, String id, String routing, String parent, String timestamp, String ttl, Map<String, Object> source) {
|
||||
this.metaData = new HashMap<>();
|
||||
this.metaData.put(MetaData.INDEX.getFieldName(), index);
|
||||
this.metaData.put(MetaData.TYPE.getFieldName(), type);
|
||||
this.metaData.put(MetaData.ID.getFieldName(), id);
|
||||
this.esMetadata = new HashMap<>();
|
||||
this.esMetadata.put(MetaData.INDEX.getFieldName(), index);
|
||||
this.esMetadata.put(MetaData.TYPE.getFieldName(), type);
|
||||
this.esMetadata.put(MetaData.ID.getFieldName(), id);
|
||||
if (routing != null) {
|
||||
this.metaData.put(MetaData.ROUTING.getFieldName(), routing);
|
||||
this.esMetadata.put(MetaData.ROUTING.getFieldName(), routing);
|
||||
}
|
||||
if (parent != null) {
|
||||
this.metaData.put(MetaData.PARENT.getFieldName(), parent);
|
||||
this.esMetadata.put(MetaData.PARENT.getFieldName(), parent);
|
||||
}
|
||||
if (timestamp != null) {
|
||||
this.metaData.put(MetaData.TIMESTAMP.getFieldName(), timestamp);
|
||||
this.esMetadata.put(MetaData.TIMESTAMP.getFieldName(), timestamp);
|
||||
}
|
||||
if (ttl != null) {
|
||||
this.metaData.put(MetaData.TTL.getFieldName(), ttl);
|
||||
this.esMetadata.put(MetaData.TTL.getFieldName(), ttl);
|
||||
}
|
||||
this.source = source;
|
||||
this.ingestMetadata = new HashMap<>();
|
||||
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT);
|
||||
df.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||
this.ingestMetadata.put(TIMESTAMP, df.format(new Date()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties of the one provided as argument
|
||||
*/
|
||||
public IngestDocument(IngestDocument other) {
|
||||
this.metaData = new HashMap<>(other.metaData);
|
||||
this.source = new HashMap<>(other.source);
|
||||
this(other.esMetadata, other.source, other.ingestMetadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor needed for testing that allows to create a new {@link IngestDocument} given the provided elasticsearch metadata,
|
||||
* source and ingest metadata. This is needed because the ingest metadata will be initialized with the current timestamp at
|
||||
* init time, which makes comparisons impossible in tests.
|
||||
*/
|
||||
public IngestDocument(Map<String, String> esMetadata, Map<String, Object> source, Map<String, String> ingestMetadata) {
|
||||
this.esMetadata = new HashMap<>(esMetadata);
|
||||
this.source = new HashMap<>(source);
|
||||
this.ingestMetadata = new HashMap<>(ingestMetadata);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -335,12 +353,28 @@ public final class IngestDocument {
|
|||
}
|
||||
}
|
||||
|
||||
public String getMetadata(MetaData metaData) {
|
||||
return this.metaData.get(metaData.getFieldName());
|
||||
public String getEsMetadata(MetaData esMetadata) {
|
||||
return this.esMetadata.get(esMetadata.getFieldName());
|
||||
}
|
||||
|
||||
public void setMetaData(MetaData metaData, String value) {
|
||||
this.metaData.put(metaData.getFieldName(), value);
|
||||
public Map<String, String> getEsMetadata() {
|
||||
return Collections.unmodifiableMap(esMetadata);
|
||||
}
|
||||
|
||||
public void setEsMetadata(MetaData metaData, String value) {
|
||||
this.esMetadata.put(metaData.getFieldName(), value);
|
||||
}
|
||||
|
||||
public String getIngestMetadata(String ingestMetadata) {
|
||||
return this.ingestMetadata.get(ingestMetadata);
|
||||
}
|
||||
|
||||
public Map<String, String> getIngestMetadata() {
|
||||
return Collections.unmodifiableMap(this.ingestMetadata);
|
||||
}
|
||||
|
||||
public void setIngestMetadata(String metadata, String value) {
|
||||
this.ingestMetadata.put(metadata, value);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -391,19 +425,21 @@ public final class IngestDocument {
|
|||
|
||||
IngestDocument other = (IngestDocument) obj;
|
||||
return Objects.equals(source, other.source) &&
|
||||
Objects.equals(metaData, other.metaData);
|
||||
Objects.equals(esMetadata, other.esMetadata) &&
|
||||
Objects.equals(ingestMetadata, other.ingestMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(metaData, source);
|
||||
return Objects.hash(esMetadata, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IngestDocument{" +
|
||||
"metaData=" + metaData +
|
||||
"esMetadata=" + esMetadata +
|
||||
", source=" + source +
|
||||
", ingestMetadata=" + ingestMetadata +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -447,7 +483,5 @@ public final class IngestDocument {
|
|||
throw new IllegalArgumentException("no valid metadata field name [" + value + "]");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public final class MetaDataProcessor implements Processor {
|
|||
for (Map.Entry<MetaData, Mustache> entry : templates.entrySet()) {
|
||||
StringWriter writer = new StringWriter();
|
||||
entry.getValue().execute(writer, model);
|
||||
ingestDocument.setMetaData(entry.getKey(), writer.toString());
|
||||
ingestDocument.setEsMetadata(entry.getKey(), writer.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -65,13 +65,13 @@ public class PipelineExecutionService {
|
|||
if (ingestDocument.isSourceModified()) {
|
||||
indexRequest.source(ingestDocument.getSource());
|
||||
}
|
||||
indexRequest.index(ingestDocument.getMetadata(IngestDocument.MetaData.INDEX));
|
||||
indexRequest.type(ingestDocument.getMetadata(IngestDocument.MetaData.TYPE));
|
||||
indexRequest.id(ingestDocument.getMetadata(IngestDocument.MetaData.ID));
|
||||
indexRequest.routing(ingestDocument.getMetadata(IngestDocument.MetaData.ROUTING));
|
||||
indexRequest.parent(ingestDocument.getMetadata(IngestDocument.MetaData.PARENT));
|
||||
indexRequest.timestamp(ingestDocument.getMetadata(IngestDocument.MetaData.TIMESTAMP));
|
||||
indexRequest.ttl(ingestDocument.getMetadata(IngestDocument.MetaData.TTL));
|
||||
indexRequest.index(ingestDocument.getEsMetadata(IngestDocument.MetaData.INDEX));
|
||||
indexRequest.type(ingestDocument.getEsMetadata(IngestDocument.MetaData.TYPE));
|
||||
indexRequest.id(ingestDocument.getEsMetadata(IngestDocument.MetaData.ID));
|
||||
indexRequest.routing(ingestDocument.getEsMetadata(IngestDocument.MetaData.ROUTING));
|
||||
indexRequest.parent(ingestDocument.getEsMetadata(IngestDocument.MetaData.PARENT));
|
||||
indexRequest.timestamp(ingestDocument.getEsMetadata(IngestDocument.MetaData.TIMESTAMP));
|
||||
indexRequest.ttl(ingestDocument.getEsMetadata(IngestDocument.MetaData.TTL));
|
||||
listener.onResponse(ingestDocument);
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
|
|
|
@ -23,17 +23,20 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.plugin.ingest.transport.WriteableIngestDocument;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SimulateDocumentSimpleResult implements SimulateDocumentResult<SimulateDocumentSimpleResult> {
|
||||
|
||||
private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult((IngestDocument)null);
|
||||
private static final SimulateDocumentSimpleResult PROTOTYPE = new SimulateDocumentSimpleResult();
|
||||
|
||||
private WriteableIngestDocument ingestDocument;
|
||||
private Exception failure;
|
||||
|
||||
private SimulateDocumentSimpleResult() {
|
||||
this.ingestDocument = null;
|
||||
}
|
||||
|
||||
public SimulateDocumentSimpleResult(IngestDocument ingestDocument) {
|
||||
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
|
||||
}
|
||||
|
|
|
@ -26,18 +26,22 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.plugin.ingest.transport.WriteableIngestDocument;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SimulateProcessorResult implements Writeable<SimulateProcessorResult>, ToXContent {
|
||||
|
||||
private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult(null, (IngestDocument)null);
|
||||
private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult();
|
||||
|
||||
private String processorId;
|
||||
private WriteableIngestDocument ingestDocument;
|
||||
private Exception failure;
|
||||
|
||||
private SimulateProcessorResult() {
|
||||
this.processorId = null;
|
||||
this.ingestDocument = null;
|
||||
}
|
||||
|
||||
public SimulateProcessorResult(String processorId, IngestDocument ingestDocument) {
|
||||
this.processorId = processorId;
|
||||
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport;
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -31,63 +31,59 @@ import java.io.IOException;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData.*;
|
||||
final class WriteableIngestDocument implements Writeable<WriteableIngestDocument>, ToXContent {
|
||||
|
||||
public class WriteableIngestDocument implements Writeable<WriteableIngestDocument>, ToXContent {
|
||||
|
||||
private static final WriteableIngestDocument PROTOTYPE = new WriteableIngestDocument(null);
|
||||
private static final WriteableIngestDocument PROTOTYPE = new WriteableIngestDocument();
|
||||
|
||||
private final IngestDocument ingestDocument;
|
||||
|
||||
public WriteableIngestDocument(IngestDocument ingestDocument) {
|
||||
private WriteableIngestDocument() {
|
||||
this.ingestDocument = null;
|
||||
}
|
||||
|
||||
WriteableIngestDocument(IngestDocument ingestDocument) {
|
||||
assert ingestDocument != null;
|
||||
this.ingestDocument = ingestDocument;
|
||||
}
|
||||
|
||||
public IngestDocument getIngestDocument() {
|
||||
IngestDocument getIngestDocument() {
|
||||
return ingestDocument;
|
||||
}
|
||||
|
||||
public static WriteableIngestDocument readWriteableIngestDocumentFrom(StreamInput in) throws IOException {
|
||||
static WriteableIngestDocument readWriteableIngestDocumentFrom(StreamInput in) throws IOException {
|
||||
return PROTOTYPE.readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteableIngestDocument readFrom(StreamInput in) throws IOException {
|
||||
String index = in.readString();
|
||||
String type = in.readString();
|
||||
String id = in.readString();
|
||||
String routing = in.readOptionalString();
|
||||
String parent = in.readOptionalString();
|
||||
String timestamp = in.readOptionalString();
|
||||
String ttl = in.readOptionalString();
|
||||
Map<String, Object> doc = in.readMap();
|
||||
return new WriteableIngestDocument(new IngestDocument(index, type, id, routing, parent, timestamp, ttl, doc));
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String> esMetadata = (Map<String, String>) in.readGenericValue();
|
||||
Map<String, Object> source = in.readMap();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String> ingestMetadata = (Map<String, String>) in.readGenericValue();
|
||||
return new WriteableIngestDocument(new IngestDocument(esMetadata, source, ingestMetadata));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(ingestDocument.getMetadata(INDEX));
|
||||
out.writeString(ingestDocument.getMetadata(TYPE));
|
||||
out.writeString(ingestDocument.getMetadata(ID));
|
||||
out.writeOptionalString(ingestDocument.getMetadata(ROUTING));
|
||||
out.writeOptionalString(ingestDocument.getMetadata(PARENT));
|
||||
out.writeOptionalString(ingestDocument.getMetadata(TIMESTAMP));
|
||||
out.writeOptionalString(ingestDocument.getMetadata(TTL));
|
||||
out.writeGenericValue(ingestDocument.getEsMetadata());
|
||||
out.writeMap(ingestDocument.getSource());
|
||||
out.writeGenericValue(ingestDocument.getIngestMetadata());
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Fields.DOCUMENT);
|
||||
builder.field(Fields.MODIFIED, ingestDocument.isSourceModified());
|
||||
builder.field(Fields.INDEX, ingestDocument.getMetadata(INDEX));
|
||||
builder.field(Fields.TYPE, ingestDocument.getMetadata(TYPE));
|
||||
builder.field(Fields.ID, ingestDocument.getMetadata(ID));
|
||||
builder.field(Fields.ROUTING, ingestDocument.getMetadata(ROUTING));
|
||||
builder.field(Fields.PARENT, ingestDocument.getMetadata(PARENT));
|
||||
builder.field(Fields.TIMESTAMP, ingestDocument.getMetadata(TIMESTAMP));
|
||||
builder.field(Fields.TTL, ingestDocument.getMetadata(TTL));
|
||||
for (Map.Entry<String, String> esMetadata : ingestDocument.getEsMetadata().entrySet()) {
|
||||
builder.field(esMetadata.getKey(), esMetadata.getValue());
|
||||
}
|
||||
builder.field(Fields.SOURCE, ingestDocument.getSource());
|
||||
builder.startObject(Fields.INGEST);
|
||||
for (Map.Entry<String, String> ingestMetadata : ingestDocument.getIngestMetadata().entrySet()) {
|
||||
builder.field(ingestMetadata.getKey(), ingestMetadata.getValue());
|
||||
}
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -109,16 +105,15 @@ public class WriteableIngestDocument implements Writeable<WriteableIngestDocumen
|
|||
return Objects.hash(ingestDocument);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return ingestDocument.toString();
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
|
||||
static final XContentBuilderString MODIFIED = new XContentBuilderString("modified");
|
||||
static final XContentBuilderString INDEX = new XContentBuilderString(IngestDocument.MetaData.INDEX.getFieldName());
|
||||
static final XContentBuilderString TYPE = new XContentBuilderString(IngestDocument.MetaData.TYPE.getFieldName());
|
||||
static final XContentBuilderString ID = new XContentBuilderString(IngestDocument.MetaData.ID.getFieldName());
|
||||
static final XContentBuilderString ROUTING = new XContentBuilderString(IngestDocument.MetaData.ROUTING.getFieldName());
|
||||
static final XContentBuilderString PARENT = new XContentBuilderString(IngestDocument.MetaData.PARENT.getFieldName());
|
||||
static final XContentBuilderString TIMESTAMP = new XContentBuilderString(IngestDocument.MetaData.TIMESTAMP.getFieldName());
|
||||
static final XContentBuilderString TTL = new XContentBuilderString(IngestDocument.MetaData.TTL.getFieldName());
|
||||
static final XContentBuilderString SOURCE = new XContentBuilderString("_source");
|
||||
static final XContentBuilderString INGEST = new XContentBuilderString("_ingest");
|
||||
}
|
||||
}
|
|
@ -142,7 +142,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
source.put("val", 123.42f);
|
||||
source.put("status", 400);
|
||||
source.put("msg", "foo");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", source);
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
|
||||
assertThat(simulateDocumentSimpleResult.getIngestDocument().getSource(), equalTo(ingestDocument.getSource()));
|
||||
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.ingest;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
@ -46,7 +48,7 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
list.add(value);
|
||||
list.add(null);
|
||||
document.put("list", list);
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
|
||||
assertThat(ingestDocument.isSourceModified(), equalTo(false));
|
||||
}
|
||||
|
||||
|
@ -488,44 +490,53 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testEqualsAndHashcode() throws Exception {
|
||||
String index = randomAsciiOfLengthBetween(1, 10);
|
||||
String type = randomAsciiOfLengthBetween(1, 10);
|
||||
String id = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldName = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldValue = randomAsciiOfLengthBetween(1, 10);
|
||||
IngestDocument ingestDocument = new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue));
|
||||
Map<String, String> esMetadata = new HashMap<>();
|
||||
int numFields = randomIntBetween(1, IngestDocument.MetaData.values().length);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
esMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
Map<String, String> ingestMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
Map<String, Object> document = RandomDocumentPicks.randomDocument(random());
|
||||
IngestDocument ingestDocument = new IngestDocument(esMetadata, document, ingestMetadata);
|
||||
|
||||
boolean changed = false;
|
||||
String otherIndex;
|
||||
Map<String, String> otherEsMetadata;
|
||||
if (randomBoolean()) {
|
||||
otherIndex = randomAsciiOfLengthBetween(1, 10);
|
||||
otherEsMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, IngestDocument.MetaData.values().length);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
otherEsMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
changed = true;
|
||||
} else {
|
||||
otherIndex = index;
|
||||
}
|
||||
String otherType;
|
||||
if (randomBoolean()) {
|
||||
otherType = randomAsciiOfLengthBetween(1, 10);
|
||||
changed = true;
|
||||
} else {
|
||||
otherType = type;
|
||||
}
|
||||
String otherId;
|
||||
if (randomBoolean()) {
|
||||
otherId = randomAsciiOfLengthBetween(1, 10);
|
||||
changed = true;
|
||||
} else {
|
||||
otherId = id;
|
||||
}
|
||||
Map<String, Object> document;
|
||||
if (randomBoolean()) {
|
||||
document = Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
|
||||
changed = true;
|
||||
} else {
|
||||
document = Collections.singletonMap(fieldName, fieldValue);
|
||||
otherEsMetadata = Collections.unmodifiableMap(esMetadata);
|
||||
}
|
||||
|
||||
IngestDocument otherIngestDocument = new IngestDocument(otherIndex, otherType, otherId, document);
|
||||
Map<String, String> otherIngestMetadata;
|
||||
if (randomBoolean()) {
|
||||
otherIngestMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
otherIngestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
changed = true;
|
||||
} else {
|
||||
otherIngestMetadata = Collections.unmodifiableMap(ingestMetadata);
|
||||
}
|
||||
|
||||
Map<String, Object> otherDocument;
|
||||
if (randomBoolean()) {
|
||||
otherDocument = RandomDocumentPicks.randomDocument(random());
|
||||
changed = true;
|
||||
} else {
|
||||
otherDocument = Collections.unmodifiableMap(document);
|
||||
}
|
||||
|
||||
IngestDocument otherIngestDocument = new IngestDocument(otherEsMetadata, otherDocument, otherIngestMetadata);
|
||||
if (changed) {
|
||||
assertThat(ingestDocument, not(equalTo(otherIngestDocument)));
|
||||
assertThat(otherIngestDocument, not(equalTo(ingestDocument)));
|
||||
|
@ -533,7 +544,7 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
assertThat(ingestDocument, equalTo(otherIngestDocument));
|
||||
assertThat(otherIngestDocument, equalTo(ingestDocument));
|
||||
assertThat(ingestDocument.hashCode(), equalTo(otherIngestDocument.hashCode()));
|
||||
IngestDocument thirdIngestDocument = new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue));
|
||||
IngestDocument thirdIngestDocument = new IngestDocument(Collections.unmodifiableMap(esMetadata), Collections.unmodifiableMap(document), Collections.unmodifiableMap(ingestMetadata));
|
||||
assertThat(thirdIngestDocument, equalTo(ingestDocument));
|
||||
assertThat(ingestDocument, equalTo(thirdIngestDocument));
|
||||
assertThat(ingestDocument.hashCode(), equalTo(thirdIngestDocument.hashCode()));
|
||||
|
@ -554,7 +565,7 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
Map<String, Object> myPreciousMap = new HashMap<>();
|
||||
myPreciousMap.put("field2", "value2");
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", new HashMap<>());
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, new HashMap<>());
|
||||
ingestDocument.setFieldValue("field1", myPreciousMap);
|
||||
ingestDocument.removeField("field1.field2");
|
||||
|
||||
|
@ -566,11 +577,32 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
List<String> myPreciousList = new ArrayList<>();
|
||||
myPreciousList.add("value");
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", new HashMap<>());
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, new HashMap<>());
|
||||
ingestDocument.setFieldValue("field1", myPreciousList);
|
||||
ingestDocument.removeField("field1.0");
|
||||
|
||||
assertThat(myPreciousList.size(), equalTo(1));
|
||||
assertThat(myPreciousList.get(0), equalTo("value"));
|
||||
}
|
||||
|
||||
public void testIngestCustomMetadata() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String metadata = randomAsciiOfLengthBetween(1, 10);
|
||||
String value = randomAsciiOfLengthBetween(1, 10);
|
||||
ingestDocument.setIngestMetadata(metadata, value);
|
||||
assertThat(ingestDocument.getIngestMetadata(metadata), equalTo(value));
|
||||
}
|
||||
|
||||
public void testIngestMetadataTimestamp() throws Exception {
|
||||
long before = System.currentTimeMillis();
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
long after = System.currentTimeMillis();
|
||||
String timestampString = ingestDocument.getIngestMetadata("timestamp");
|
||||
assertThat(timestampString, notNullValue());
|
||||
assertThat(timestampString, endsWith("+0000"));
|
||||
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT);
|
||||
Date timestamp = df.parse(timestampString);
|
||||
assertThat(timestamp.getTime(), greaterThanOrEqualTo(before));
|
||||
assertThat(timestamp.getTime(), lessThanOrEqualTo(after));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,7 +132,23 @@ public final class RandomDocumentPicks {
|
|||
String index = randomString(random);
|
||||
String type = randomString(random);
|
||||
String id = randomString(random);
|
||||
return new IngestDocument(index, type, id, document);
|
||||
String routing = null;
|
||||
if (random.nextBoolean()) {
|
||||
routing = randomString(random);
|
||||
}
|
||||
String parent = null;
|
||||
if (random.nextBoolean()) {
|
||||
parent = randomString(random);
|
||||
}
|
||||
String timestamp = null;
|
||||
if (random.nextBoolean()) {
|
||||
timestamp = randomString(random);
|
||||
}
|
||||
String ttl = null;
|
||||
if (random.nextBoolean()) {
|
||||
ttl = randomString(random);
|
||||
}
|
||||
return new IngestDocument(index, type, id, routing, parent, timestamp, ttl, document);
|
||||
}
|
||||
|
||||
public static Map<String, Object> randomDocument(Random random) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.ingest.processor.date;
|
||||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
@ -36,7 +37,7 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList("yyyy dd MM hh:mm:ss"), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "2010 12 06 11:05:15");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("2010-06-12T11:05:15.000+02:00"));
|
||||
}
|
||||
|
@ -51,25 +52,25 @@ public class DateProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "2010 12 06");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
|
||||
document = new HashMap<>();
|
||||
document.put("date_as_string", "12/06/2010");
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
|
||||
document = new HashMap<>();
|
||||
document.put("date_as_string", "12-06-2010");
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
|
||||
document = new HashMap<>();
|
||||
document.put("date_as_string", "2010");
|
||||
ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
try {
|
||||
dateProcessor.execute(ingestDocument);
|
||||
fail("processor should have failed due to not supported date format");
|
||||
|
@ -83,7 +84,7 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList("yyyy dd MMM"), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "2010 12 giugno");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("2010-06-12T00:00:00.000+02:00"));
|
||||
}
|
||||
|
@ -93,7 +94,7 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList("dd/MM"), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "12/06");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo(DateTime.now().getYear() + "-06-12T00:00:00.000+02:00"));
|
||||
}
|
||||
|
@ -104,7 +105,7 @@ public class DateProcessorTests extends ESTestCase {
|
|||
Map<String, Object> document = new HashMap<>();
|
||||
String dateAsString = (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024";
|
||||
document.put("date_as_string", dateAsString);
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("2012-12-22T03:00:46.767+02:00"));
|
||||
}
|
||||
|
@ -114,7 +115,7 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList(DateFormat.UnixMs.toString()), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "1000500");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z"));
|
||||
}
|
||||
|
@ -124,7 +125,7 @@ public class DateProcessorTests extends ESTestCase {
|
|||
"date_as_string", Collections.singletonList(DateFormat.Unix.toString()), "date_as_date");
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("date_as_string", "1000.5");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
dateProcessor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo("1970-01-01T00:16:40.500Z"));
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor.geoip;
|
|||
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
@ -39,7 +40,7 @@ public class GeoIpProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("source_field", "82.170.213.79");
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
processor.execute(ingestDocument);
|
||||
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(2));
|
||||
|
@ -65,7 +66,7 @@ public class GeoIpProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("source_field", "82.170.213.79");
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
processor.execute(ingestDocument);
|
||||
|
||||
assertThat(ingestDocument.getSource().size(), equalTo(2));
|
||||
|
@ -85,7 +86,7 @@ public class GeoIpProcessorTests extends ESTestCase {
|
|||
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("source_field", "202.45.11.11");
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", document);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
|
||||
processor.execute(ingestDocument);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> geoData = (Map<String, Object>) ingestDocument.getSource().get("target_field");
|
||||
|
|
|
@ -32,7 +32,7 @@ public class GrokProcessorTests extends ESTestCase {
|
|||
|
||||
public void testMatch() throws Exception {
|
||||
String fieldName = RandomDocumentPicks.randomFieldName(random());
|
||||
IngestDocument doc = new IngestDocument("index", "type", "id", new HashMap<>());
|
||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
doc.setFieldValue(fieldName, "1");
|
||||
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
|
||||
GrokProcessor processor = new GrokProcessor(grok, fieldName);
|
||||
|
@ -42,7 +42,7 @@ public class GrokProcessorTests extends ESTestCase {
|
|||
|
||||
public void testNoMatch() {
|
||||
String fieldName = RandomDocumentPicks.randomFieldName(random());
|
||||
IngestDocument doc = new IngestDocument("index", "type", "id", new HashMap<>());
|
||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
doc.setFieldValue(fieldName, "23");
|
||||
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
|
||||
GrokProcessor processor = new GrokProcessor(grok, fieldName);
|
||||
|
@ -56,7 +56,7 @@ public class GrokProcessorTests extends ESTestCase {
|
|||
|
||||
public void testNotStringField() {
|
||||
String fieldName = RandomDocumentPicks.randomFieldName(random());
|
||||
IngestDocument doc = new IngestDocument("index", "type", "id", new HashMap<>());
|
||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
doc.setFieldValue(fieldName, 1);
|
||||
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
|
||||
GrokProcessor processor = new GrokProcessor(grok, fieldName);
|
||||
|
@ -70,7 +70,7 @@ public class GrokProcessorTests extends ESTestCase {
|
|||
|
||||
public void testMissingField() {
|
||||
String fieldName = "foo.bar";
|
||||
IngestDocument doc = new IngestDocument("index", "type", "id", new HashMap<>());
|
||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
Grok grok = new Grok(Collections.singletonMap("ONE", "1"), "%{ONE:one}");
|
||||
GrokProcessor processor = new GrokProcessor(grok, fieldName);
|
||||
try {
|
||||
|
|
|
@ -4,6 +4,7 @@ import com.github.mustachejava.DefaultMustacheFactory;
|
|||
import com.github.mustachejava.Mustache;
|
||||
import org.elasticsearch.common.io.FastStringReader;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
|
@ -22,12 +23,11 @@ public class MetaDataProcessorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
MetaDataProcessor processor = new MetaDataProcessor(templates);
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("field", "value"));
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", "value"));
|
||||
processor.execute(ingestDocument);
|
||||
|
||||
for (MetaData metaData : MetaData.values()) {
|
||||
assertThat(ingestDocument.getMetadata(metaData), Matchers.equalTo("some value"));
|
||||
assertThat(ingestDocument.getEsMetadata(metaData), Matchers.equalTo("some value"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -82,9 +82,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
||||
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
|
||||
if (metaData == IngestDocument.MetaData.TTL) {
|
||||
ingestDocument.setMetaData(IngestDocument.MetaData.TTL, "5w");
|
||||
ingestDocument.setEsMetadata(IngestDocument.MetaData.TTL, "5w");
|
||||
} else {
|
||||
ingestDocument.setMetaData(metaData, "update" + metaData.getFieldName());
|
||||
ingestDocument.setEsMetadata(metaData, "update" + metaData.getFieldName());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -176,12 +176,18 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
private final IngestDocument ingestDocument;
|
||||
|
||||
public IngestDocumentMatcher(String index, String type, String id, Map<String, Object> source) {
|
||||
this.ingestDocument = new IngestDocument(index, type, id, source);
|
||||
this.ingestDocument = new IngestDocument(index, type, id, null, null, null, null, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
return Objects.equals(ingestDocument, o);
|
||||
if (o.getClass() == IngestDocument.class) {
|
||||
IngestDocument otherIngestDocument = (IngestDocument) o;
|
||||
//ingest metadata will not be the same (timestamp differs every time)
|
||||
return Objects.equals(ingestDocument.getSource(), otherIngestDocument.getSource())
|
||||
&& Objects.equals(ingestDocument.getEsMetadata(), otherIngestDocument.getEsMetadata());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -123,7 +123,8 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
|
||||
Answer answer = invocationOnMock -> {
|
||||
ActionListener<IngestDocument> listener = (ActionListener) invocationOnMock.getArguments()[2];
|
||||
listener.onResponse(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap()));
|
||||
listener.onResponse(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing(), indexRequest.parent(),
|
||||
indexRequest.timestamp(), indexRequest.ttl() == null ? null : indexRequest.ttl().toString(), indexRequest.sourceAsMap()));
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
|
||||
|
|
|
@ -1,98 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport;
|
||||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class WriteableIngestDocumentTests extends ESTestCase {
|
||||
|
||||
public void testEqualsAndHashcode() throws Exception {
|
||||
String index = randomAsciiOfLengthBetween(1, 10);
|
||||
String type = randomAsciiOfLengthBetween(1, 10);
|
||||
String id = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldName = randomAsciiOfLengthBetween(1, 10);
|
||||
String fieldValue = randomAsciiOfLengthBetween(1, 10);
|
||||
WriteableIngestDocument writeableIngestDocument = new WriteableIngestDocument(new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
|
||||
|
||||
boolean changed = false;
|
||||
String otherIndex;
|
||||
if (randomBoolean()) {
|
||||
otherIndex = randomAsciiOfLengthBetween(1, 10);
|
||||
changed = true;
|
||||
} else {
|
||||
otherIndex = index;
|
||||
}
|
||||
String otherType;
|
||||
if (randomBoolean()) {
|
||||
otherType = randomAsciiOfLengthBetween(1, 10);
|
||||
changed = true;
|
||||
} else {
|
||||
otherType = type;
|
||||
}
|
||||
String otherId;
|
||||
if (randomBoolean()) {
|
||||
otherId = randomAsciiOfLengthBetween(1, 10);
|
||||
changed = true;
|
||||
} else {
|
||||
otherId = id;
|
||||
}
|
||||
Map<String, Object> document;
|
||||
if (randomBoolean()) {
|
||||
document = Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
|
||||
changed = true;
|
||||
} else {
|
||||
document = Collections.singletonMap(fieldName, fieldValue);
|
||||
}
|
||||
|
||||
WriteableIngestDocument otherWriteableIngestDocument = new WriteableIngestDocument(new IngestDocument(otherIndex, otherType, otherId, document));
|
||||
if (changed) {
|
||||
assertThat(writeableIngestDocument, not(equalTo(otherWriteableIngestDocument)));
|
||||
assertThat(otherWriteableIngestDocument, not(equalTo(writeableIngestDocument)));
|
||||
} else {
|
||||
assertThat(writeableIngestDocument, equalTo(otherWriteableIngestDocument));
|
||||
assertThat(otherWriteableIngestDocument, equalTo(writeableIngestDocument));
|
||||
WriteableIngestDocument thirdWriteableIngestDocument = new WriteableIngestDocument(new IngestDocument(index, type, id, Collections.singletonMap(fieldName, fieldValue)));
|
||||
assertThat(thirdWriteableIngestDocument, equalTo(writeableIngestDocument));
|
||||
assertThat(writeableIngestDocument, equalTo(thirdWriteableIngestDocument));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
WriteableIngestDocument writeableIngestDocument = new WriteableIngestDocument(ingestDocument);
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
writeableIngestDocument.writeTo(out);
|
||||
StreamInput streamInput = StreamInput.wrap(out.bytes());
|
||||
WriteableIngestDocument otherWriteableIngestDocument = WriteableIngestDocument.readWriteableIngestDocumentFrom(streamInput);
|
||||
assertThat(otherWriteableIngestDocument, equalTo(writeableIngestDocument));
|
||||
}
|
||||
}
|
|
@ -22,10 +22,10 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
|
@ -38,8 +38,7 @@ public class SimulateDocumentSimpleResultTests extends ESTestCase {
|
|||
if (isFailure) {
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(new IllegalArgumentException("test"));
|
||||
} else {
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
simulateDocumentSimpleResult = new SimulateDocumentSimpleResult(ingestDocument);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -29,7 +30,6 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -53,7 +53,8 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
processor = mock(Processor.class);
|
||||
when(processor.getType()).thenReturn("mock");
|
||||
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor));
|
||||
ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||
//ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
|
||||
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -84,9 +84,9 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
|
|||
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
|
||||
Map<String, Object> expectedDocument = expectedDocsIterator.next();
|
||||
assertThat(ingestDocument.getSource(), equalTo(expectedDocument.get(Fields.SOURCE)));
|
||||
assertThat(ingestDocument.getMetadata(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
|
||||
assertThat(ingestDocument.getMetadata(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
|
||||
assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(ID.getFieldName())));
|
||||
assertThat(ingestDocument.getEsMetadata(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
|
||||
assertThat(ingestDocument.getEsMetadata(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
|
||||
assertThat(ingestDocument.getEsMetadata(ID), equalTo(expectedDocument.get(ID.getFieldName())));
|
||||
}
|
||||
|
||||
assertThat(actualRequest.getPipeline().getId(), equalTo(SimulatePipelineRequest.SIMULATED_PIPELINE_ID));
|
||||
|
@ -137,9 +137,9 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
|
|||
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
|
||||
Map<String, Object> expectedDocument = expectedDocsIterator.next();
|
||||
assertThat(ingestDocument.getSource(), equalTo(expectedDocument.get(Fields.SOURCE)));
|
||||
assertThat(ingestDocument.getMetadata(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
|
||||
assertThat(ingestDocument.getMetadata(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
|
||||
assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(ID.getFieldName())));
|
||||
assertThat(ingestDocument.getEsMetadata(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
|
||||
assertThat(ingestDocument.getEsMetadata(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
|
||||
assertThat(ingestDocument.getEsMetadata(ID), equalTo(expectedDocument.get(ID.getFieldName())));
|
||||
}
|
||||
|
||||
assertThat(actualRequest.getPipeline().getId(), equalTo(SimulatePipelineRequest.SIMULATED_PIPELINE_ID));
|
||||
|
|
|
@ -22,17 +22,15 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
|
||||
public class SimulatePipelineResponseTests extends ESTestCase {
|
||||
|
||||
|
@ -42,8 +40,7 @@ public class SimulatePipelineResponseTests extends ESTestCase {
|
|||
List<SimulateDocumentResult> results = new ArrayList<>(numResults);
|
||||
for (int i = 0; i < numResults; i++) {
|
||||
boolean isFailure = randomBoolean();
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
if (isVerbose) {
|
||||
int numProcessors = randomIntBetween(1, 10);
|
||||
List<SimulateProcessorResult> processorResults = new ArrayList<>(numProcessors);
|
||||
|
|
|
@ -22,10 +22,10 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -39,8 +39,7 @@ public class SimulateProcessorResultTests extends ESTestCase {
|
|||
if (isFailure) {
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorId, new IllegalArgumentException("test"));
|
||||
} else {
|
||||
IngestDocument ingestDocument = new IngestDocument(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10),
|
||||
Collections.singletonMap(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorId, ingestDocument);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class WriteableIngestDocumentTests extends ESTestCase {
|
||||
|
||||
public void testEqualsAndHashcode() throws Exception {
|
||||
Map<String, String> esMetadata = new HashMap<>();
|
||||
int numFields = randomIntBetween(1, IngestDocument.MetaData.values().length);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
esMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
Map<String, String> ingestMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
Map<String, Object> document = RandomDocumentPicks.randomDocument(random());
|
||||
WriteableIngestDocument ingestDocument = new WriteableIngestDocument(new IngestDocument(esMetadata, document, ingestMetadata));
|
||||
|
||||
boolean changed = false;
|
||||
Map<String, String> otherEsMetadata;
|
||||
if (randomBoolean()) {
|
||||
otherEsMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, IngestDocument.MetaData.values().length);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
otherEsMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
changed = true;
|
||||
} else {
|
||||
otherEsMetadata = Collections.unmodifiableMap(esMetadata);
|
||||
}
|
||||
|
||||
Map<String, String> otherIngestMetadata;
|
||||
if (randomBoolean()) {
|
||||
otherIngestMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
otherIngestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
changed = true;
|
||||
} else {
|
||||
otherIngestMetadata = Collections.unmodifiableMap(ingestMetadata);
|
||||
}
|
||||
|
||||
Map<String, Object> otherDocument;
|
||||
if (randomBoolean()) {
|
||||
otherDocument = RandomDocumentPicks.randomDocument(random());
|
||||
changed = true;
|
||||
} else {
|
||||
otherDocument = Collections.unmodifiableMap(document);
|
||||
}
|
||||
|
||||
WriteableIngestDocument otherIngestDocument = new WriteableIngestDocument(new IngestDocument(otherEsMetadata, otherDocument, otherIngestMetadata));
|
||||
if (changed) {
|
||||
assertThat(ingestDocument, not(equalTo(otherIngestDocument)));
|
||||
assertThat(otherIngestDocument, not(equalTo(ingestDocument)));
|
||||
} else {
|
||||
assertThat(ingestDocument, equalTo(otherIngestDocument));
|
||||
assertThat(otherIngestDocument, equalTo(ingestDocument));
|
||||
assertThat(ingestDocument.hashCode(), equalTo(otherIngestDocument.hashCode()));
|
||||
WriteableIngestDocument thirdIngestDocument = new WriteableIngestDocument(new IngestDocument(Collections.unmodifiableMap(esMetadata), Collections.unmodifiableMap(document), Collections.unmodifiableMap(ingestMetadata)));
|
||||
assertThat(thirdIngestDocument, equalTo(ingestDocument));
|
||||
assertThat(ingestDocument, equalTo(thirdIngestDocument));
|
||||
assertThat(ingestDocument.hashCode(), equalTo(thirdIngestDocument.hashCode()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
Map<String, String> esMetadata = new HashMap<>();
|
||||
int numFields = randomIntBetween(1, IngestDocument.MetaData.values().length);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
esMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
Map<String, String> ingestMetadata = new HashMap<>();
|
||||
numFields = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < numFields; i++) {
|
||||
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
|
||||
}
|
||||
Map<String, Object> document = RandomDocumentPicks.randomDocument(random());
|
||||
WriteableIngestDocument writeableIngestDocument = new WriteableIngestDocument(new IngestDocument(esMetadata, document, ingestMetadata));
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
writeableIngestDocument.writeTo(out);
|
||||
StreamInput streamInput = StreamInput.wrap(out.bytes());
|
||||
WriteableIngestDocument otherWriteableIngestDocument = WriteableIngestDocument.readWriteableIngestDocumentFrom(streamInput);
|
||||
assertThat(otherWriteableIngestDocument, equalTo(writeableIngestDocument));
|
||||
}
|
||||
}
|
|
@ -49,7 +49,8 @@
|
|||
- is_true: docs.0.doc.modified
|
||||
- match: { docs.0.doc._source.foo: "bar" }
|
||||
- match: { docs.0.doc._source.field2: "_value" }
|
||||
|
||||
- length: { docs.0.doc._ingest: 1 }
|
||||
- is_true: docs.0.doc._ingest.timestamp
|
||||
|
||||
---
|
||||
"Test simulate with provided pipeline definition":
|
||||
|
@ -158,10 +159,14 @@
|
|||
- length: { docs.0.processor_results.0.doc._source: 2 }
|
||||
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
|
||||
- match: { docs.0.processor_results.0.doc._source.field2: "_value" }
|
||||
- length: { docs.0.processor_results.0.doc._ingest: 1 }
|
||||
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
|
||||
- length: { docs.0.processor_results.1.doc._source: 3 }
|
||||
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
|
||||
- match: { docs.0.processor_results.1.doc._source.field2: "_value" }
|
||||
- match: { docs.0.processor_results.1.doc._source..field3: "third_val" }
|
||||
- length: { docs.0.processor_results.1.doc._ingest: 1 }
|
||||
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
|
||||
|
||||
---
|
||||
"Test simulate with exception thrown":
|
||||
|
@ -206,6 +211,8 @@
|
|||
- match: { docs.0.error.type: "illegal_argument_exception" }
|
||||
- is_true: docs.1.doc.modified
|
||||
- match: { docs.1.doc._source.foo: "BAR" }
|
||||
- length: { docs.1.doc._ingest: 1 }
|
||||
- is_true: docs.1.doc._ingest.timestamp
|
||||
|
||||
---
|
||||
"Test verbose simulate with exception thrown":
|
||||
|
@ -262,8 +269,15 @@
|
|||
- match: { docs.0.processor_results.1.doc._type: "type" }
|
||||
- match: { docs.0.processor_results.1.doc._id: "id" }
|
||||
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
|
||||
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
|
||||
- match: { docs.0.processor_results.1.doc._source.bar: "HELLO" }
|
||||
- length: { docs.0.processor_results.1.doc._ingest: 1 }
|
||||
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
|
||||
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
|
||||
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
|
||||
- length: { docs.1.processor_results.0.doc._ingest: 1 }
|
||||
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
|
||||
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
|
||||
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
|
||||
- length: { docs.1.processor_results.1.doc._ingest: 1 }
|
||||
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
|
||||
|
||||
|
|
Loading…
Reference in New Issue