diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc index b3cd98051cf..e002dd52bc8 100644 --- a/docs/plugins/ingest.asciidoc +++ b/docs/plugins/ingest.asciidoc @@ -1,7 +1,158 @@ [[ingest]] == Ingest Plugin -TODO +=== Processors + +==== Mutate Processor + +The Mutate Processor applies functions on the structure of a document. The processor comes with a few +functions to help achieve this. + +The following are the supported configuration actions and how to use them. + +===== Convert +Convert a field's value to a different type, like turning a string into an integer. +If the field value is an array, all members will be converted. + +The supported types include: `integer`, `float`, `string`, and `boolean`. + +`boolean` will set a field to "true" if its string value does not match any of the following: "false", "0", "off", "no". + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "convert": { + "field1": "integer", + "field2": "float" + } + } +} +-------------------------------------------------- + +===== Gsub +Convert a string field by applying a regular expression and a replacement. +The field must exist and be a string; otherwise an error is raised. + +This configuration takes an array consisting of two elements per field/substitution. One for the +pattern to be replaced, and the second for the pattern to replace with. + + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "gsub": { + "field1": ["\\.", "-"] + } + } +} +-------------------------------------------------- + +===== Join +Join an array with a separator character. Fails with an error on non-array fields. + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "join": { + "joined_array_field": "-" + } + } +} +-------------------------------------------------- + +===== Lowercase +Convert a string to its lowercase equivalent. 
+ +[source,js] +-------------------------------------------------- +{ + "mutate": { + "lowercase": ["foo", "bar"] + } +} +-------------------------------------------------- + +===== Remove +Remove one or more fields. + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "remove": ["foo", "bar"] + } +} +-------------------------------------------------- + +===== Rename +Rename one or more fields. + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "rename": { + "foo": "update_foo", + "bar": "new_bar" + } + } +} +-------------------------------------------------- + +===== Split +Split a field into an array using a separator, which is interpreted as a regular expression. Only works on string fields. + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "split": { + "message": "," + } + } +} +-------------------------------------------------- + +===== Trim +Trim whitespace from a field. NOTE: this only works on leading and trailing whitespace. + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "trim": ["foo", "bar"] + } +} +-------------------------------------------------- + +===== Update +Update a field with a new value. If the field does not exist, it will be added with the given value. + +[source,js] +-------------------------------------------------- +{ + "mutate": { + "update": { + "field": 582.1 + } + } +} +-------------------------------------------------- + +===== Uppercase +Convert a string to its uppercase equivalent. 
+ +[source,js] +-------------------------------------------------- +{ + "mutate": { + "uppercase": ["foo", "bar"] + } +} +-------------------------------------------------- === Processors diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java index 66b81e8fe83..879acfcbf8e 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java @@ -54,6 +54,36 @@ public final class Data { return (T) XContentMapValues.extractValue(path, document); } + public boolean containsProperty(String path) { + boolean containsProperty = false; + String[] pathElements = Strings.splitStringToArray(path, '.'); + if (pathElements.length == 0) { + return false; + } + + Map inner = document; + + for (int i = 0; i < pathElements.length; i++) { + if (inner == null) { + containsProperty = false; + break; + } + if (i == pathElements.length - 1) { + containsProperty = inner.containsKey(pathElements[i]); + break; + } + + Object obj = inner.get(pathElements[i]); + if (obj instanceof Map) { + inner = (Map) obj; + } else { + inner = null; + } + } + + return containsProperty; + } + /** * add `value` to path in document. 
If path does not exist, * nested hashmaps will be put in as parent key values until diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java index 182ff61c421..94fb6d14587 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java @@ -92,4 +92,82 @@ public final class ConfigurationUtils { throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]"); } } + + /** + * Returns and removes the specified property of type list from the specified configuration map. + * + * If the property value isn't of type list an {@link IllegalArgumentException} is thrown. + */ + public static List readOptionalStringList(Map configuration, String propertyName) { + Object value = configuration.remove(propertyName); + if (value == null) { + return null; + } + if (value instanceof List) { + @SuppressWarnings("unchecked") + List stringList = (List) value; + return stringList; + } else { + throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]"); + } + } + + /** + * Returns and removes the specified property of type map from the specified configuration map. + * + * If the property value isn't of type map an {@link IllegalArgumentException} is thrown. 
+ */ + public static Map> readOptionalStringListMap(Map configuration, String propertyName) { + Object value = configuration.remove(propertyName); + if (value == null) { + return null; + } + if (value instanceof Map) { + @SuppressWarnings("unchecked") + Map> stringList = (Map>) value; + return stringList; + } else { + throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); + } + } + + /** + * Returns and removes the specified property of type map from the specified configuration map. + * + * If the property value isn't of type map an {@link IllegalArgumentException} is thrown. + */ + public static Map readOptionalStringMap(Map configuration, String propertyName) { + Object value = configuration.remove(propertyName); + + if (value == null) { + return null; + } + + if (value instanceof Map) { + Map map = (Map) value; + return map; + } else { + throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); + } + } + + /** + * Returns and removes the specified property of type map from the specified configuration map. + * + * If the property value isn't of type map an {@link IllegalArgumentException} is thrown. 
+ */ + public static Map readOptionalObjectMap(Map configuration, String propertyName) { + Object value = configuration.remove(propertyName); + + if (value == null) { + return null; + } + + if (value instanceof Map) { + Map map = (Map) value; + return map; + } else { + throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]"); + } + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java new file mode 100644 index 00000000000..ab85a9dbd04 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/mutate/MutateProcessor.java @@ -0,0 +1,326 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.ingest.processor.mutate; + +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.processor.ConfigurationUtils; +import org.elasticsearch.ingest.processor.Processor; + +import java.io.IOException; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public final class MutateProcessor implements Processor { + + public static final String TYPE = "mutate"; + + private final Map update; + private final Map rename; + private final Map convert; + private final Map split; + private final Map> gsub; + private final Map join; + private final List remove; + private final List trim; + private final List uppercase; + private final List lowercase; + + public MutateProcessor(Map update, + Map rename, + Map convert, + Map split, + Map> gsub, + Map join, + List remove, + List trim, + List uppercase, + List lowercase) { + this.update = update; + this.rename = rename; + this.convert = convert; + this.split = split; + this.gsub = gsub; + this.join = join; + this.remove = remove; + this.trim = trim; + this.uppercase = uppercase; + this.lowercase = lowercase; + } + + public Map getUpdate() { + return update; + } + + public Map getRename() { + return rename; + } + + public Map getConvert() { + return convert; + } + + public Map getSplit() { + return split; + } + + public Map> getGsub() { + return gsub; + } + + public Map getJoin() { + return join; + } + + public List getRemove() { + return remove; + } + + public List getTrim() { + return trim; + } + + public List getUppercase() { + return uppercase; + } + + public List getLowercase() { + return lowercase; + } + + @Override + public void execute(Data data) { + if (update != null) { + doUpdate(data); + } + if (rename != null) { + doRename(data); + } + if (convert != null) { + doConvert(data); + } + if (split != null) { + 
doSplit(data); + } + if (gsub != null) { + doGsub(data); + } + if (join != null) { + doJoin(data); + } + if (remove != null) { + doRemove(data); + } + if (trim != null) { + doTrim(data); + } + if (uppercase != null) { + doUppercase(data); + } + if (lowercase != null) { + doLowercase(data); + } + } + + private void doUpdate(Data data) { + for(Map.Entry entry : update.entrySet()) { + data.addField(entry.getKey(), entry.getValue()); + } + } + + private void doRename(Data data) { + for(Map.Entry entry : rename.entrySet()) { + if (data.containsProperty(entry.getKey())) { + Object oldVal = data.getProperty(entry.getKey()); + data.getDocument().remove(entry.getKey()); + data.addField(entry.getValue(), oldVal); + } + } + } + + private Object parseValueAsType(Object oldVal, String toType) { + switch (toType) { + case "integer": + oldVal = Integer.parseInt(oldVal.toString()); + break; + case "float": + oldVal = Float.parseFloat(oldVal.toString()); + break; + case "string": + oldVal = oldVal.toString(); + break; + case "boolean": + // TODO(talevy): Booleans#parseBoolean depends on Elasticsearch, should be moved into dedicated library. 
+ oldVal = Booleans.parseBoolean(oldVal.toString(), false); + } + + return oldVal; + } + + @SuppressWarnings("unchecked") + private void doConvert(Data data) { + for(Map.Entry entry : convert.entrySet()) { + String toType = entry.getValue(); + + Object oldVal = data.getProperty(entry.getKey()); + Object newVal; + + if (oldVal instanceof List) { + newVal = new ArrayList<>(); + for (Object e : ((List) oldVal)) { + ((List) newVal).add(parseValueAsType(e, toType)); + } + } else { + if (oldVal == null) { + throw new IllegalArgumentException("Field \"" + entry.getKey() + "\" is null, cannot be converted to a/an " + toType); + } + newVal = parseValueAsType(oldVal, toType); + } + + data.addField(entry.getKey(), newVal); + } + } + + private void doSplit(Data data) { + for(Map.Entry entry : split.entrySet()) { + Object oldVal = data.getProperty(entry.getKey()); + if (oldVal instanceof String) { + data.addField(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue()))); + } else { + throw new IllegalArgumentException("Cannot split a field that is not a String type"); + } + } + } + + private void doGsub(Data data) { + for (Map.Entry> entry : gsub.entrySet()) { + String fieldName = entry.getKey(); + Tuple matchAndReplace = entry.getValue(); + String oldVal = data.getProperty(fieldName); + if (oldVal == null) { + throw new IllegalArgumentException("Field \"" + fieldName + "\" is null, cannot match pattern."); + } + Matcher matcher = matchAndReplace.v1().matcher(oldVal); + String newVal = matcher.replaceAll(matchAndReplace.v2()); + data.addField(entry.getKey(), newVal); + } + } + + @SuppressWarnings("unchecked") + private void doJoin(Data data) { + for(Map.Entry entry : join.entrySet()) { + Object oldVal = data.getProperty(entry.getKey()); + if (oldVal instanceof List) { + String joined = (String) ((List) oldVal) + .stream() + .map(Object::toString) + .collect(Collectors.joining(entry.getValue())); + + data.addField(entry.getKey(), joined); + } else { + throw new 
IllegalArgumentException("Cannot join field:" + entry.getKey() + " with type: " + oldVal.getClass()); + } + } + } + + private void doRemove(Data data) { + for(String field : remove) { + data.getDocument().remove(field); + } + } + + private void doTrim(Data data) { + for(String field : trim) { + Object val = data.getProperty(field); + if (val instanceof String) { + data.addField(field, ((String) val).trim()); + } else { + throw new IllegalArgumentException("Cannot trim field:" + field + " with type: " + val.getClass()); + } + } + } + + private void doUppercase(Data data) { + for(String field : uppercase) { + Object val = data.getProperty(field); + if (val instanceof String) { + data.addField(field, ((String) val).toUpperCase(Locale.ROOT)); + } else { + throw new IllegalArgumentException("Cannot uppercase field:" + field + " with type: " + val.getClass()); + } + } + } + + private void doLowercase(Data data) { + for(String field : lowercase) { + Object val = data.getProperty(field); + if (val instanceof String) { + data.addField(field, ((String) val).toLowerCase(Locale.ROOT)); + } else { + throw new IllegalArgumentException("Cannot lowercase field:" + field + " with type: " + val.getClass()); + } + } + } + + public static final class Factory implements Processor.Factory { + @Override + public MutateProcessor create(Map config) throws IOException { + Map update = ConfigurationUtils.readOptionalObjectMap(config, "update"); + Map rename = ConfigurationUtils.readOptionalStringMap(config, "rename"); + Map convert = ConfigurationUtils.readOptionalStringMap(config, "convert"); + Map split = ConfigurationUtils.readOptionalStringMap(config, "split"); + Map> gsubConfig = ConfigurationUtils.readOptionalStringListMap(config, "gsub"); + Map join = ConfigurationUtils.readOptionalStringMap(config, "join"); + List remove = ConfigurationUtils.readOptionalStringList(config, "remove"); + List trim = ConfigurationUtils.readOptionalStringList(config, "trim"); + List uppercase = 
ConfigurationUtils.readOptionalStringList(config, "uppercase"); + List lowercase = ConfigurationUtils.readOptionalStringList(config, "lowercase"); + + // pre-compile regex patterns + Map> gsub = null; + if (gsubConfig != null) { + gsub = new HashMap<>(); + for (Map.Entry> entry : gsubConfig.entrySet()) { + List searchAndReplace = entry.getValue(); + if (searchAndReplace.size() != 2) { + throw new IllegalArgumentException("Invalid search and replace values (" + Arrays.toString(searchAndReplace.toArray()) + ") for field: " + entry.getKey()); + } + Pattern searchPattern = Pattern.compile(searchAndReplace.get(0)); + gsub.put(entry.getKey(), new Tuple<>(searchPattern, searchAndReplace.get(1))); + } + } + + return new MutateProcessor( + (update == null) ? null : Collections.unmodifiableMap(update), + (rename == null) ? null : Collections.unmodifiableMap(rename), + (convert == null) ? null : Collections.unmodifiableMap(convert), + (split == null) ? null : Collections.unmodifiableMap(split), + (gsub == null) ? null : Collections.unmodifiableMap(gsub), + (join == null) ? null : Collections.unmodifiableMap(join), + (remove == null) ? null : Collections.unmodifiableList(remove), + (trim == null) ? null : Collections.unmodifiableList(trim), + (uppercase == null) ? null : Collections.unmodifiableList(uppercase), + (lowercase == null) ? 
null : Collections.unmodifiableList(lowercase)); + } + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index d7a52d0315c..675d38f5df6 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -25,6 +25,7 @@ import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.date.DateProcessor; import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor; import org.elasticsearch.ingest.processor.grok.GrokProcessor; +import org.elasticsearch.ingest.processor.mutate.MutateProcessor; import org.elasticsearch.ingest.processor.simple.SimpleProcessor; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; @@ -46,6 +47,7 @@ public class IngestModule extends AbstractModule { addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory()); addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory()); addProcessor(DateProcessor.TYPE, new DateProcessor.Factory()); + addProcessor(MutateProcessor.TYPE, new MutateProcessor.Factory()); MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class); for (Map.Entry entry : processors.entrySet()) { diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java index d048620cea3..e6cae34035e 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/DataTests.java @@ -49,6 +49,22 @@ public class DataTests extends ESTestCase { assertThat(data.getProperty("fizz.buzz"), equalTo("hello world")); } + public void testContainsProperty() { + assertTrue(data.containsProperty("fizz")); + } + + public void testContainsProperty_Nested() { + 
assertTrue(data.containsProperty("fizz.buzz")); + } + + public void testContainsProperty_NotFound() { + assertFalse(data.containsProperty("doesnotexist")); + } + + public void testContainsProperty_NestedNotFound() { + assertFalse(data.containsProperty("fizz.doesnotexist")); + } + public void testSimpleAddField() { data.addField("new_field", "foo"); assertThat(data.getDocument().get("new_field"), equalTo("foo")); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java new file mode 100644 index 00000000000..274a952f935 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.*; + +import static org.hamcrest.Matchers.*; + + +public class ConfigurationUtilsTests extends ESTestCase { + private Map config; + + @Before + public void setConfig() { + config = new HashMap<>(); + config.put("foo", "bar"); + config.put("arr", Arrays.asList("1", "2", "3")); + List list = new ArrayList<>(); + list.add(2); + config.put("int", list); + config.put("ip", "127.0.0.1"); + Map fizz = new HashMap<>(); + fizz.put("buzz", "hello world"); + config.put("fizz", fizz); + } + + public void testReadStringProperty() { + String val = ConfigurationUtils.readStringProperty(config, "foo"); + assertThat(val, equalTo("bar")); + } + + public void testReadStringProperty_InvalidType() { + try { + ConfigurationUtils.readStringProperty(config, "arr"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("property [arr] isn't a string, but of type [java.util.Arrays$ArrayList]")); + } + } + + // TODO(talevy): Issue with generics. This test should fail, "int" is of type List + public void testOptional_InvalidType() { + List val = ConfigurationUtils.readStringList(config, "int"); + assertThat(val, equalTo(Arrays.asList(2))); + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorFactoryTests.java new file mode 100644 index 00000000000..ccc3e223adf --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorFactoryTests.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor.mutate; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class MutateProcessorFactoryTests extends ESTestCase { + + public void testCreate() throws Exception { + MutateProcessor.Factory factory = new MutateProcessor.Factory(); + Map config = new HashMap<>(); + Map update = new HashMap<>(); + update.put("foo", 123); + config.put("update", update); + MutateProcessor processor = factory.create(config); + assertThat(processor.getUpdate(), equalTo(update)); + } + + public void testCreateGsubPattern() throws Exception { + MutateProcessor.Factory factory = new MutateProcessor.Factory(); + Map config = new HashMap<>(); + Map> gsub = new HashMap<>(); + gsub.put("foo", Arrays.asList("\\s.*e\\s", "")); + config.put("gsub", gsub); + + Map> compiledGsub = new HashMap<>(); + Pattern searchPattern = Pattern.compile("\\s.*e\\s"); + compiledGsub.put("foo", new Tuple<>(searchPattern, "")); + + MutateProcessor processor = factory.create(config); + for (Map.Entry> entry : compiledGsub.entrySet()) { + Tuple actualSearchAndReplace = 
processor.getGsub().get(entry.getKey()); + assertThat(actualSearchAndReplace, notNullValue()); + assertThat(actualSearchAndReplace.v1().pattern(), equalTo(entry.getValue().v1().pattern())); + assertThat(actualSearchAndReplace.v2(), equalTo(entry.getValue().v2())); + } + } + + public void testCreateGsubPattern_InvalidFormat() throws Exception { + MutateProcessor.Factory factory = new MutateProcessor.Factory(); + Map config = new HashMap<>(); + Map> gsub = new HashMap<>(); + gsub.put("foo", Arrays.asList("only_one")); + config.put("gsub", gsub); + + try { + factory.create(config); + fail(); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("Invalid search and replace values ([only_one]) for field: foo")); + } + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java new file mode 100644 index 00000000000..55066c4e2e7 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/mutate/MutateProcessorTests.java @@ -0,0 +1,193 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.ingest.processor.mutate; + +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + + +public class MutateProcessorTests extends ESTestCase { + private static final MutateProcessor.Factory FACTORY = new MutateProcessor.Factory(); + private Data data; + private Map config; + + @Before + public void setData() { + Map document = new HashMap<>(); + document.put("foo", "bar"); + document.put("alpha", "aBcD"); + document.put("num", "64"); + document.put("to_strip", " clean "); + document.put("arr", Arrays.asList("1", "2", "3")); + document.put("ip", "127.0.0.1"); + Map fizz = new HashMap<>(); + fizz.put("buzz", "hello world"); + document.put("fizz", fizz); + + data = new Data("index", "type", "id", document); + config = new HashMap<>(); + } + + public void testUpdate() throws IOException { + Map update = new HashMap<>(); + update.put("foo", 123); + config.put("update", update); + + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("foo"), equalTo(123)); + } + + public void testRename() throws IOException { + Map rename = new HashMap<>(); + rename.put("foo", "bar"); + config.put("rename", rename); + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("bar"), equalTo("bar")); + assertThat(data.containsProperty("foo"), is(false)); + } + + public void testConvert() throws IOException { + Map convert = new HashMap<>(); + convert.put("num", "integer"); + config.put("convert", convert); + + Processor processor = FACTORY.create(config); + processor.execute(data); + 
assertThat(data.getProperty("num"), equalTo(64)); + } + + public void testConvert_NullField() throws IOException { + Map convert = new HashMap<>(); + convert.put("null", "integer"); + config.put("convert", convert); + + Processor processor = FACTORY.create(config); + try { + processor.execute(data); + fail(); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("Field \"null\" is null, cannot be converted to a/an integer")); + } + } + + public void testConvert_List() throws IOException { + Map convert = new HashMap<>(); + convert.put("arr", "integer"); + config.put("convert", convert); + + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("arr"), equalTo(Arrays.asList(1, 2, 3))); + } + + public void testSplit() throws IOException { + HashMap split = new HashMap<>(); + split.put("ip", "\\."); + config.put("split", split); + + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("ip"), equalTo(Arrays.asList("127", "0", "0", "1"))); + } + + public void testGsub() throws IOException { + HashMap> gsub = new HashMap<>(); + gsub.put("ip", Arrays.asList("\\.", "-")); + config.put("gsub", gsub); + + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("ip"), equalTo("127-0-0-1")); + } + + public void testGsub_NullValue() throws IOException { + HashMap> gsub = new HashMap<>(); + gsub.put("null_field", Arrays.asList("\\.", "-")); + config.put("gsub", gsub); + + Processor processor = FACTORY.create(config); + try { + processor.execute(data); + fail(); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("Field \"null_field\" is null, cannot match pattern.")); + } + } + + public void testJoin() throws IOException { + HashMap join = new HashMap<>(); + join.put("arr", "-"); + config.put("join", join); + + Processor processor = FACTORY.create(config); + processor.execute(data); + 
assertThat(data.getProperty("arr"), equalTo("1-2-3")); + } + + public void testRemove() throws IOException { + List remove = Arrays.asList("foo", "ip"); + config.put("remove", remove); + + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("foo"), nullValue()); + assertThat(data.getProperty("ip"), nullValue()); + } + + public void testTrim() throws IOException { + List trim = Arrays.asList("to_strip", "foo"); + config.put("trim", trim); + + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("foo"), equalTo("bar")); + assertThat(data.getProperty("to_strip"), equalTo("clean")); + } + + public void testUppercase() throws IOException { + List uppercase = Arrays.asList("foo"); + config.put("uppercase", uppercase); + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("foo"), equalTo("BAR")); + } + + public void testLowercase() throws IOException { + List lowercase = Arrays.asList("alpha"); + config.put("lowercase", lowercase); + Processor processor = FACTORY.create(config); + processor.execute(data); + assertThat(data.getProperty("alpha"), equalTo("abcd")); + } +} diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate_processor.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate_processor.yaml new file mode 100644 index 00000000000..4ab6ba652ca --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate_processor.yaml @@ -0,0 +1,50 @@ +--- +"Test mutate processor": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "mutate" : { + "rename" : { + "field1": "foo" + }, + "update" : { + "field2": "bar" + } + } + } + ] + } + - match: { _id: "my_pipeline" } + + # Simulate a Thread.sleep(), because pipeline are updated in 
the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline" + body: {field1: "val"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.foo: "val" } + - match: { _source.field2: "bar" }