version set in ingest pipeline (#27573)

Add support for version and version_type in ingest pipelines

Add support for setting document version and version type in set
processor of an ingest pipeline.
This commit is contained in:
Yu 2018-02-21 09:34:51 +01:00 committed by Martijn van Groningen
parent 86e5e38b41
commit 7d8fb69d50
17 changed files with 244 additions and 69 deletions

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.MetaData;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
@ -122,10 +124,10 @@ public class AppendProcessorTests extends ESTestCase {
}
}
public void testAppendMetadata() throws Exception {
//here any metadata field value becomes a list, which won't make sense in most of the cases,
public void testAppendMetadataExceptVersion() throws Exception {
// here any metadata field value becomes a list, which won't make sense in most of the cases,
// but support for append is streamlined like for set so we test it
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT);
List<String> values = new ArrayList<>();
Processor appendProcessor;
if (randomBoolean()) {

View File

@ -38,7 +38,7 @@ public class DateIndexNameProcessorTests extends ESTestCase {
"events-", "y", "yyyyMMdd"
);
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", "2016-04-25T12:24:20.101Z"));
processor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{20160425||/y{yyyyMMdd|UTC}}>"));
@ -48,7 +48,7 @@ public class DateIndexNameProcessorTests extends ESTestCase {
Function<String, DateTime> function = DateFormat.Tai64n.getFunction(null, DateTimeZone.UTC, null);
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024"));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{20121222||/m{yyyyMMdd|UTC}}>"));
@ -58,12 +58,12 @@ public class DateIndexNameProcessorTests extends ESTestCase {
Function<String, DateTime> function = DateFormat.UnixMs.getFunction(null, DateTimeZone.UTC, null);
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", "1000500"));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));
document = new IngestDocument("_index", "_type", "_id", null, null,
document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", 1000500L));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));
@ -73,7 +73,7 @@ public class DateIndexNameProcessorTests extends ESTestCase {
Function<String, DateTime> function = DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null);
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
Collections.singletonMap("_field", "1000.5"));
dateProcessor.execute(document);
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));

View File

