Add append processor

The append processor allows to append one or more values to an existing list; add a new list with the provided values if the field doesn't exist yet, or convert an existing scalar into a list and add the provided values to the newly created  list.

This required adapting of IngestDocument#appendFieldValue behaviour, also added support for templating to it.

Closes #14324
This commit is contained in:
javanna 2015-12-18 10:33:36 +01:00 committed by Luca Cavanna
parent 1b7dc45c28
commit 46f99a11a0
11 changed files with 785 additions and 59 deletions

View File

@ -16,6 +16,21 @@ its value will be replaced with the provided one.
} }
-------------------------------------------------- --------------------------------------------------
==== Append processor
Appends one or more values to an existing array if the field already exists and it is an array.
Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar.
Creates an array containing the provided values if the fields doesn't exist.
Accepts a single value or an array of values.
[source,js]
--------------------------------------------------
{
"append": {
"field1": ["item2", "item3", "item4"]
}
}
--------------------------------------------------
==== Remove processor ==== Remove processor
Removes an existing field. If the field doesn't exist, an exception will be thrown Removes an existing field. If the field doesn't exist, an exception will be thrown

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.Strings;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -243,23 +244,46 @@ public final class IngestDocument {
/** /**
* Appends the provided value to the provided path in the document. * Appends the provided value to the provided path in the document.
* Any non existing path element will be created. Same as {@link #setFieldValue(String, Object)} * Any non existing path element will be created.
* but if the last element is a list, the value will be appended to the existing list. * If the path identifies a list, the value will be appended to the existing list.
* If the path identifies a scalar, the scalar will be converted to a list and
* the provided value will be added to the newly created list.
* Supports multiple values too provided in forms of list, in that case all the values will be appeneded to the
* existing (or newly created) list.
* @param path The path within the document in dot-notation * @param path The path within the document in dot-notation
* @param value The value to put in for the path key * @param value The value or values to append to the existing ones
* @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. * @throws IllegalArgumentException if the path is null, empty or invalid.
*/ */
public void appendFieldValue(String path, Object value) { public void appendFieldValue(String path, Object value) {
setFieldValue(path, value, true); setFieldValue(path, value, true);
} }
/**
* Appends the provided value to the provided path in the document.
* Any non existing path element will be created.
* If the path identifies a list, the value will be appended to the existing list.
* If the path identifies a scalar, the scalar will be converted to a list and
* the provided value will be added to the newly created list.
* Supports multiple values too provided in forms of list, in that case all the values will be appeneded to the
* existing (or newly created) list.
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
* @param valueSource The value source that will produce the value or values to append to the existing ones
* @throws IllegalArgumentException if the path is null, empty or invalid.
*/
public void appendFieldValue(TemplateService.Template fieldPathTemplate, ValueSource valueSource) {
Map<String, Object> model = createTemplateModel();
appendFieldValue(fieldPathTemplate.execute(model), valueSource.copyAndResolve(model));
}
/** /**
* Sets the provided value to the provided path in the document. * Sets the provided value to the provided path in the document.
* Any non existing path element will be created. If the last element is a list, * Any non existing path element will be created.
* the value will replace the existing list. * If the last item in the path is a list, the value will replace the existing list as a whole.
* Use {@link #appendFieldValue(String, Object)} to append values to lists instead.
* @param path The path within the document in dot-notation * @param path The path within the document in dot-notation
* @param value The value to put in for the path key * @param value The value to put in for the path key
* @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. * @throws IllegalArgumentException if the path is null, empty, invalid or if the value cannot be set to the
* item identified by the provided path.
*/ */
public void setFieldValue(String path, Object value) { public void setFieldValue(String path, Object value) {
setFieldValue(path, value, false); setFieldValue(path, value, false);
@ -271,7 +295,8 @@ public final class IngestDocument {
* the value will replace the existing list. * the value will replace the existing list.
* @param fieldPathTemplate Resolves to the path with dot-notation within the document * @param fieldPathTemplate Resolves to the path with dot-notation within the document
* @param valueSource The value source that will produce the value to put in for the path key * @param valueSource The value source that will produce the value to put in for the path key
* @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. * @throws IllegalArgumentException if the path is null, empty, invalid or if the value cannot be set to the
* item identified by the provided path.
*/ */
public void setFieldValue(TemplateService.Template fieldPathTemplate, ValueSource valueSource) { public void setFieldValue(TemplateService.Template fieldPathTemplate, ValueSource valueSource) {
Map<String, Object> model = createTemplateModel(); Map<String, Object> model = createTemplateModel();
@ -324,14 +349,17 @@ public final class IngestDocument {
if (append) { if (append) {
if (map.containsKey(leafKey)) { if (map.containsKey(leafKey)) {
Object object = map.get(leafKey); Object object = map.get(leafKey);
if (object instanceof List) { List<Object> list = appendValues(path, object, value);
@SuppressWarnings("unchecked") if (list != object) {
List<Object> list = (List<Object>) object; map.put(leafKey, list);
list.add(value); }
} else {
List<Object> list = new ArrayList<>();
appendValues(list, value);
map.put(leafKey, list);
}
return; return;
} }
}
}
map.put(leafKey, value); map.put(leafKey, value);
} else if (context instanceof List) { } else if (context instanceof List) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -345,12 +373,45 @@ public final class IngestDocument {
if (index < 0 || index >= list.size()) { if (index < 0 || index >= list.size()) {
throw new IllegalArgumentException("[" + index + "] is out of bounds for array with length [" + list.size() + "] as part of path [" + path + "]"); throw new IllegalArgumentException("[" + index + "] is out of bounds for array with length [" + list.size() + "] as part of path [" + path + "]");
} }
if (append) {
Object object = list.get(index);
List<Object> newList = appendValues(path, object, value);
if (newList != object) {
list.set(index, newList);
}
return;
}
list.set(index, value); list.set(index, value);
} else { } else {
throw new IllegalArgumentException("cannot set [" + leafKey + "] with parent object of type [" + context.getClass().getName() + "] as part of path [" + path + "]"); throw new IllegalArgumentException("cannot set [" + leafKey + "] with parent object of type [" + context.getClass().getName() + "] as part of path [" + path + "]");
} }
} }
@SuppressWarnings("unchecked")
private static List<Object> appendValues(String path, Object maybeList, Object value) {
List<Object> list;
if (maybeList instanceof List) {
//maybeList is already a list, we append the provided values to it
list = (List<Object>) maybeList;
} else {
//maybeList is a scalar, we convert it to a list and append the provided values to it
list = new ArrayList<>();
list.add(maybeList);
}
appendValues(list, value);
return list;
}
private static void appendValues(List<Object> list, Object value) {
if (value instanceof List) {
@SuppressWarnings("unchecked")
List<?> valueList = (List<?>) value;
valueList.stream().forEach(list::add);
} else {
list.add(value);
}
}
private static <T> T cast(String path, Object object, Class<T> clazz) { private static <T> T cast(String path, Object object, Class<T> clazz) {
if (object == null) { if (object == null) {
return null; return null;

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.append;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.util.Map;
/**
* Processor that appends value or values to existing lists. If the field is not present a new list holding the
* provided values will be added. If the field is a scalar it will be converted to a single item list and the provided
* values will be added to the newly created list.
*/
public class AppendProcessor implements Processor {
public static final String TYPE = "append";
private final TemplateService.Template field;
private final ValueSource value;
AppendProcessor(TemplateService.Template field, ValueSource value) {
this.field = field;
this.value = value;
}
public TemplateService.Template getField() {
return field;
}
public ValueSource getValue() {
return value;
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue(field, value);
}
@Override
public String getType() {
return TYPE;
}
public static final class Factory implements Processor.Factory<AppendProcessor> {
private final TemplateService templateService;
public Factory(TemplateService templateService) {
this.templateService = templateService;
}
@Override
public AppendProcessor create(Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(config, "field");
Object value = ConfigurationUtils.readObject(config, "value");
return new AppendProcessor(templateService.compile(field), ValueSource.wrap(value, templateService));
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.ingest.processor.append.AppendProcessor;
import org.elasticsearch.ingest.processor.convert.ConvertProcessor; import org.elasticsearch.ingest.processor.convert.ConvertProcessor;
import org.elasticsearch.ingest.processor.date.DateProcessor; import org.elasticsearch.ingest.processor.date.DateProcessor;
import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor; import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
@ -52,6 +53,7 @@ public class IngestModule extends AbstractModule {
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());

View File

@ -22,18 +22,27 @@ package org.elasticsearch.ingest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class IngestDocumentTests extends ESTestCase { public class IngestDocumentTests extends ESTestCase {
@ -51,12 +60,18 @@ public class IngestDocumentTests extends ESTestCase {
innerObject.put("buzz", "hello world"); innerObject.put("buzz", "hello world");
innerObject.put("foo_null", null); innerObject.put("foo_null", null);
innerObject.put("1", "bar"); innerObject.put("1", "bar");
List<String> innerInnerList = new ArrayList<>();
innerInnerList.add("item1");
List<Object> innerList = new ArrayList<>();
innerList.add(innerInnerList);
innerObject.put("list", innerList);
document.put("fizz", innerObject); document.put("fizz", innerObject);
List<Map<String, Object>> list = new ArrayList<>(); List<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> value = new HashMap<>(); Map<String, Object> value = new HashMap<>();
value.put("field", "value"); value.put("field", "value");
list.add(value); list.add(value);
list.add(null); list.add(null);
document.put("list", list); document.put("list", list);
ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
} }
@ -414,6 +429,254 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(list.get(2), equalTo("new_value")); assertThat(list.get(2), equalTo("new_value"));
} }
public void testListAppendFieldValues() {
ingestDocument.appendFieldValue("list", Arrays.asList("item1", "item2", "item3"));
Object object = ingestDocument.getSourceAndMetadata().get("list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(5));
assertThat(list.get(0), equalTo(Collections.singletonMap("field", "value")));
assertThat(list.get(1), nullValue());
assertThat(list.get(2), equalTo("item1"));
assertThat(list.get(3), equalTo("item2"));
assertThat(list.get(4), equalTo("item3"));
}
public void testAppendFieldValueToNonExistingList() {
ingestDocument.appendFieldValue("non_existing_list", "new_value");
Object object = ingestDocument.getSourceAndMetadata().get("non_existing_list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(1));
assertThat(list.get(0), equalTo("new_value"));
}
public void testAppendFieldValuesToNonExistingList() {
ingestDocument.appendFieldValue("non_existing_list", Arrays.asList("item1", "item2", "item3"));
Object object = ingestDocument.getSourceAndMetadata().get("non_existing_list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(3));
assertThat(list.get(0), equalTo("item1"));
assertThat(list.get(1), equalTo("item2"));
assertThat(list.get(2), equalTo("item3"));
}
public void testAppendFieldValueConvertStringToList() {
ingestDocument.appendFieldValue("fizz.buzz", "new_value");
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("buzz");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), equalTo("hello world"));
assertThat(list.get(1), equalTo("new_value"));
}
public void testAppendFieldValuesConvertStringToList() {
ingestDocument.appendFieldValue("fizz.buzz", Arrays.asList("item1", "item2", "item3"));
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("buzz");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(4));
assertThat(list.get(0), equalTo("hello world"));
assertThat(list.get(1), equalTo("item1"));
assertThat(list.get(2), equalTo("item2"));
assertThat(list.get(3), equalTo("item3"));
}
public void testAppendFieldValueConvertIntegerToList() {
ingestDocument.appendFieldValue("int", 456);
Object object = ingestDocument.getSourceAndMetadata().get("int");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), equalTo(123));
assertThat(list.get(1), equalTo(456));
}
public void testAppendFieldValuesConvertIntegerToList() {
ingestDocument.appendFieldValue("int", Arrays.asList(456, 789));
Object object = ingestDocument.getSourceAndMetadata().get("int");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(3));
assertThat(list.get(0), equalTo(123));
assertThat(list.get(1), equalTo(456));
assertThat(list.get(2), equalTo(789));
}
public void testAppendFieldValueConvertMapToList() {
ingestDocument.appendFieldValue("fizz", Collections.singletonMap("field", "value"));
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(List.class));
List<?> list = (List<?>) object;
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) list.get(0);
assertThat(map.size(), equalTo(4));
assertThat(list.get(1), equalTo(Collections.singletonMap("field", "value")));
}
public void testAppendFieldValueToNull() {
ingestDocument.appendFieldValue("fizz.foo_null", "new_value");
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("foo_null");
assertThat(object, instanceOf(List.class));
List<?> list = (List<?>) object;
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), nullValue());
assertThat(list.get(1), equalTo("new_value"));
}
public void testAppendFieldValueToListElement() {
ingestDocument.appendFieldValue("fizz.list.0", "item2");
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(1));
object = list.get(0);
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<String> innerList = (List<String>) object;
assertThat(innerList.size(), equalTo(2));
assertThat(innerList.get(0), equalTo("item1"));
assertThat(innerList.get(1), equalTo("item2"));
}
public void testAppendFieldValuesToListElement() {
ingestDocument.appendFieldValue("fizz.list.0", Arrays.asList("item2", "item3", "item4"));
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(1));
object = list.get(0);
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<String> innerList = (List<String>) object;
assertThat(innerList.size(), equalTo(4));
assertThat(innerList.get(0), equalTo("item1"));
assertThat(innerList.get(1), equalTo("item2"));
assertThat(innerList.get(2), equalTo("item3"));
assertThat(innerList.get(3), equalTo("item4"));
}
public void testAppendFieldValueConvertStringListElementToList() {
ingestDocument.appendFieldValue("fizz.list.0.0", "new_value");
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(1));
object = list.get(0);
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> innerList = (List<Object>) object;
object = innerList.get(0);
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<String> innerInnerList = (List<String>) object;
assertThat(innerInnerList.size(), equalTo(2));
assertThat(innerInnerList.get(0), equalTo("item1"));
assertThat(innerInnerList.get(1), equalTo("new_value"));
}
public void testAppendFieldValuesConvertStringListElementToList() {
ingestDocument.appendFieldValue("fizz.list.0.0", Arrays.asList("item2", "item3", "item4"));
Object object = ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(object, instanceOf(Map.class));
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) object;
object = map.get("list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(1));
object = list.get(0);
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> innerList = (List<Object>) object;
object = innerList.get(0);
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<String> innerInnerList = (List<String>) object;
assertThat(innerInnerList.size(), equalTo(4));
assertThat(innerInnerList.get(0), equalTo("item1"));
assertThat(innerInnerList.get(1), equalTo("item2"));
assertThat(innerInnerList.get(2), equalTo("item3"));
assertThat(innerInnerList.get(3), equalTo("item4"));
}
public void testAppendFieldValueListElementConvertMapToList() {
ingestDocument.appendFieldValue("list.0", Collections.singletonMap("item2", "value2"));
Object object = ingestDocument.getSourceAndMetadata().get("list");
assertThat(object, instanceOf(List.class));
List<?> list = (List<?>) object;
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), instanceOf(List.class));
assertThat(list.get(1), nullValue());
list = (List<?>) list.get(0);
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), equalTo(Collections.singletonMap("field", "value")));
assertThat(list.get(1), equalTo(Collections.singletonMap("item2", "value2")));
}
public void testAppendFieldValueToNullListElement() {
ingestDocument.appendFieldValue("list.1", "new_value");
Object object = ingestDocument.getSourceAndMetadata().get("list");
assertThat(object, instanceOf(List.class));
List<?> list = (List<?>) object;
assertThat(list.get(1), instanceOf(List.class));
list = (List<?>) list.get(1);
assertThat(list.size(), equalTo(2));
assertThat(list.get(0), nullValue());
assertThat(list.get(1), equalTo("new_value"));
}
public void testAppendFieldValueToListOfMaps() {
ingestDocument.appendFieldValue("list", Collections.singletonMap("item2", "value2"));
Object object = ingestDocument.getSourceAndMetadata().get("list");
assertThat(object, instanceOf(List.class));
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) object;
assertThat(list.size(), equalTo(3));
assertThat(list.get(0), equalTo(Collections.singletonMap("field", "value")));
assertThat(list.get(1), nullValue());
assertThat(list.get(2), equalTo(Collections.singletonMap("item2", "value2")));
}
public void testListSetFieldValueIndexProvided() { public void testListSetFieldValueIndexProvided() {
ingestDocument.setFieldValue("list.1", "value"); ingestDocument.setFieldValue("list.1", "value");
Object object = ingestDocument.getSourceAndMetadata().get("list"); Object object = ingestDocument.getSourceAndMetadata().get("list");
@ -495,15 +758,20 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(ingestDocument.getSourceAndMetadata().get("fizz"), instanceOf(Map.class)); assertThat(ingestDocument.getSourceAndMetadata().get("fizz"), instanceOf(Map.class));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("fizz"); Map<String, Object> map = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("fizz");
assertThat(map.size(), equalTo(2)); assertThat(map.size(), equalTo(3));
assertThat(map.containsKey("buzz"), equalTo(false)); assertThat(map.containsKey("buzz"), equalTo(false));
ingestDocument.removeField("fizz.foo_null"); ingestDocument.removeField("fizz.foo_null");
assertThat(map.size(), equalTo(1)); assertThat(map.size(), equalTo(2));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true)); assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true));
ingestDocument.removeField("fizz.1"); ingestDocument.removeField("fizz.1");
assertThat(map.size(), equalTo(1));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true));
ingestDocument.removeField("fizz.list");
assertThat(map.size(), equalTo(0)); assertThat(map.size(), equalTo(0));
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(8));
assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true)); assertThat(ingestDocument.getSourceAndMetadata().containsKey("fizz"), equalTo(true));
@ -684,4 +952,23 @@ public class IngestDocumentTests extends ESTestCase {
} }
} }
public void testIngestMetadataTimestamp() throws Exception {
long before = System.currentTimeMillis();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
long after = System.currentTimeMillis();
String timestampString = ingestDocument.getIngestMetadata().get("timestamp");
assertThat(timestampString, notNullValue());
assertThat(timestampString, endsWith("+0000"));
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT);
Date timestamp = df.parse(timestampString);
assertThat(timestamp.getTime(), greaterThanOrEqualTo(before));
assertThat(timestamp.getTime(), lessThanOrEqualTo(after));
}
public void testCopyConstructor() {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
IngestDocument copy = new IngestDocument(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata(), not(sameInstance(copy.getSourceAndMetadata())));
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(copy.getSourceAndMetadata()));
}
} }

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.append;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class AppendProcessorFactoryTests extends ESTestCase {
private AppendProcessor.Factory factory;
@Before
public void init() {
factory = new AppendProcessor.Factory(TestTemplateService.instance());
}
public void testCreate() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
Object value;
if (randomBoolean()) {
value = "value1";
} else {
value = Arrays.asList("value1", "value2", "value3");
}
config.put("value", value);
AppendProcessor setProcessor = factory.create(config);
assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo(value));
}
public void testCreateNoFieldPresent() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("value", "value1");
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
}
}
public void testCreateNoValuePresent() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
}
}
public void testCreateNullValue() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("value", null);
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
}
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.append;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
public class AppendProcessorTests extends ESTestCase {
public void testAppendValuesToExistingList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Scalar scalar = randomFrom(Scalar.values());
List<Object> list = new ArrayList<>();
int size = randomIntBetween(0, 10);
for (int i = 0; i < size; i++) {
list.add(scalar.randomValue());
}
List<Object> checkList = new ArrayList<>(list);
String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, list);
List<Object> values = new ArrayList<>();
Processor appendProcessor;
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
}
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
assertThat(fieldValue, sameInstance(list));
assertThat(list.size(), equalTo(size + values.size()));
for (int i = 0; i < size; i++) {
assertThat(list.get(i), equalTo(checkList.get(i)));
}
for (int i = size; i < size + values.size(); i++) {
assertThat(list.get(i), equalTo(values.get(i - size)));
}
}
public void testAppendValuesToNonExistingList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String field = RandomDocumentPicks.randomFieldName(random());
Scalar scalar = randomFrom(Scalar.values());
List<Object> values = new ArrayList<>();
Processor appendProcessor;
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
}
appendProcessor.execute(ingestDocument);
List list = ingestDocument.getFieldValue(field, List.class);
assertThat(list, not(sameInstance(values)));
assertThat(list, equalTo(values));
}
public void testConvertScalarToList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Scalar scalar = randomFrom(Scalar.values());
Object initialValue = scalar.randomValue();
String field = RandomDocumentPicks.addRandomField(random(), ingestDocument, initialValue);
List<Object> values = new ArrayList<>();
Processor appendProcessor;
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values);
}
appendProcessor.execute(ingestDocument);
List fieldValue = ingestDocument.getFieldValue(field, List.class);
assertThat(fieldValue.size(), equalTo(values.size() + 1));
assertThat(fieldValue.get(0), equalTo(initialValue));
for (int i = 1; i < values.size() + 1; i++) {
assertThat(fieldValue.get(i), equalTo(values.get(i - 1)));
}
}
public void testAppendMetadata() throws Exception {
//here any metadata field value becomes a list, which won't make sense in most of the cases,
// but support for append is streamlined like for set so we test it
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
List<String> values = new ArrayList<>();
Processor appendProcessor;
if (randomBoolean()) {
String value = randomAsciiOfLengthBetween(1, 10);
values.add(value);
appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), value);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(randomAsciiOfLengthBetween(1, 10));
}
appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values);
}
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Object initialValue = ingestDocument.getSourceAndMetadata().get(randomMetaData.getFieldName());
appendProcessor.execute(ingestDocument);
List list = ingestDocument.getFieldValue(randomMetaData.getFieldName(), List.class);
if (initialValue == null) {
assertThat(list, equalTo(values));
} else {
assertThat(list.size(), equalTo(values.size() + 1));
assertThat(list.get(0), equalTo(initialValue));
for (int i = 1; i < list.size(); i++) {
assertThat(list.get(i), equalTo(values.get(i - 1)));
}
}
}
private static Processor createAppendProcessor(String fieldName, Object fieldValue) {
TemplateService templateService = TestTemplateService.instance();
return new AppendProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
}
private enum Scalar {
INTEGER {
@Override
Object randomValue() {
return randomInt();
}
}, DOUBLE {
@Override
Object randomValue() {
return randomDouble();
}
}, FLOAT {
@Override
Object randomValue() {
return randomFloat();
}
}, BOOLEAN {
@Override
Object randomValue() {
return randomBoolean();
}
}, STRING {
@Override
Object randomValue() {
return randomAsciiOfLengthBetween(1, 10);
}
}, MAP {
@Override
Object randomValue() {
int numItems = randomIntBetween(1, 10);
Map<String, Object> map = new HashMap<>(numItems);
for (int i = 0; i < numItems; i++) {
map.put(randomAsciiOfLengthBetween(1, 10), randomFrom(Scalar.values()).randomValue());
}
return map;
}
}, NULL {
@Override
Object randomValue() {
return null;
}
};
abstract Object randomValue();
}
}

