commit
33035611a5
|
@ -1,7 +1,158 @@
|
|||
[[ingest]]
|
||||
== Ingest Plugin
|
||||
|
||||
TODO
|
||||
=== Processors
|
||||
|
||||
==== Mutate Processor
|
||||
|
||||
The Mutate Processor applies functions on the structure of a document. The processor comes with a few
|
||||
functions to help achieve this.
|
||||
|
||||
The following are the supported configuration actions and how to use them.
|
||||
|
||||
===== Convert
|
||||
Convert a field's value to a different type, like turning a string to an integer.
|
||||
If the field value is an array, all members will be converted.
|
||||
|
||||
The supported types include: `integer`, `float`, `string`, and `boolean`.
|
||||
|
||||
`boolean` will set a field to "true" if its string value does not match any of the following: "false", "0", "off", "no".
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"convert": {
|
||||
"field1": "integer",
|
||||
"field2": "float"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Gsub
|
||||
Convert a string field by applying a regular expression and a replacement.
|
||||
If the field is not a string, no action will be taken.
|
||||
|
||||
This configuration takes an array consisting of two elements per field/substition. One for the
|
||||
pattern to be replaced, and the second for the pattern to replace with.
|
||||
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"gsub": {
|
||||
"field1": ["\.", "-"]
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Join
|
||||
Join an array with a separator character. Does nothing on non-array fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"join": {
|
||||
"joined_array_field": "other_array_field"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Lowercase
|
||||
Convert a string to its lowercase equivalent.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"lowercase": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Remove
|
||||
Remove one or more fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"remove": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Rename
|
||||
Renames one or more fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"rename": {
|
||||
"foo": "update_foo",
|
||||
"bar": "new_bar"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Split
|
||||
Split a field to an array using a separator character. Only works on string fields.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"split": {
|
||||
"message": ","
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Strip
|
||||
Strip whitespace from field. NOTE: this only works on leading and trailing whitespace.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"strip": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Update
|
||||
Update an existing field with a new value. If the field does not exist, then no action will be taken.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"update": {
|
||||
"field": 582.1
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
===== Uppercase
|
||||
Convert a string to its uppercase equivalent.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"mutate": {
|
||||
"uppercase": ["foo", "bar"]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
=== Processors
|
||||
|
||||
|
|
|
@ -54,6 +54,36 @@ public final class Data {
|
|||
return (T) XContentMapValues.extractValue(path, document);
|
||||
}
|
||||
|
||||
public boolean containsProperty(String path) {
|
||||
boolean containsProperty = false;
|
||||
String[] pathElements = Strings.splitStringToArray(path, '.');
|
||||
if (pathElements.length == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Map<String, Object> inner = document;
|
||||
|
||||
for (int i = 0; i < pathElements.length; i++) {
|
||||
if (inner == null) {
|
||||
containsProperty = false;
|
||||
break;
|
||||
}
|
||||
if (i == pathElements.length - 1) {
|
||||
containsProperty = inner.containsKey(pathElements[i]);
|
||||
break;
|
||||
}
|
||||
|
||||
Object obj = inner.get(pathElements[i]);
|
||||
if (obj instanceof Map) {
|
||||
inner = (Map<String, Object>) obj;
|
||||
} else {
|
||||
inner = null;
|
||||
}
|
||||
}
|
||||
|
||||
return containsProperty;
|
||||
}
|
||||
|
||||
/**
|
||||
* add `value` to path in document. If path does not exist,
|
||||
* nested hashmaps will be put in as parent key values until
|
||||
|
|
|
@ -92,4 +92,82 @@ public final class ConfigurationUtils {
|
|||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type list from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type list an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static List<String> readOptionalStringList(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> stringList = (List<String>) value;
|
||||
return stringList;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a list, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type map from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static Map<String, List<String>> readOptionalStringListMap(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, List<String>> stringList = (Map<String, List<String>>) value;
|
||||
return stringList;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type map from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static Map<String, String> readOptionalStringMap(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (value instanceof Map) {
|
||||
Map<String, String> map = (Map<String, String>) value;
|
||||
return map;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns and removes the specified property of type map from the specified configuration map.
|
||||
*
|
||||
* If the property value isn't of type map an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public static Map<String, Object> readOptionalObjectMap(Map<String, Object> configuration, String propertyName) {
|
||||
Object value = configuration.remove(propertyName);
|
||||
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (value instanceof Map) {
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
return map;
|
||||
} else {
|
||||
throw new IllegalArgumentException("property [" + propertyName + "] isn't a map, but of type [" + value.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,326 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class MutateProcessor implements Processor {
|
||||
|
||||
public static final String TYPE = "mutate";
|
||||
|
||||
private final Map<String, Object> update;
|
||||
private final Map<String, String> rename;
|
||||
private final Map<String, String> convert;
|
||||
private final Map<String, String> split;
|
||||
private final Map<String, Tuple<Pattern, String>> gsub;
|
||||
private final Map<String, String> join;
|
||||
private final List<String> remove;
|
||||
private final List<String> trim;
|
||||
private final List<String> uppercase;
|
||||
private final List<String> lowercase;
|
||||
|
||||
public MutateProcessor(Map<String, Object> update,
|
||||
Map<String, String> rename,
|
||||
Map<String, String> convert,
|
||||
Map<String, String> split,
|
||||
Map<String, Tuple<Pattern, String>> gsub,
|
||||
Map<String, String> join,
|
||||
List<String> remove,
|
||||
List<String> trim,
|
||||
List<String> uppercase,
|
||||
List<String> lowercase) {
|
||||
this.update = update;
|
||||
this.rename = rename;
|
||||
this.convert = convert;
|
||||
this.split = split;
|
||||
this.gsub = gsub;
|
||||
this.join = join;
|
||||
this.remove = remove;
|
||||
this.trim = trim;
|
||||
this.uppercase = uppercase;
|
||||
this.lowercase = lowercase;
|
||||
}
|
||||
|
||||
public Map<String, Object> getUpdate() {
|
||||
return update;
|
||||
}
|
||||
|
||||
public Map<String, String> getRename() {
|
||||
return rename;
|
||||
}
|
||||
|
||||
public Map<String, String> getConvert() {
|
||||
return convert;
|
||||
}
|
||||
|
||||
public Map<String, String> getSplit() {
|
||||
return split;
|
||||
}
|
||||
|
||||
public Map<String, Tuple<Pattern, String>> getGsub() {
|
||||
return gsub;
|
||||
}
|
||||
|
||||
public Map<String, String> getJoin() {
|
||||
return join;
|
||||
}
|
||||
|
||||
public List<String> getRemove() {
|
||||
return remove;
|
||||
}
|
||||
|
||||
public List<String> getTrim() {
|
||||
return trim;
|
||||
}
|
||||
|
||||
public List<String> getUppercase() {
|
||||
return uppercase;
|
||||
}
|
||||
|
||||
public List<String> getLowercase() {
|
||||
return lowercase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Data data) {
|
||||
if (update != null) {
|
||||
doUpdate(data);
|
||||
}
|
||||
if (rename != null) {
|
||||
doRename(data);
|
||||
}
|
||||
if (convert != null) {
|
||||
doConvert(data);
|
||||
}
|
||||
if (split != null) {
|
||||
doSplit(data);
|
||||
}
|
||||
if (gsub != null) {
|
||||
doGsub(data);
|
||||
}
|
||||
if (join != null) {
|
||||
doJoin(data);
|
||||
}
|
||||
if (remove != null) {
|
||||
doRemove(data);
|
||||
}
|
||||
if (trim != null) {
|
||||
doTrim(data);
|
||||
}
|
||||
if (uppercase != null) {
|
||||
doUppercase(data);
|
||||
}
|
||||
if (lowercase != null) {
|
||||
doLowercase(data);
|
||||
}
|
||||
}
|
||||
|
||||
private void doUpdate(Data data) {
|
||||
for(Map.Entry<String, Object> entry : update.entrySet()) {
|
||||
data.addField(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private void doRename(Data data) {
|
||||
for(Map.Entry<String, String> entry : rename.entrySet()) {
|
||||
if (data.containsProperty(entry.getKey())) {
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
data.getDocument().remove(entry.getKey());
|
||||
data.addField(entry.getValue(), oldVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Object parseValueAsType(Object oldVal, String toType) {
|
||||
switch (toType) {
|
||||
case "integer":
|
||||
oldVal = Integer.parseInt(oldVal.toString());
|
||||
break;
|
||||
case "float":
|
||||
oldVal = Float.parseFloat(oldVal.toString());
|
||||
break;
|
||||
case "string":
|
||||
oldVal = oldVal.toString();
|
||||
break;
|
||||
case "boolean":
|
||||
// TODO(talevy): Booleans#parseBoolean depends on Elasticsearch, should be moved into dedicated library.
|
||||
oldVal = Booleans.parseBoolean(oldVal.toString(), false);
|
||||
}
|
||||
|
||||
return oldVal;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doConvert(Data data) {
|
||||
for(Map.Entry<String, String> entry : convert.entrySet()) {
|
||||
String toType = entry.getValue();
|
||||
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
Object newVal;
|
||||
|
||||
if (oldVal instanceof List) {
|
||||
newVal = new ArrayList<>();
|
||||
for (Object e : ((List<Object>) oldVal)) {
|
||||
((List<Object>) newVal).add(parseValueAsType(e, toType));
|
||||
}
|
||||
} else {
|
||||
if (oldVal == null) {
|
||||
throw new IllegalArgumentException("Field \"" + entry.getKey() + "\" is null, cannot be converted to a/an " + toType);
|
||||
}
|
||||
newVal = parseValueAsType(oldVal, toType);
|
||||
}
|
||||
|
||||
data.addField(entry.getKey(), newVal);
|
||||
}
|
||||
}
|
||||
|
||||
private void doSplit(Data data) {
|
||||
for(Map.Entry<String, String> entry : split.entrySet()) {
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
if (oldVal instanceof String) {
|
||||
data.addField(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue())));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot split a field that is not a String type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doGsub(Data data) {
|
||||
for (Map.Entry<String, Tuple<Pattern, String>> entry : gsub.entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
Tuple<Pattern, String> matchAndReplace = entry.getValue();
|
||||
String oldVal = data.getProperty(fieldName);
|
||||
if (oldVal == null) {
|
||||
throw new IllegalArgumentException("Field \"" + fieldName + "\" is null, cannot match pattern.");
|
||||
}
|
||||
Matcher matcher = matchAndReplace.v1().matcher(oldVal);
|
||||
String newVal = matcher.replaceAll(matchAndReplace.v2());
|
||||
data.addField(entry.getKey(), newVal);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void doJoin(Data data) {
|
||||
for(Map.Entry<String, String> entry : join.entrySet()) {
|
||||
Object oldVal = data.getProperty(entry.getKey());
|
||||
if (oldVal instanceof List) {
|
||||
String joined = (String) ((List) oldVal)
|
||||
.stream()
|
||||
.map(Object::toString)
|
||||
.collect(Collectors.joining(entry.getValue()));
|
||||
|
||||
data.addField(entry.getKey(), joined);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot join field:" + entry.getKey() + " with type: " + oldVal.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doRemove(Data data) {
|
||||
for(String field : remove) {
|
||||
data.getDocument().remove(field);
|
||||
}
|
||||
}
|
||||
|
||||
private void doTrim(Data data) {
|
||||
for(String field : trim) {
|
||||
Object val = data.getProperty(field);
|
||||
if (val instanceof String) {
|
||||
data.addField(field, ((String) val).trim());
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot trim field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doUppercase(Data data) {
|
||||
for(String field : uppercase) {
|
||||
Object val = data.getProperty(field);
|
||||
if (val instanceof String) {
|
||||
data.addField(field, ((String) val).toUpperCase(Locale.ROOT));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot uppercase field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doLowercase(Data data) {
|
||||
for(String field : lowercase) {
|
||||
Object val = data.getProperty(field);
|
||||
if (val instanceof String) {
|
||||
data.addField(field, ((String) val).toLowerCase(Locale.ROOT));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Cannot lowercase field:" + field + " with type: " + val.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static final class Factory implements Processor.Factory<MutateProcessor> {
|
||||
@Override
|
||||
public MutateProcessor create(Map<String, Object> config) throws IOException {
|
||||
Map<String, Object> update = ConfigurationUtils.readOptionalObjectMap(config, "update");
|
||||
Map<String, String> rename = ConfigurationUtils.readOptionalStringMap(config, "rename");
|
||||
Map<String, String> convert = ConfigurationUtils.readOptionalStringMap(config, "convert");
|
||||
Map<String, String> split = ConfigurationUtils.readOptionalStringMap(config, "split");
|
||||
Map<String, List<String>> gsubConfig = ConfigurationUtils.readOptionalStringListMap(config, "gsub");
|
||||
Map<String, String> join = ConfigurationUtils.readOptionalStringMap(config, "join");
|
||||
List<String> remove = ConfigurationUtils.readOptionalStringList(config, "remove");
|
||||
List<String> trim = ConfigurationUtils.readOptionalStringList(config, "trim");
|
||||
List<String> uppercase = ConfigurationUtils.readOptionalStringList(config, "uppercase");
|
||||
List<String> lowercase = ConfigurationUtils.readOptionalStringList(config, "lowercase");
|
||||
|
||||
// pre-compile regex patterns
|
||||
Map<String, Tuple<Pattern, String>> gsub = null;
|
||||
if (gsubConfig != null) {
|
||||
gsub = new HashMap<>();
|
||||
for (Map.Entry<String, List<String>> entry : gsubConfig.entrySet()) {
|
||||
List<String> searchAndReplace = entry.getValue();
|
||||
if (searchAndReplace.size() != 2) {
|
||||
throw new IllegalArgumentException("Invalid search and replace values (" + Arrays.toString(searchAndReplace.toArray()) + ") for field: " + entry.getKey());
|
||||
}
|
||||
Pattern searchPattern = Pattern.compile(searchAndReplace.get(0));
|
||||
gsub.put(entry.getKey(), new Tuple<>(searchPattern, searchAndReplace.get(1)));
|
||||
}
|
||||
}
|
||||
|
||||
return new MutateProcessor(
|
||||
(update == null) ? null : Collections.unmodifiableMap(update),
|
||||
(rename == null) ? null : Collections.unmodifiableMap(rename),
|
||||
(convert == null) ? null : Collections.unmodifiableMap(convert),
|
||||
(split == null) ? null : Collections.unmodifiableMap(split),
|
||||
(gsub == null) ? null : Collections.unmodifiableMap(gsub),
|
||||
(join == null) ? null : Collections.unmodifiableMap(join),
|
||||
(remove == null) ? null : Collections.unmodifiableList(remove),
|
||||
(trim == null) ? null : Collections.unmodifiableList(trim),
|
||||
(uppercase == null) ? null : Collections.unmodifiableList(uppercase),
|
||||
(lowercase == null) ? null : Collections.unmodifiableList(lowercase));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.ingest.processor.Processor;
|
|||
import org.elasticsearch.ingest.processor.date.DateProcessor;
|
||||
import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
|
||||
import org.elasticsearch.ingest.processor.grok.GrokProcessor;
|
||||
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
|
||||
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
|
||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||
|
||||
|
@ -46,6 +47,7 @@ public class IngestModule extends AbstractModule {
|
|||
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
|
||||
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
|
||||
addProcessor(DateProcessor.TYPE, new DateProcessor.Factory());
|
||||
addProcessor(MutateProcessor.TYPE, new MutateProcessor.Factory());
|
||||
|
||||
MapBinder<String, Processor.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
|
||||
for (Map.Entry<String, Processor.Factory> entry : processors.entrySet()) {
|
||||
|
|
|
@ -49,6 +49,22 @@ public class DataTests extends ESTestCase {
|
|||
assertThat(data.getProperty("fizz.buzz"), equalTo("hello world"));
|
||||
}
|
||||
|
||||
public void testContainsProperty() {
|
||||
assertTrue(data.containsProperty("fizz"));
|
||||
}
|
||||
|
||||
public void testContainsProperty_Nested() {
|
||||
assertTrue(data.containsProperty("fizz.buzz"));
|
||||
}
|
||||
|
||||
public void testContainsProperty_NotFound() {
|
||||
assertFalse(data.containsProperty("doesnotexist"));
|
||||
}
|
||||
|
||||
public void testContainsProperty_NestedNotFound() {
|
||||
assertFalse(data.containsProperty("fizz.doesnotexist"));
|
||||
}
|
||||
|
||||
public void testSimpleAddField() {
|
||||
data.addField("new_field", "foo");
|
||||
assertThat(data.getDocument().get("new_field"), equalTo("foo"));
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
|
||||
public class ConfigurationUtilsTests extends ESTestCase {
|
||||
private Map<String, Object> config;
|
||||
|
||||
@Before
|
||||
public void setConfig() {
|
||||
config = new HashMap<>();
|
||||
config.put("foo", "bar");
|
||||
config.put("arr", Arrays.asList("1", "2", "3"));
|
||||
List<Integer> list = new ArrayList<>();
|
||||
list.add(2);
|
||||
config.put("int", list);
|
||||
config.put("ip", "127.0.0.1");
|
||||
Map<String, Object> fizz = new HashMap<>();
|
||||
fizz.put("buzz", "hello world");
|
||||
config.put("fizz", fizz);
|
||||
}
|
||||
|
||||
public void testReadStringProperty() {
|
||||
String val = ConfigurationUtils.readStringProperty(config, "foo");
|
||||
assertThat(val, equalTo("bar"));
|
||||
}
|
||||
|
||||
public void testReadStringProperty_InvalidType() {
|
||||
try {
|
||||
ConfigurationUtils.readStringProperty(config, "arr");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("property [arr] isn't a string, but of type [java.util.Arrays$ArrayList]"));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(talevy): Issue with generics. This test should fail, "int" is of type List<Integer>
|
||||
public void testOptional_InvalidType() {
|
||||
List<String> val = ConfigurationUtils.readStringList(config, "int");
|
||||
assertThat(val, equalTo(Arrays.asList(2)));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class MutateProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
public void testCreate() throws Exception {
|
||||
MutateProcessor.Factory factory = new MutateProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
Map<String, Object> update = new HashMap<>();
|
||||
update.put("foo", 123);
|
||||
config.put("update", update);
|
||||
MutateProcessor processor = factory.create(config);
|
||||
assertThat(processor.getUpdate(), equalTo(update));
|
||||
}
|
||||
|
||||
public void testCreateGsubPattern() throws Exception {
|
||||
MutateProcessor.Factory factory = new MutateProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
Map<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("foo", Arrays.asList("\\s.*e\\s", "<word_ending_with_e>"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
Map<String, Tuple<Pattern, String>> compiledGsub = new HashMap<>();
|
||||
Pattern searchPattern = Pattern.compile("\\s.*e\\s");
|
||||
compiledGsub.put("foo", new Tuple<>(searchPattern, "<word_ending_with_e>"));
|
||||
|
||||
MutateProcessor processor = factory.create(config);
|
||||
for (Map.Entry<String, Tuple<Pattern, String>> entry : compiledGsub.entrySet()) {
|
||||
Tuple<Pattern, String> actualSearchAndReplace = processor.getGsub().get(entry.getKey());
|
||||
assertThat(actualSearchAndReplace, notNullValue());
|
||||
assertThat(actualSearchAndReplace.v1().pattern(), equalTo(entry.getValue().v1().pattern()));
|
||||
assertThat(actualSearchAndReplace.v2(), equalTo(entry.getValue().v2()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testCreateGsubPattern_InvalidFormat() throws Exception {
|
||||
MutateProcessor.Factory factory = new MutateProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
Map<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("foo", Arrays.asList("only_one"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
try {
|
||||
factory.create(config);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Invalid search and replace values ([only_one]) for field: foo"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.mutate;
|
||||
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
||||
public class MutateProcessorTests extends ESTestCase {
|
||||
private static final MutateProcessor.Factory FACTORY = new MutateProcessor.Factory();
|
||||
private Data data;
|
||||
private Map<String, Object> config;
|
||||
|
||||
@Before
|
||||
public void setData() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("foo", "bar");
|
||||
document.put("alpha", "aBcD");
|
||||
document.put("num", "64");
|
||||
document.put("to_strip", " clean ");
|
||||
document.put("arr", Arrays.asList("1", "2", "3"));
|
||||
document.put("ip", "127.0.0.1");
|
||||
Map<String, Object> fizz = new HashMap<>();
|
||||
fizz.put("buzz", "hello world");
|
||||
document.put("fizz", fizz);
|
||||
|
||||
data = new Data("index", "type", "id", document);
|
||||
config = new HashMap<>();
|
||||
}
|
||||
|
||||
public void testUpdate() throws IOException {
|
||||
Map<String, Object> update = new HashMap<>();
|
||||
update.put("foo", 123);
|
||||
config.put("update", update);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), equalTo(123));
|
||||
}
|
||||
|
||||
public void testRename() throws IOException {
|
||||
Map<String, String> rename = new HashMap<>();
|
||||
rename.put("foo", "bar");
|
||||
config.put("rename", rename);
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("bar"), equalTo("bar"));
|
||||
assertThat(data.containsProperty("foo"), is(false));
|
||||
}
|
||||
|
||||
public void testConvert() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("num", "integer");
|
||||
config.put("convert", convert);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("num"), equalTo(64));
|
||||
}
|
||||
|
||||
public void testConvert_NullField() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("null", "integer");
|
||||
config.put("convert", convert);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
try {
|
||||
processor.execute(data);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Field \"null\" is null, cannot be converted to a/an integer"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testConvert_List() throws IOException {
|
||||
Map<String, String> convert = new HashMap<>();
|
||||
convert.put("arr", "integer");
|
||||
config.put("convert", convert);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("arr"), equalTo(Arrays.asList(1, 2, 3)));
|
||||
}
|
||||
|
||||
public void testSplit() throws IOException {
|
||||
HashMap<String, String> split = new HashMap<>();
|
||||
split.put("ip", "\\.");
|
||||
config.put("split", split);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("ip"), equalTo(Arrays.asList("127", "0", "0", "1")));
|
||||
}
|
||||
|
||||
public void testGsub() throws IOException {
|
||||
HashMap<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("ip", Arrays.asList("\\.", "-"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("ip"), equalTo("127-0-0-1"));
|
||||
}
|
||||
|
||||
public void testGsub_NullValue() throws IOException {
|
||||
HashMap<String, List<String>> gsub = new HashMap<>();
|
||||
gsub.put("null_field", Arrays.asList("\\.", "-"));
|
||||
config.put("gsub", gsub);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
try {
|
||||
processor.execute(data);
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Field \"null_field\" is null, cannot match pattern."));
|
||||
}
|
||||
}
|
||||
|
||||
public void testJoin() throws IOException {
|
||||
HashMap<String, String> join = new HashMap<>();
|
||||
join.put("arr", "-");
|
||||
config.put("join", join);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("arr"), equalTo("1-2-3"));
|
||||
}
|
||||
|
||||
public void testRemove() throws IOException {
|
||||
List<String> remove = Arrays.asList("foo", "ip");
|
||||
config.put("remove", remove);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), nullValue());
|
||||
assertThat(data.getProperty("ip"), nullValue());
|
||||
}
|
||||
|
||||
public void testTrim() throws IOException {
|
||||
List<String> trim = Arrays.asList("to_strip", "foo");
|
||||
config.put("trim", trim);
|
||||
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), equalTo("bar"));
|
||||
assertThat(data.getProperty("to_strip"), equalTo("clean"));
|
||||
}
|
||||
|
||||
public void testUppercase() throws IOException {
|
||||
List<String> uppercase = Arrays.asList("foo");
|
||||
config.put("uppercase", uppercase);
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("foo"), equalTo("BAR"));
|
||||
}
|
||||
|
||||
public void testLowercase() throws IOException {
|
||||
List<String> lowercase = Arrays.asList("alpha");
|
||||
config.put("lowercase", lowercase);
|
||||
Processor processor = FACTORY.create(config);
|
||||
processor.execute(data);
|
||||
assertThat(data.getProperty("alpha"), equalTo("abcd"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
---
|
||||
"Test mutate processor":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"mutate" : {
|
||||
"rename" : {
|
||||
"field1": "foo"
|
||||
},
|
||||
"update" : {
|
||||
"field2": "bar"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
catch: request_timeout
|
||||
cluster.health:
|
||||
wait_for_nodes: 99
|
||||
timeout: 2s
|
||||
- match: { "timed_out": true }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field1: "val"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.foo: "val" }
|
||||
- match: { _source.field2: "bar" }
|
Loading…
Reference in New Issue