@ -45,7 +45,7 @@ public class ForEachProcessorTests extends ESTestCase {
values.add("bar");
values.add("baz");
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
ForEachProcessor processor = new ForEachProcessor(
@ -61,7 +61,7 @@ public class ForEachProcessorTests extends ESTestCase {
public void testExecuteWithFailure() throws Exception {
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
);
TestProcessor testProcessor = new TestProcessor(id -> {
@ -101,7 +101,7 @@ public class ForEachProcessorTests extends ESTestCase {
values.add(new HashMap<>());
values.add(new HashMap<>());
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
TestProcessor innerProcessor = new TestProcessor(id -> {
@ -132,7 +132,7 @@ public class ForEachProcessorTests extends ESTestCase {
document.put("values", values);
document.put("flat_values", new ArrayList<>());
document.put("other", "value");
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, document);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, document);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new SetProcessor("_tag",
@ -171,7 +171,7 @@ public class ForEachProcessorTests extends ESTestCase {
values.add("");
}
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
@ -190,7 +190,7 @@ public class ForEachProcessorTests extends ESTestCase {
values.add(1);
values.add(null);
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
TemplateScript.Factory template = new TestTemplateService.MockTemplateScript.Factory("errors");
@ -220,7 +220,7 @@ public class ForEachProcessorTests extends ESTestCase {
source.put("_value", "new_value");
source.put("values", values);
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, source
"_index", "_type", "_id", null, null, null, null, source
);
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
@ -251,7 +251,7 @@ public class ForEachProcessorTests extends ESTestCase {
values.add(value);
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, Collections.singletonMap("values1", values)
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values1", values)
);
TestProcessor testProcessor = new TestProcessor(

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.MetaData;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
@ -99,14 +101,30 @@ public class SetProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
}
public void testSetMetadata() throws Exception {
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
public void testSetMetadataExceptVersion() throws Exception {
MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT);
Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value"));
}
public void testSetMetadataVersion() throws Exception {
    // Verify that a set processor targeting the _version metadata field
    // stamps the chosen version onto the document.
    final long version = randomNonNegativeLong();
    final String versionField = MetaData.VERSION.getFieldName();
    Processor processor = createSetProcessor(versionField, version, true);
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
    processor.execute(ingestDocument);
    assertThat(ingestDocument.getFieldValue(versionField, Long.class), Matchers.equalTo(version));
}
public void testSetMetadataVersionType() throws Exception {
    // Verify that a set processor targeting the _version_type metadata field
    // records the chosen version type string on the document.
    final String versionType = randomFrom("internal", "external", "external_gte");
    final String versionTypeField = MetaData.VERSION_TYPE.getFieldName();
    Processor processor = createSetProcessor(versionTypeField, versionType, true);
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
    processor.execute(ingestDocument);
    assertThat(ingestDocument.getFieldValue(versionTypeField, String.class), Matchers.equalTo(versionType));
}
private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) {
return new SetProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled);

View File

@ -0,0 +1,76 @@
---
# Teardown must remove the pipelines this test actually creates
# ("my_pipeline1" and "my_pipeline2"); the original deleted a
# non-existent "my_pipeline", leaking both pipelines between tests.
teardown:
  - do:
      ingest.delete_pipeline:
        id: "my_pipeline1"
        ignore: 404
  - do:
      ingest.delete_pipeline:
        id: "my_pipeline2"
        ignore: 404

---
"Test set document version & version type":
  - do:
      cluster.health:
        wait_for_status: green

  # Pipeline 1: forces _version=1 with internal versioning. Indexing a
  # brand-new document with an explicit internal version conflicts.
  - do:
      ingest.put_pipeline:
        id: "my_pipeline1"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "set" : {
                  "field" : "_version",
                  "value": 1
                }
              },
              {
                "set" : {
                  "field" : "_version_type",
                  "value": "internal"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  # Pipeline 2: forces _version=1 with external versioning, which is
  # accepted for a document that does not exist yet.
  - do:
      ingest.put_pipeline:
        id: "my_pipeline2"
        body: >
          {
            "description": "_description",
            "processors": [
              {
                "set" : {
                  "field" : "_version",
                  "value": 1
                }
              },
              {
                "set" : {
                  "field" : "_version_type",
                  "value": "external"
                }
              }
            ]
          }
  - match: { acknowledged: true }

  - do:
      catch: conflict
      index:
        index: test
        type: test
        id: 1
        pipeline: "my_pipeline1"
        body: {}

  - do:
      index:
        index: test
        type: test
        id: 1
        pipeline: "my_pipeline2"
        body: {}
  - match: { _version: 1 }

View File

@ -33,7 +33,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase {
public void testAccessMetaDataViaTemplate() {
Map<String, Object> document = new HashMap<>();
document.put("foo", "bar");
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{foo}}", scriptService));
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 bar"));
@ -48,7 +48,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase {
innerObject.put("baz", "hello baz");
innerObject.put("qux", Collections.singletonMap("fubar", "hello qux and fubar"));
document.put("foo", innerObject);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
ingestDocument.setFieldValue(compile("field1"),
ValueSource.wrap("1 {{foo.bar}} {{foo.baz}} {{foo.qux.fubar}}", scriptService));
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 hello bar hello baz hello qux and fubar"));
@ -67,7 +67,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase {
list.add(value);
list.add(null);
document.put("list2", list);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{list1.0}} {{list2.0}}", scriptService));
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 foo {field=value}"));
}
@ -77,7 +77,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase {
Map<String, Object> ingestMap = new HashMap<>();
ingestMap.put("timestamp", "bogus_timestamp");
document.put("_ingest", ingestMap);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
ingestDocument.setFieldValue(compile("ingest_timestamp"),
ValueSource.wrap("{{_ingest.timestamp}} and {{_source._ingest.timestamp}}", scriptService));
assertThat(ingestDocument.getFieldValue("ingest_timestamp", String.class),

View File

@ -64,7 +64,7 @@ public class ValueSourceMustacheIT extends AbstractScriptTestCase {
}
public void testAccessSourceViaTemplate() {
IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, new HashMap<>());
IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, null, null, new HashMap<>());
assertThat(ingestDocument.hasField("marvel"), is(false));
ingestDocument.setFieldValue(compile("{{_index}}"), ValueSource.wrap("{{_index}}", scriptService));
assertThat(ingestDocument.getFieldValue("marvel", String.class), equalTo("marvel"));

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
@ -195,8 +196,17 @@ public class SimulatePipelineRequest extends ActionRequest {
dataMap, MetaData.ROUTING.getFieldName());
String parent = ConfigurationUtils.readOptionalStringOrIntProperty(null, null,
dataMap, MetaData.PARENT.getFieldName());
Long version = null;
if (dataMap.containsKey(MetaData.VERSION.getFieldName())) {
version = (Long) ConfigurationUtils.readObject(null, null, dataMap, MetaData.VERSION.getFieldName());
}
VersionType versionType = null;
if (dataMap.containsKey(MetaData.VERSION_TYPE.getFieldName())) {
versionType = VersionType.fromString(ConfigurationUtils.readStringProperty(null, null, dataMap,
MetaData.VERSION_TYPE.getFieldName()));
}
IngestDocument ingestDocument =
new IngestDocument(index, type, id, routing, parent, document);
new IngestDocument(index, type, id, routing, parent, version, versionType, document);
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;

View File

@ -68,10 +68,10 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("doc");
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, Object> metadata : metadataMap.entrySet()) {
if (metadata.getValue() != null) {
builder.field(metadata.getKey().getFieldName(), metadata.getValue());
builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
}
}
builder.field("_source", ingestDocument.getSourceAndMetadata());

View File

@ -20,14 +20,14 @@
package org.elasticsearch.ingest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.script.TemplateScript;
import java.time.ZoneOffset;
@ -56,7 +56,8 @@ public final class IngestDocument {
private final Map<String, Object> sourceAndMetadata;
private final Map<String, Object> ingestMetadata;
public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source) {
public IngestDocument(String index, String type, String id, String routing, String parent,
Long version, VersionType versionType, Map<String, Object> source) {
this.sourceAndMetadata = new HashMap<>();
this.sourceAndMetadata.putAll(source);
this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index);
@ -68,6 +69,12 @@ public final class IngestDocument {
if (parent != null) {
this.sourceAndMetadata.put(MetaData.PARENT.getFieldName(), parent);
}
if (version != null) {
sourceAndMetadata.put(MetaData.VERSION.getFieldName(), version);
}
if (versionType != null) {
sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), VersionType.toString(versionType));
}
this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
@ -559,10 +566,10 @@ public final class IngestDocument {
* one time operation that extracts the metadata fields from the ingest document and returns them.
* Metadata fields that used to be accessible as ordinary top level fields will be removed as part of this call.
*/
public Map<MetaData, String> extractMetadata() {
Map<MetaData, String> metadataMap = new EnumMap<>(MetaData.class);
public Map<MetaData, Object> extractMetadata() {
Map<MetaData, Object> metadataMap = new EnumMap<>(MetaData.class);
for (MetaData metaData : MetaData.values()) {
metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), String.class));
metadataMap.put(metaData, sourceAndMetadata.remove(metaData.getFieldName()));
}
return metadataMap;
}
@ -649,7 +656,9 @@ public final class IngestDocument {
TYPE(TypeFieldMapper.NAME),
ID(IdFieldMapper.NAME),
ROUTING(RoutingFieldMapper.NAME),
PARENT(ParentFieldMapper.NAME);
PARENT(ParentFieldMapper.NAME),
VERSION(VersionFieldMapper.NAME),
VERSION_TYPE("_version_type");
private final String fieldName;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
@ -164,18 +165,24 @@ public class PipelineExecutionService implements ClusterStateApplier {
String id = indexRequest.id();
String routing = indexRequest.routing();
String parent = indexRequest.parent();
Long version = indexRequest.version();
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap);
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, version, versionType, sourceAsMap);
pipeline.execute(ingestDocument);
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values
//before ingestion, which might also get modified during ingestion.
indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX));
indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE));
indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID));
indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING));
indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT));
indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
indexRequest.parent((String) metadataMap.get(IngestDocument.MetaData.PARENT));
indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
}
indexRequest.source(ingestDocument.getSourceAndMetadata());
} catch (Exception e) {
totalStats.ingestFailed();

View File

@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
@ -44,6 +45,8 @@ import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.IngestDocument.MetaData.PARENT;
import static org.elasticsearch.ingest.IngestDocument.MetaData.ROUTING;
import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION;
import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION_TYPE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
@ -98,7 +101,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
Map<String, Object> expectedDocument = expectedDocsIterator.next();
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName())));
@ -120,17 +123,28 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
for (int i = 0; i < numDocs; i++) {
Map<String, Object> doc = new HashMap<>();
Map<String, Object> expectedDoc = new HashMap<>();
List<IngestDocument.MetaData> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT);
List<IngestDocument.MetaData> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT, VERSION, VERSION_TYPE);
for(IngestDocument.MetaData field : fields) {
if(randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
if (field == VERSION) {
Long value = randomLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
}
else {
Integer value = randomIntBetween(1, 1000000);
} else if (field == VERSION_TYPE) {
String value = VersionType.toString(
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)
);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), String.valueOf(value));
expectedDoc.put(field.getFieldName(), value);
} else {
if (randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
Integer value = randomIntBetween(1, 1000000);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), String.valueOf(value));
}
}
}
String fieldName = randomAlphaOfLengthBetween(1, 10);
@ -175,12 +189,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
Map<String, Object> expectedDocument = expectedDocsIterator.next();
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName())));
assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName())));
assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName())));
assertThat(metadataMap.get(PARENT), equalTo(expectedDocument.get(PARENT.getFieldName())));
assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName())));
assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName())));
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
}

