[7.x] Allow_duplicates option for append processor (#61916) (#63257)

This commit is contained in:
Dan Hermann 2020-10-06 09:03:47 -05:00 committed by GitHub
parent a3252af5c0
commit 7a59ae8fa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 196 additions and 32 deletions

View File

@ -40,11 +40,13 @@ public final class AppendProcessor extends AbstractProcessor {
private final TemplateScript.Factory field;
private final ValueSource value;
private final boolean allowDuplicates;
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
super(tag, description);
this.field = field;
this.value = value;
this.allowDuplicates = allowDuplicates;
}
public TemplateScript.Factory getField() {
@ -57,7 +59,7 @@ public final class AppendProcessor extends AbstractProcessor {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue(field, value);
ingestDocument.appendFieldValue(field, value, allowDuplicates);
return ingestDocument;
}
@ -79,9 +81,11 @@ public final class AppendProcessor extends AbstractProcessor {
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"field", field, scriptService);
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService));
return new AppendProcessor(processorTag, description, compiledTemplate, ValueSource.wrap(value, scriptService),
allowDuplicates);
}
}
}

View File

@ -28,13 +28,16 @@ import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.containsInAnyOrder;
public class AppendProcessorTests extends ESTestCase {
@ -53,13 +56,13 @@ public class AppendProcessorTests extends ESTestCase {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
@ -82,13 +85,13 @@ public class AppendProcessorTests extends ESTestCase {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
List<?> list = ingestDocument.getFieldValue(field, List.class);
@ -106,13 +109,13 @@ public class AppendProcessorTests extends ESTestCase {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
appendProcessor = createAppendProcessor(field, value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
appendProcessor = createAppendProcessor(field, values, true);
}
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
@ -132,13 +135,13 @@ public class AppendProcessorTests extends ESTestCase {
if (randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
values.add(value);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), value, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(randomAlphaOfLengthBetween(1, 10));
}
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values);
appendProcessor = createAppendProcessor(randomMetadata.getFieldName(), values, true);
}
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
@ -156,10 +159,65 @@ public class AppendProcessorTests extends ESTestCase {
}
}
private static Processor createAppendProcessor(String fieldName, Object fieldValue) {
/**
 * With duplicates disallowed, appending a value equal to the scalar already stored in the field
 * must leave the field as that same scalar rather than turning it into a list.
 */
public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Exception {
    IngestDocument document = RandomDocumentPicks.randomIngestDocument(random());
    String existing = randomAlphaOfLengthBetween(1, 10);
    String fieldName = RandomDocumentPicks.addRandomField(random(), document, existing);

    // the only value offered for appending is the one the field already holds
    List<Object> duplicates = new ArrayList<>();
    duplicates.add(existing);
    createAppendProcessor(fieldName, duplicates, false).execute(document);

    Object afterAppend = document.getFieldValue(fieldName, Object.class);
    assertThat(afterAppend, not(instanceOf(List.class)));
    assertThat(afterAppend, equalTo(existing));
}
/**
 * With duplicates disallowed, appending a value that differs from the scalar already stored in the
 * field must convert the field into a two-element list: the original value followed by the new one.
 */
public void testAppendingUniqueValueToScalar() throws Exception {
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
    String originalValue = randomAlphaOfLengthBetween(1, 10);
    String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, originalValue);

    // Re-draw until the new value really differs from the original. Two independent random strings
    // can collide (lengths start at 1), and a colliding value is treated as a duplicate: nothing is
    // appended, the field stays a scalar, and the list assertions below would fail spuriously.
    String newValue;
    do {
        newValue = randomAlphaOfLengthBetween(1, 10);
    } while (newValue.equals(originalValue));

    List<Object> valuesToAppend = new ArrayList<>();
    valuesToAppend.add(newValue);
    Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
    appendProcessor.execute(ingestDocument);

    List<?> list = ingestDocument.getFieldValue(field, List.class);
    assertThat(list.size(), equalTo(2));
    assertThat(list, equalTo(org.elasticsearch.common.collect.List.of(originalValue, newValue)));
}
/**
 * With duplicates disallowed, appending a mix of already-present and genuinely new values to an
 * existing list must add only the new values, and must do so in place on the same list instance.
 */
public void testAppendingToListWithDuplicatesDisallowed() throws Exception {
    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
    List<String> list = new ArrayList<>();
    int size = randomIntBetween(0, 10);
    for (int i = 0; i < size; i++) {
        list.add(randomAlphaOfLengthBetween(1, 10));
    }
    String originalField = RandomDocumentPicks.addRandomField(random(), ingestDocument, list);
    List<String> expectedValues = new ArrayList<>(list);
    List<String> existingValues = randomSubsetOf(list);

    // Build values that are truly new: absent from the existing list and distinct from each other.
    // A random string that collides with either would be skipped by the append, yet still be counted
    // in expectedValues, making the final assertion fail intermittently.
    int uniqueValuesSize = randomIntBetween(0, 10);
    List<String> uniqueValues = new ArrayList<>();
    while (uniqueValues.size() < uniqueValuesSize) {
        String candidate = randomAlphaOfLengthBetween(1, 10);
        if (list.contains(candidate) == false && uniqueValues.contains(candidate) == false) {
            uniqueValues.add(candidate);
        }
    }

    List<String> valuesToAppend = new ArrayList<>(existingValues);
    valuesToAppend.addAll(uniqueValues);
    expectedValues.addAll(uniqueValues);
    // sorting reorders the input so already-present and new values are no longer grouped
    Collections.sort(valuesToAppend);
    Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
    appendProcessor.execute(ingestDocument);

    List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
    assertThat(fieldValue, sameInstance(list));
    assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
}
private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
return new AppendProcessor(randomAlphaOfLength(10),
null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()));
ValueSource.wrap(fieldValue, TestTemplateService.instance()), allowDuplicates);
}
private enum Scalar {

View File

@ -206,8 +206,10 @@ public class ForEachProcessorTests extends ESTestCase {
ForEachProcessor processor = new ForEachProcessor(
"_tag", null, "values", new CompoundProcessor(false,
Collections.singletonList(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added"))))
org.elasticsearch.common.collect.List.of(
new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
org.elasticsearch.common.collect.List.of(
new AppendProcessor("_tag", null, template, (model) -> (Collections.singletonList("added")), true))
), false);
processor.execute(ingestDocument, (result, e) -> {});

View File

@ -379,7 +379,24 @@ public final class IngestDocument {
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(String path, Object value) {
setFieldValue(path, value, true);
appendFieldValue(path, value, true);
}
/**
 * Appends one or more values to the field at the given path, optionally skipping values
 * that the field already holds.
 * Missing path elements are created on the way.
 * When the path points at a list, the values are added to that list.
 * When the path points at a scalar, the scalar is turned into a list and the values are added to it.
 * A list passed as {@code value} is treated as several values, each appended individually.
 * @param path The path within the document in dot-notation
 * @param value The value or values to append to the existing ones
 * @param allowDuplicates When false, any values that already exist in the field will not be added
 * @throws IllegalArgumentException if the path is null, empty or invalid.
 */
public void appendFieldValue(String path, Object value, boolean allowDuplicates) {
    // always an append here, never a plain overwrite
    final boolean append = true;
    setFieldValue(path, value, append, allowDuplicates);
}
/**
@ -399,6 +416,24 @@ public final class IngestDocument {
appendFieldValue(fieldPathTemplate.newInstance(model).execute(), valueSource.copyAndResolve(model));
}
/**
 * Resolves the field path and the value against this document's template model, then appends
 * the resolved value or values to the resolved path, optionally skipping values that the field
 * already holds.
 * Missing path elements are created on the way.
 * When the path points at a list, the values are added to that list.
 * When the path points at a scalar, the scalar is turned into a list and the values are added to it.
 * A resolved value that is a list is treated as several values, each appended individually.
 * @param fieldPathTemplate Resolves to the path with dot-notation within the document
 * @param valueSource The value source that will produce the value or values to append to the existing ones
 * @param allowDuplicates When false, any values that already exist in the field will not be added
 * @throws IllegalArgumentException if the path is null, empty or invalid.
 */
public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource, boolean allowDuplicates) {
    final Map<String, Object> templateModel = createTemplateModel();
    // the path is resolved before the value, both against the same model
    final String resolvedPath = fieldPathTemplate.newInstance(templateModel).execute();
    final Object resolvedValue = valueSource.copyAndResolve(templateModel);
    appendFieldValue(resolvedPath, resolvedValue, allowDuplicates);
}
/**
* Sets the provided value to the provided path in the document.
* Any non existing path element will be created.
@ -454,6 +489,10 @@ public final class IngestDocument {
}
/**
 * Sets or appends {@code value} at {@code path}, always permitting duplicate values.
 * Delegates to the four-argument overload with duplicates allowed.
 */
private void setFieldValue(String path, Object value, boolean append) {
    final boolean allowDuplicates = true;
    setFieldValue(path, value, append, allowDuplicates);
}
private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
FieldPath fieldPath = new FieldPath(path);
Object context = fieldPath.initialContext;
for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
@ -502,7 +541,7 @@ public final class IngestDocument {
if (append) {
if (map.containsKey(leafKey)) {
Object object = map.get(leafKey);
List<Object> list = appendValues(object, value);
Object list = appendValues(object, value, allowDuplicates);
if (list != object) {
map.put(leafKey, list);
}
@ -530,7 +569,7 @@ public final class IngestDocument {
}
if (append) {
Object object = list.get(index);
List<Object> newList = appendValues(object, value);
Object newList = appendValues(object, value, allowDuplicates);
if (newList != object) {
list.set(index, newList);
}
@ -544,7 +583,7 @@ public final class IngestDocument {
}
@SuppressWarnings("unchecked")
private static List<Object> appendValues(Object maybeList, Object value) {
private static Object appendValues(Object maybeList, Object value, boolean allowDuplicates) {
List<Object> list;
if (maybeList instanceof List) {
//maybeList is already a list, we append the provided values to it
@ -554,8 +593,13 @@ public final class IngestDocument {
list = new ArrayList<>();
list.add(maybeList);
}
appendValues(list, value);
return list;
if (allowDuplicates) {
appendValues(list, value);
return list;
} else {
// if no values were appended due to duplication, return the original object so the ingest document remains unmodified
return appendValuesWithoutDuplicates(list, value) ? list : maybeList;
}
}
private static void appendValues(List<Object> list, Object value) {
@ -566,6 +610,25 @@ public final class IngestDocument {
}
}
/**
 * Adds to {@code list} each candidate taken from {@code value} that the list does not already
 * contain. A {@code value} that is itself a list contributes its elements one by one; any other
 * value is treated as a single candidate.
 * @param list the list to append to; modified in place
 * @param value a single value, or a list of values, to append
 * @return true if at least one value was added to {@code list}, false if nothing changed
 */
private static boolean appendValuesWithoutDuplicates(List<Object> list, Object value) {
    // view a lone value as a one-element list so both cases share the same loop
    final List<?> candidates = value instanceof List ? (List<?>) value : java.util.Collections.singletonList(value);
    boolean changed = false;
    for (Object candidate : candidates) {
        if (list.contains(candidate)) {
            continue;
        }
        list.add(candidate);
        changed = true;
    }
    return changed;
}
private static <T> T cast(String path, Object object, Class<T> clazz) {
if (object == null) {
return null;

View File

@ -73,8 +73,12 @@ public class IngestDocumentTests extends ESTestCase {
value.put("field", "value");
list.add(value);
list.add(null);
document.put("list", list);
List<String> list2 = new ArrayList<>();
list2.add("foo");
list2.add("bar");
list2.add("baz");
document.put("list2", list2);
ingestDocument = new IngestDocument("index", "type", "id", null, null, null, document);
}
@ -444,6 +448,26 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(list.get(2), equalTo("new_value"));
}
/**
 * Appending a value that "list2" already contains, with duplicates disallowed,
 * must leave the list exactly as it was.
 */
public void testListAppendFieldValueWithDuplicate() {
    ingestDocument.appendFieldValue("list2", "foo", false);

    Object stored = ingestDocument.getSourceAndMetadata().get("list2");
    assertThat(stored, instanceOf(List.class));
    @SuppressWarnings("unchecked")
    List<Object> values = (List<Object>) stored;

    List<Object> unchanged = org.elasticsearch.common.collect.List.of("foo", "bar", "baz");
    assertThat(values.size(), equalTo(3));
    assertThat(values, equalTo(unchanged));
}
/**
 * Appending a value that "list2" does not yet contain, with duplicates disallowed,
 * must add it at the end of the list.
 */
public void testListAppendFieldValueWithoutDuplicate() {
    ingestDocument.appendFieldValue("list2", "foo2", false);

    Object stored = ingestDocument.getSourceAndMetadata().get("list2");
    assertThat(stored, instanceOf(List.class));
    @SuppressWarnings("unchecked")
    List<Object> values = (List<Object>) stored;

    List<Object> withNewValue = org.elasticsearch.common.collect.List.of("foo", "bar", "baz", "foo2");
    assertThat(values.size(), equalTo(4));
    assertThat(values, equalTo(withNewValue));
}
public void testListAppendFieldValues() {
ingestDocument.appendFieldValue("list", Arrays.asList("item1", "item2", "item3"));
Object object = ingestDocument.getSourceAndMetadata().get("list");
@ -458,6 +482,19 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(list.get(4), equalTo("item3"));
}
/**
 * Appending several values to "list2" with duplicates disallowed must add only the one value
 * the list did not already hold, keeping the existing elements in their original order.
 */
public void testListAppendFieldValuesWithoutDuplicates() {
    ingestDocument.appendFieldValue("list2", org.elasticsearch.common.collect.List.of("foo", "bar", "baz", "foo2"), false);

    Object stored = ingestDocument.getSourceAndMetadata().get("list2");
    assertThat(stored, instanceOf(List.class));
    @SuppressWarnings("unchecked")
    List<Object> values = (List<Object>) stored;

    // three pre-existing entries followed by the single new one
    String[] expectedInOrder = { "foo", "bar", "baz", "foo2" };
    assertThat(values.size(), equalTo(4));
    for (int i = 0; i < expectedInOrder.length; i++) {
        assertThat(values.get(i), equalTo(expectedInOrder[i]));
    }
}
public void testAppendFieldValueToNonExistingList() {
ingestDocument.appendFieldValue("non_existing_list", "new_value");
Object object = ingestDocument.getSourceAndMetadata().get("non_existing_list");
@ -753,23 +790,23 @@ public class IngestDocumentTests extends ESTestCase {
public void testRemoveField() {
ingestDocument.removeField("foo");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(7));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("foo"), equalTo(false));
ingestDocument.removeField("_index");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(7));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("_index"), equalTo(false));
ingestDocument.removeField("_source.fizz");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(5));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(false));
assertThat(ingestDocument.getIngestMetadata().size(), equalTo(1));
ingestDocument.removeField("_ingest.timestamp");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(5));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
assertThat(ingestDocument.getIngestMetadata().size(), equalTo(0));
}
public void testRemoveInnerField() {
ingestDocument.removeField("fizz.buzz");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(9));
assertThat(ingestDocument.getSourceAndMetadata().get("fizz"), instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("fizz");
@ -778,17 +815,17 @@ public class IngestDocumentTests extends ESTestCase {
ingestDocument.removeField("fizz.foo_null");
assertThat(map.size(), equalTo(2));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(9));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true));
ingestDocument.removeField("fizz.1");
assertThat(map.size(), equalTo(1));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(9));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true));
ingestDocument.removeField("fizz.list");
assertThat(map.size(), equalTo(0));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(9));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true));
}
@ -822,7 +859,7 @@ public class IngestDocumentTests extends ESTestCase {
public void testRemoveIngestObject() {
ingestDocument.removeField("_ingest");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(7));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("_ingest"), equalTo(false));
}
@ -844,7 +881,7 @@ public class IngestDocumentTests extends ESTestCase {
public void testListRemoveField() {
ingestDocument.removeField("list.0.field");
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(9));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("list"), equalTo(true));
Object object = ingestDocument.getSourceAndMetadata().get("list");
assertThat(object, instanceOf(List.class));