introduce mutate processor.
fix forbiddenapis update clean up and add rest test update mutate factory to use configuration utilities compile gsub pattern cleanup, update parseBooleans, null tests
This commit is contained in:
parent
f657a7dbf3
commit
8fc5a3d032
|
@ -1,7 +1,158 @@
|
|||
[[ingest]]
|
||||
== Ingest Plugin
|
||||
|
||||
TODO
|
||||
=== Processors
|
||||
|
||||
==== Mutate Processor
|
||||
|
||||
The Mutate Processor applies functions on the structure of a document. The processor comes with a few
|
||||
functions to help achieve this.
|
||||
|
||||
The following are the supported configuration actions and how to use them.
|
||||
|
||||
===== Convert
|
||||
Convert a field's value to a different type, like turning a string to an integer.
|
||||
If the field value is an array, all members will be converted.
|
||||
|
||||
The supported types include: `integer`, `float`, `string`, and `boolean`.
|
||||
|
||||
`boolean` will set a field to "true" if its string value does not match any of the following: "false", "0", "off", "no".
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"convert": {
|
||||
"field1": "integer",
|
||||
"field2": "float"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Gsub
|
||||
Convert a string field by applying a regular expression and a replacement.
|
||||
If the field is not a string, no action will be taken.
|
||||
|
||||
This configuration takes an array consisting of two elements per field/substition. One for the
|
||||
pattern to be replaced, and the second for the pattern to replace with.
|
||||
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"gsub": {
|
||||
"field1": ["\.", "-"]
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Join
|
||||
Join an array with a separator character. Does nothing on non-array fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"join": {
|
||||
"joined_array_field": "other_array_field"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Lowercase
|
||||
Convert a string to its lowercase equivalent.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"lowercase": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Remove
|
||||
Remove one or more fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"remove": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Rename
|
||||
Renames one or more fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"rename": {
|
||||
"foo": "update_foo",
|
||||
"bar": "new_bar"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Split
|
||||
Split a field to an array using a separator character. Only works on string fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"split": {
|
||||
"message": ","
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Strip
|
||||
Strip whitespace from field. NOTE: this only works on leading and trailing whitespace.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"strip": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Update
|
||||
Update an existing field with a new value. If the field does not exist, then no action will be taken.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"update": {
|
||||
"field": 582.1
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Uppercase
|
||||
Convert a string to its uppercase equivalent.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"uppercase": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
=== Processors
|
||||
|
||||
|
|
|
@ -54,6 +54,36 @@ public final class Data {
|
|||
return (T) XContentMapValues.extractValue(path, document);
|
||||
}
|
||||
|
||||
public boolean containsProperty(String path) {
|
||||
boolean containsProperty = false;
|
||||
String[] pathElements = Strings.splitStringToArray(path, '.');
|
||||
if (pathElements.length == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Map<String, Object> inner = document;
|
||||
|
||||
for (int i = 0; i < pathElements.length; i++) {
|
||||
if (inner == null) {
|
||||
containsProperty = false;
|
||||
break;
|
||||
}
|
||||
if (i == pathElements.length - 1) {
|
||||
containsProperty = inner.containsKey(pathElements[i]);
|
||||
break;
|
||||
}
|
||||
|
||||
Object obj = inner.get(pathElements[i]);
|
||||
if (obj instanceof Map) {
|
||||
inner = (Map<String, Object>) obj;
|
||||
} else {
|
||||
inner = null;
|
||||
}
|
||||
}
|
||||
|
||||
return containsProperty;
|
||||
}
|
||||
|
||||
/**
|
||||
* add `value` to path in document. If path does not exist,
|
||||
* nested hashmaps will be put in as parent key values until
|
||||
|
|
|
@ -92,4 +92,82 @@ public final class ConfigurationUtils {
|
|||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type list from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static List<String> readOptionalStringList(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> stringList = (List<String>) value;
|
||||
return stringList;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type map from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static Map<String, List<String>> readOptionalStringListMap(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, List<String>> stringList = (Map<String, List<String>>) value;
|
||||
return stringList;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type map from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static Map<String, String> readOptionalStringMap(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (value instanceof Map) {
|
||||
Map<String, String> map = (Map<String, String>) value;
|
||||
return map;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type map from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static Map<String, Object> readOptionalObjectMap(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (value instanceof Map) {
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
return map;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,326 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class MutateProcessor implements Processor {
|
||||
|
||||
public static final String TYPE = "mutate";
|
||||
|
||||
private final Map<String, Object> update;
|
||||
private final Map<String, String> rename;
|
||||
private final Map<String, String> convert;
|
||||
private final Map<String, String> split;
|
||||
private final Map<String, Tuple<Pattern, String>> gsub;
|
||||
private final Map<String, String> join;
|
||||
private final List<String> remove;
|
||||
private final List<String> trim;
|
||||
private final List<String> uppercase;
|
||||
private final List<String> lowercase;
|
||||
|
||||
public MutateProcessor(Map<String, Object> update,
|
||||
Map<String, String> rename,
|
||||
Map<String, String> convert,
|
||||
Map<String, String> split,
|
||||
Map<String, Tuple<Pattern, String>> gsub,
|
||||
Map<String, String> join,
|
||||
List<String> remove,
|
||||
List<String> trim,
|
||||
List<String> uppercase,
|
||||
List<String> lowercase) {
|
||||
this.update = update;
|
||||
this.rename = rename;
|
||||
this.convert = convert;
|
||||
this.split = split;
|
||||
this.gsub = gsub;
|
||||
this.join = join;
|
||||
this.remove = remove;
|
||||
this.trim = trim;
|
||||
this.uppercase = uppercase;
|
||||
this.lowercase = lowercase;
|
||||
}
|
||||
|
||||
public Map<String, Object> getUpdate() {
|
||||
return update;
|
||||
}
|
||||
|
||||
public Map<String, String> getRename() {
|
||||
return rename;
|
||||
}
|
||||
|
||||
public Map<String, String> getConvert() {
|
||||
return convert;
|
||||
}
|
||||
|
||||
public Map<String, String> getSplit() {
|
||||
return split;
|
||||
}
|
||||
|
||||
public Map<String, Tuple<Pattern, String>> getGsub() {
|
||||
return gsub;
|
||||
}
|
||||
|
||||
public Map<String, String> getJoin() {
|
||||
return join;
|
||||
}
|
||||
|
||||
public List<String> getRemove() {
|
||||
return remove;
|
||||
}
|
||||
|
||||
public List<String> getTrim() {
|
||||
return trim;
|
||||
}
|
||||
|
||||
public List<String> getUppercase() {
|
||||
return uppercase;
|
||||
}
|
||||
|
||||
public List<String> getLowercase() {
|
||||
return lowercase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
if (update != null) {
|
||||
doUpdate(data);
|
||||
}
|
||||
if (rename != null) {
|
||||
doRename(data);
|
||||
}
|
||||
if (convert != null) {
|
||||
doConvert(data);
|
||||
}
|
||||
if (split != null) {
|
||||
doSplit(data);
|
||||
}
|
||||
if (gsub != null) {
|
||||
doGsub(data);
|
||||
}
|
||||
if (join != null) {
|
||||
doJoin(data);
|
||||
}
|
||||
if (remove != null) {
|
||||
doRemove(data);
|
||||
}
|
||||
if (trim != null) {
|
||||
doTrim(data);
|
||||
}
|
||||
if (uppercase != null) {
|
||||
doUppercase(data);
|
||||
}
|
||||
if (lowercase != null) {
|
||||
doLowercase(data);
|
||||
}
|
||||
}
|
||||
|
||||
private void doUpdate(Data data) {
|
||||
for(Map.Entry<String, Object> entry : update.entrySet()) {
|
||||
data.addField(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private void doRename(Data data) {
|
||||
for(Map.Entry<String, String> entry : rename.entrySet()) {
|
||||
if (data.containsProperty(entry.getKey())) {
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
data.getDocument().remove(entry.getKey());
|
||||
data.addField(entry.getValue(), oldVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Object parseValueAsType(Object oldVal, String toType) {
|
||||
switch (toType) {
|
||||
case "integer":
|
||||
oldVal = Integer.parseInt(oldVal.toString());
|
||||
break;
|
||||
case "float":
|
||||
oldVal = Float.parseFloat(oldVal.toString());
|
||||
break;
|
||||
case "string":
|
||||
oldVal = oldVal.toString();
|
||||
break;
|
||||
case "boolean":
|
||||
// TODO(talevy): Booleans#parseBoolean depends on Elasticsearch, should be moved into dedicated library.
|
||||
oldVal = Booleans.parseBoolean(oldVal.toString(), false);
|
||||
}
|
||||
|
||||
return oldVal;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doConvert(Data data) {
|
||||
for(Map.Entry<String, String> entry : convert.entrySet()) {
|
||||
String toType = entry.getValue();
|
||||
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
Object newVal;
|
||||
|
||||
if (oldVal instanceof List) {
|
||||
newVal = new ArrayList<>();
|
||||
for (Object e : ((List<Object>) oldVal)) {
|
||||
((List<Object>) newVal).add(parseValueAsType(e, toType));
|
||||
}
|
||||
} else {
|
||||
if (oldVal == null) {
|
||||
throw new IllegalArgumentException("Field \"" + entry.getKey() + "\" is null, cannot be converted to a/an " + toType);
|
||||
}
|
||||
newVal = parseValueAsType(oldVal, toType);
|
||||
}
|
||||
|
||||
data.addField(entry.getKey(), newVal);
|
||||
}
|
||||
}
|
||||
|
||||
private void doSplit(Data data) {
|
||||
for(Map.Entry<String, String> entry : split.entrySet()) {
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
if (oldVal instanceof String) {
|
||||
data.addField(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue())));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot split a field that is not a String type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doGsub(Data data) {
|
||||
for (Map.Entry<String, Tuple<Pattern, String>> entry : gsub.entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
Tuple<Pattern, String> matchAndReplace = entry.getValue();
|
||||
String oldVal = data.getProperty(fieldName);
|
||||
if (oldVal == null) {
|
||||
throw new IllegalArgumentException("Field \"" + fieldName + "\" is null, cannot match pattern.");
|
||||
}
|
||||
Matcher matcher = matchAndReplace.v1().matcher(oldVal);
|
||||
String newVal = matcher.replaceAll(matchAndReplace.v2());
|
||||
data.addField(entry.getKey(), newVal);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doJoin(Data data) {
|
||||
for(Map.Entry<String, String> entry : join.entrySet()) {
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
if (oldVal instanceof List) {
|
||||
String joined = (String) ((List) oldVal)
|
||||
.stream()
|
||||
.map(Object::toString)
|
||||
.collect(Collectors.joining(entry.getValue()));
|
||||
|
||||
data.addField(entry.getKey(), joined);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot join field:" + entry.getKey() + " with type: " + oldVal.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doRemove(Data data) {
|
||||
for(String field : remove) {
|
||||
data.getDocument().remove(field);
|
||||
}
|
||||
}
|
||||
|
||||
private void doTrim(Data data) {
|
||||
for(String field : trim) {
|
||||
Object val = data.getProperty(field);
|
||||
if (val instanceof String) {
|
||||
data.addField(field, ((String) val).trim());
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot trim field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doUppercase(Data data) {
|
||||
for(String field : uppercase) {
|
||||
Object val = data.getProperty(field);
|
||||
if (val instanceof String) {
|
||||
data.addField(field, ((String) val).toUpperCase(Locale.ROOT));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot uppercase field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doLowercase(Data data) {
|
||||
for(String field : lowercase) {
|
||||
Object val = data.getProperty(field);
|
||||
if (val instanceof String) {
|
||||
data.addField(field, ((String) val).toLowerCase(Locale.ROOT));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot lowercase field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static final class Factory implements Processor.Factory<MutateProcessor> {
|
||||
@Override
|
||||
public MutateProcessor create(Map<String, Object> config) throws IOException {
|
||||
Map<String, Object> update = ConfigurationUtils.readOptionalObjectMap(config, "update");
|
||||
Map<String, String> rename = ConfigurationUtils.readOptionalStringMap(config, "rename");
|
||||
Map<String, String> convert = ConfigurationUtils.readOptionalStringMap(config, "convert");
|
||||
Map<String, String> split = ConfigurationUtils.readOptionalStringMap(config, "split");
|
||||
Map<String, List<String>> gsubConfig = ConfigurationUtils.readOptionalStringListMap(config, "gsub");
|
||||
Map<String, String> join = ConfigurationUtils.readOptionalStringMap(config, "join");
|
||||
List<String> remove = ConfigurationUtils.readOptionalStringList(config, "remove");
|
||||
List<String> trim = ConfigurationUtils.readOptionalStringList(config, "trim");
|
||||
List<String> uppercase = ConfigurationUtils.readOptionalStringList(config, "uppercase");
|
||||
List<String> lowercase = ConfigurationUtils.readOptionalStringList(config, "lowercase");
|
||||
|
||||
// pre-compile regex patterns
|
||||
Map<String, Tuple<Pattern, String>> gsub = null;
|
||||
if (gsubConfig != null) {
|
||||
gsub = new HashMap<>();
|
||||
for (Map.Entry<String, List<String>> entry : gsubConfig.entrySet()) {
|
||||
List<String> searchAndReplace = entry.getValue();
|
||||
if (searchAndReplace.size() != 2) {
|
||||
throw new IllegalArgumentException("Invalid search and replace values (" + Arrays.toString(searchAndReplace.toArray()) + ") for field: " + entry.getKey());
|
||||
}
|
||||
Pattern searchPattern = Pattern.compile(searchAndReplace.get(0));
|
||||
gsub.put(entry.getKey(), new Tuple<>(searchPattern, searchAndReplace.get(1)));
|
||||
}
|
||||
}
|
||||
|
||||
return new MutateProcessor(
|
||||
(update == null) ? null : Collections.unmodifiableMap(update),
|
||||
(rename == null) ? null : Collections.unmodifiableMap(rename),
|
||||
(convert == null) ? null : Collections.unmodifiableMap(convert),
|
||||
(split == null) ? null : Collections.unmodifiableMap(split),
|
||||
(gsub == null) ? null : Collections.unmodifiableMap(gsub),
|
||||
(join == null) ? null : Collections.unmodifiableMap(join),
|
||||
(remove == null) ? null : Collections.unmodifiableList(remove),
|
||||
(trim == null) ? null : Collections.unmodifiableList(trim),
|
||||
(uppercase == null) ? null : Collections.unmodifiableList(uppercase),
|
||||
(lowercase == null) ? null : Collections.unmodifiableList(lowercase));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.ingest.processor.Processor;
|
|||
import org.elasticsearch.ingest.processor.date.DateProcessor;
|
||||
import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
|
||||
import org.elasticsearch.ingest.processor.grok.GrokProcessor;
|
||||
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
|
||||
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
|
||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||
|
||||
|
@ -46,6 +47,7 @@ public class IngestModule extends AbstractModule {
|
|||
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
|
||||
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
|
||||
addProcessor(DateProcessor.TYPE, new DateProcessor.Factory());
|
||||
addProcessor(MutateProcessor.TYPE, new MutateProcessor.Factory());
|
||||
|
||||
MapBinder<String, Processor.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
|
||||
for (Map.Entry<String, Processor.Factory> entry : processors.entrySet()) {
|
||||
|
|
|
@ -49,6 +49,22 @@ public class DataTests extends ESTestCase {
|
|||
assertThat(data.getProperty("fizz.buzz"), equalTo("hello world"));
|
||||
}
|
||||
|
||||
public void testContainsProperty() {
|
||||
assertTrue(data.containsProperty("fizz"));
|
||||
}
|
||||
|
||||
public void testContainsProperty_Nested() {
|
||||
assertTrue(data.containsProperty("fizz.buzz"));
|
||||
}
|
||||
|
||||
public void testContainsProperty_NotFound() {
|
||||
assertFalse(data.containsProperty("doesnotexist"));
|
||||
}
|
||||
|
||||
public void testContainsProperty_NestedNotFound() {
|
||||
assertFalse(data.containsProperty("fizz.doesnotexist"));
|
||||
}
|
||||
|
||||
public void testSimpleAddField() {
|
||||
data.addField("new_field", "foo");
|
||||
assertThat(data.getDocument().get("new_field"), equalTo("foo"));
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
|
||||
public class ConfigurationUtilsTests extends ESTestCase {
|
||||
private Map<String, Object> config;
|
||||
|
||||
@Before
|
||||
public void setConfig() {
|
||||
config = new HashMap<>();
|
||||
config.put("foo", "bar");
|
||||
config.put("arr", Arrays.asList("1", "2", "3"));
|
||||
List<Integer> list = new ArrayList<>();
|
||||
list.add(2);
|
||||
config.put("int", list);
|
||||
config.put("ip", "127.0.0.1");
|
||||
Map<String, Object> fizz = new HashMap<>();
|
||||
fizz.put("buzz", "hello world");
|
||||
config.put("fizz", fizz);
|
||||
}
|
||||
|
||||
public void testReadStringProperty() {
|
||||
String val = ConfigurationUtils.readStringProperty(config, "foo");
|
||||
assertThat(val, equalTo("bar"));
|
||||
}
|
||||
|
||||
public void testReadStringProperty_InvalidType() {
|
||||
try {
|
||||
ConfigurationUtils.readStringProperty(config, "arr");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("property [arr] isn't a string, but of type [java.util.Arrays$ArrayList]"));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer>
|
||||
public void testOptional_InvalidType() {
|
||||
List<String> val = ConfigurationUtils.readStringList(config, "int");
|
||||
assertThat(val, equalTo(Arrays.asList(2)));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class MutateProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
public void testCreate() throws Exception {
|
||||
MutateProcessor.Factory factory = new MutateProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
Map<String, Object> update = new HashMap<>();
|
||||
update.put("foo", 123);
|
||||
config.put("update", update);
|
||||
MutateProcessor processor = factory.create(config);
|
||||
assertThat(processor.getUpdate(), equalTo(update));
|
||||
}
|
||||
|
||||
public void testCreateGsubPattern() throws Exception {
|
||||
MutateProcessor.Factory factory = new MutateProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
Map<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("foo", Arrays.asList("\\s.*e\\s", "<word_ending_with_e>"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
Map<String, Tuple<Pattern, String>> compiledGsub = new HashMap<>();
|
||||
Pattern searchPattern = Pattern.compile("\\s.*e\\s");
|
||||
compiledGsub.put("foo", new Tuple<>(searchPattern, "<word_ending_with_e>"));
|
||||
|
||||
MutateProcessor processor = factory.create(config);
|
||||
for (Map.Entry<String, Tuple<Pattern, String>> entry : compiledGsub.entrySet()) {
|
||||
Tuple<Pattern, String> actualSearchAndReplace = processor.getGsub().get(entry.getKey());
|
||||
assertThat(actualSearchAndReplace, notNullValue());
|
||||
assertThat(actualSearchAndReplace.v1().pattern(), equalTo(entry.getValue().v1().pattern()));
|
||||
assertThat(actualSearchAndReplace.v2(), equalTo(entry.getValue().v2()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testCreateGsubPattern_InvalidFormat() throws Exception {
|
||||
MutateProcessor.Factory factory = new MutateProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
Map<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("foo", Arrays.asList("only_one"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
try {
|
||||
factory.create(config);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Invalid search and replace values ([only_one]) for field: foo"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
||||
public class MutateProcessorTests extends ESTestCase {
|
||||
private static final MutateProcessor.Factory FACTORY = new MutateProcessor.Factory();
|
||||
private Data data;
|
||||
private Map<String, Object> config;
|
||||
|
||||
@Before
|
||||
public void setData() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("foo", "bar");
|
||||
document.put("alpha", "aBcD");
|
||||
document.put("num", "64");
|
||||
document.put("to_strip", " clean ");
|
||||
document.put("arr", Arrays.asList("1", "2", "3"));
|
||||
document.put("ip", "127.0.0.1");
|
||||
Map<String, Object> fizz = new HashMap<>();
|
||||
fizz.put("buzz", "hello world");
|
||||
document.put("fizz", fizz);
|
||||
|
||||
data = new Data("index", "type", "id", document);
|
||||
config = new HashMap<>();
|
||||
}
|
||||
|
||||
public void testUpdate() throws IOException {
|
||||
Map<String, Object> update = new HashMap<>();
|
||||
update.put("foo", 123);
|
||||
config.put("update", update);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), equalTo(123));
|
||||
}
|
||||
|
||||
public void testRename() throws IOException {
|
||||
Map<String, String> rename = new HashMap<>();
|
||||
rename.put("foo", "bar");
|
||||
config.put("rename", rename);
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("bar"), equalTo("bar"));
|
||||
assertThat(data.containsProperty("foo"), is(false));
|
||||
}
|
||||
|
||||
public void testConvert() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("num", "integer");
|
||||
config.put("convert", convert);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("num"), equalTo(64));
|
||||
}
|
||||
|
||||
public void testConvert_NullField() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("null", "integer");
|
||||
config.put("convert", convert);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
try {
|
||||
processor.execute(data);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Field \"null\" is null, cannot be converted to a/an integer"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testConvert_List() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("arr", "integer");
|
||||
config.put("convert", convert);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("arr"), equalTo(Arrays.asList(1, 2, 3)));
|
||||
}
|
||||
|
||||
public void testSplit() throws IOException {
|
||||
HashMap<String, String> split = new HashMap<>();
|
||||
split.put("ip", "\\.");
|
||||
config.put("split", split);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("ip"), equalTo(Arrays.asList("127", "0", "0", "1")));
|
||||
}
|
||||
|
||||
public void testGsub() throws IOException {
|
||||
HashMap<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("ip", Arrays.asList("\\.", "-"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("ip"), equalTo("127-0-0-1"));
|
||||
}
|
||||
|
||||
public void testGsub_NullValue() throws IOException {
|
||||
HashMap<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("null_field", Arrays.asList("\\.", "-"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
try {
|
||||
processor.execute(data);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Field \"null_field\" is null, cannot match pattern."));
|
||||
}
|
||||
}
|
||||
|
||||
public void testJoin() throws IOException {
|
||||
HashMap<String, String> join = new HashMap<>();
|
||||
join.put("arr", "-");
|
||||
config.put("join", join);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("arr"), equalTo("1-2-3"));
|
||||
}
|
||||
|
||||
public void testRemove() throws IOException {
|
||||
List<String> remove = Arrays.asList("foo", "ip");
|
||||
config.put("remove", remove);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), nullValue());
|
||||
assertThat(data.getProperty("ip"), nullValue());
|
||||
}
|
||||
|
||||
public void testTrim() throws IOException {
|
||||
List<String> trim = Arrays.asList("to_strip", "foo");
|
||||
config.put("trim", trim);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), equalTo("bar"));
|
||||
assertThat(data.getProperty("to_strip"), equalTo("clean"));
|
||||
}
|
||||
|
||||
public void testUppercase() throws IOException {
|
||||
List<String> uppercase = Arrays.asList("foo");
|
||||
config.put("uppercase", uppercase);
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), equalTo("BAR"));
|
||||
}
|
||||
|
||||
public void testLowercase() throws IOException {
|
||||
List<String> lowercase = Arrays.asList("alpha");
|
||||
config.put("lowercase", lowercase);
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("alpha"), equalTo("abcd"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
---
|
||||
"Test mutate processor":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"mutate" : {
|
||||
"rename" : {
|
||||
"field1": "foo"
|
||||
},
|
||||
"update" : {
|
||||
"field2": "bar"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
catch: request_timeout
|
||||
cluster.health:
|
||||
wait_for_nodes: 99
|
||||
timeout: 2s
|
||||
- match: { "timed_out": true }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field1: "val"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.foo: "val" }
|
||||
- match: { _source.field2: "bar" }
|
Loading…
Reference in New Issue