View File

@ -133,13 +133,13 @@ public class WriteableIngestDocumentTests extends ESTestCase {
Map<String, Object> toXContentSource = (Map<String, Object>) toXContentDoc.get("_source");
Map<String, Object> toXContentIngestMetadata = (Map<String, Object>) toXContentDoc.get("_ingest");
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, Object> metadata : metadataMap.entrySet()) {
String fieldName = metadata.getKey().getFieldName();
if (metadata.getValue() == null) {
assertThat(toXContentDoc.containsKey(fieldName), is(false));
} else {
assertThat(toXContentDoc.get(fieldName), equalTo(metadata.getValue()));
assertThat(toXContentDoc.get(fieldName), equalTo(metadata.getValue().toString()));
}
}

View File

@ -127,7 +127,7 @@ public class IngestClientIT extends ESIntegTestCase {
source.put("foo", "bar");
source.put("fail", false);
source.put("processed", true);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, source);
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
}

View File

@ -76,7 +76,7 @@ public class IngestDocumentTests extends ESTestCase {
list.add(null);
document.put("list", list);
ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
}
public void testSimpleGetFieldValue() {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.CustomTypeSafeMatcher;
@ -157,10 +158,18 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
long newVersion = randomLong();
String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
doAnswer((InvocationOnMock invocationOnMock) -> {
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
if (metaData == IngestDocument.MetaData.VERSION) {
ingestDocument.setFieldValue(metaData.getFieldName(), newVersion);
} else if (metaData == IngestDocument.MetaData.VERSION_TYPE) {
ingestDocument.setFieldValue(metaData.getFieldName(), versionType);
} else {
ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
}
}
return null;
}).when(processor).execute(any());
@ -175,12 +184,13 @@ public class PipelineExecutionServiceTests extends ESTestCase {
verify(processor).execute(any());
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
assertThat(indexRequest.index(), equalTo("update_index"));
assertThat(indexRequest.type(), equalTo("update_type"));
assertThat(indexRequest.id(), equalTo("update_id"));
assertThat(indexRequest.routing(), equalTo("update_routing"));
assertThat(indexRequest.parent(), equalTo("update_parent"));
assertThat(indexRequest.version(), equalTo(newVersion));
assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType)));
}
public void testExecuteFailure() throws Exception {
@ -188,13 +198,15 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(processor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
@ -207,7 +219,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@@ -225,14 +238,17 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(),
indexRequest.versionType(), Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(processor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
@@ -246,15 +262,19 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Collections.singletonList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(processor).execute(eqID("_index", "_type", "_id",
indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
@@ -380,12 +400,20 @@ public class PipelineExecutionServiceTests extends ESTestCase {
return argThat(new IngestDocumentMatcher(index, type, id, source));
}
// Mockito matcher shorthand: matches an IngestDocument whose metadata
// (including version and versionType) and source equal the given values.
private IngestDocument eqID(String index, String type, String id, Long version, VersionType versionType, Map<String, Object> source) {
    IngestDocumentMatcher expected = new IngestDocumentMatcher(index, type, id, version, versionType, source);
    return argThat(expected);
}
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {
private final IngestDocument ingestDocument;
IngestDocumentMatcher(String index, String type, String id, Map<String, Object> source) {
this.ingestDocument = new IngestDocument(index, type, id, null, null, source);
this.ingestDocument = new IngestDocument(index, type, id, null, null, null, null, source);
}
IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map<String, Object> source) {
this.ingestDocument = new IngestDocument(index, type, id, null, null, version, versionType, source);
}
@Override

View File

@@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.elasticsearch.index.VersionType;
import java.util.ArrayList;
import java.util.HashMap;
@@ -138,6 +139,9 @@ public final class RandomDocumentPicks {
String type = randomString(random);
String id = randomString(random);
String routing = null;
Long version = randomNonNegtiveLong(random);
VersionType versionType = RandomPicks.randomFrom(random,
new VersionType[]{VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE});
if (random.nextBoolean()) {
routing = randomString(random);
}
@@ -145,7 +149,7 @@ public final class RandomDocumentPicks {
if (random.nextBoolean()) {
parent = randomString(random);
}
return new IngestDocument(index, type, id, routing, parent, source);
return new IngestDocument(index, type, id, routing, parent, version, versionType, source);
}
public static Map<String, Object> randomSource(Random random) {
@@ -219,6 +223,11 @@ public final class RandomDocumentPicks {
return RandomStrings.randomUnicodeOfCodepointLengthBetween(random, 1, 10);
}
/**
 * Returns a random non-negative value drawn from {@code random}.
 * {@code Math.abs(long)} overflows for {@code Long.MIN_VALUE}, so that
 * single input is mapped to zero instead of being negated.
 */
private static Long randomNonNegtiveLong(Random random) {
    final long candidate = random.nextLong();
    if (candidate == Long.MIN_VALUE) {
        return 0L;
    }
    return Math.abs(candidate);
}
private static void addRandomFields(Random random, Map<String, Object> parentNode, int currentDepth) {
if (currentDepth > 5) {
return;