View File

@ -76,9 +76,8 @@ public class SetProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value"));
} }
private Processor createSetProcessor(String fieldName, Object fieldValue) { private static Processor createSetProcessor(String fieldName, Object fieldValue) {
TemplateService templateService = TestTemplateService.instance(); TemplateService templateService = TestTemplateService.instance();
return new SetProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService)); return new SetProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
} }
} }

View File

@ -56,14 +56,6 @@
} }
- match: { _id: "my_pipeline" } - match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do: - do:
ingest.index: ingest.index:
index: test index: test
@ -101,14 +93,6 @@
} }
- match: { _id: "my_pipeline" } - match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do: - do:
ingest.index: ingest.index:
index: test index: test

View File

@ -13,6 +13,12 @@
"value": "new_value" "value": "new_value"
} }
}, },
{
"append" : {
"field" : "new_field",
"value": ["item2", "item3", "item4"]
}
},
{ {
"rename" : { "rename" : {
"field" : "field_to_rename", "field" : "field_to_rename",
@ -93,6 +99,7 @@
id: 1 id: 1
- is_false: _source.field_to_rename - is_false: _source.field_to_rename
- is_false: _source.field_to_remove - is_false: _source.field_to_remove
- match: { _source.new_field: ["new_value", "item2", "item3", "item4"] }
- match: { _source.renamed_field: "value" } - match: { _source.renamed_field: "value" }
- match: { _source.field_to_lowercase: "lowercase" } - match: { _source.field_to_lowercase: "lowercase" }
- match: { _source.field_to_uppercase: "UPPERCASE" } - match: { _source.field_to_uppercase: "UPPERCASE" }
@ -125,14 +132,6 @@
} }
- match: { _id: "my_pipeline" } - match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do: - do:
ingest.index: ingest.index:
index: test index: test

View File

@ -16,19 +16,17 @@
"field" : "index_type_id", "field" : "index_type_id",
"value": "{{_index}}/{{_type}}/{{_id}}" "value": "{{_index}}/{{_type}}/{{_id}}"
} }
},
{
"append" : {
"field" : "metadata",
"value": ["{{_index}}", "{{_type}}", "{{_id}}"]
}
} }
] ]
} }
- match: { _id: "my_pipeline_1" } - match: { _id: "my_pipeline_1" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do: - do:
ingest.index: ingest.index:
index: test index: test
@ -42,8 +40,9 @@
index: test index: test
type: test type: test
id: 1 id: 1
- length: { _source: 1 } - length: { _source: 2 }
- match: { _source.index_type_id: "test/test/1" } - match: { _source.index_type_id: "test/test/1" }
- match: { _source.metadata: ["test", "test", "1"] }
--- ---
"Test templateing": "Test templateing":
@ -63,7 +62,14 @@
"field" : "field4", "field" : "field4",
"value": "{{field1}}/{{field2}}/{{field3}}" "value": "{{field1}}/{{field2}}/{{field3}}"
} }
},
{
"append" : {
"field" : "metadata",
"value": ["{{field1}}", "{{field2}}", "{{field3}}"]
} }
}
] ]
} }
- match: { _id: "my_pipeline_1" } - match: { _id: "my_pipeline_1" }
@ -101,14 +107,6 @@
} }
- match: { _id: "my_pipeline_3" } - match: { _id: "my_pipeline_3" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do: - do:
ingest.index: ingest.index:
index: test index: test
@ -116,6 +114,7 @@
id: 1 id: 1
pipeline_id: "my_pipeline_1" pipeline_id: "my_pipeline_1"
body: { body: {
metadata: "0",
field1: "1", field1: "1",
field2: "2", field2: "2",
field3: "3" field3: "3"
@ -126,11 +125,12 @@
index: test index: test
type: test type: test
id: 1 id: 1
- length: { _source: 4 } - length: { _source: 5 }
- match: { _source.field1: "1" } - match: { _source.field1: "1" }
- match: { _source.field2: "2" } - match: { _source.field2: "2" }
- match: { _source.field3: "3" } - match: { _source.field3: "3" }
- match: { _source.field4: "1/2/3" } - match: { _source.field4: "1/2/3" }
- match: { _source.metadata: ["0","1","2","3"] }
- do: - do:
ingest.index: ingest.index: