Split mutate processor into one processor per function

This commit is contained in:
javanna 2015-11-23 14:38:31 +01:00 committed by Luca Cavanna
parent 1e9d5c7b22
commit 8f1f5d4da0
47 changed files with 2984 additions and 971 deletions

View File

@ -3,26 +3,64 @@
=== Processors
==== Mutate Processor
The Mutate Processor applies functions on the structure of a document. The processor comes with a few
functions to help achieve this.
The following are the supported configuration actions and how to use them.
===== Convert
Convert a field's value to a different type, like turning a string to an integer.
If the field value is an array, all members will be converted.
The supported types include: `integer`, `float`, `string`, and `boolean`.
`boolean` will set a field to "true" if its string value does not match any of the following: "false", "0", "off", "no".
==== Add processor
Adds one or more fields and associates them with the specified values. If a field already exists,
its value will be replaced with the provided one.
[source,js]
--------------------------------------------------
{
"mutate": {
"convert": {
"add": {
"fields": {
"field": 582.1
}
}
}
--------------------------------------------------
==== Remove processor
Removes one or more existing fields. If a field doesn't exist, nothing will happen.
[source,js]
--------------------------------------------------
{
"remove": {
"fields": [
"field1","field2"
]
}
}
--------------------------------------------------
==== Rename processor
Renames one or more existing fields. If a field doesn't exist, an exception will be thrown.
[source,js]
--------------------------------------------------
{
"rename": {
"fields": {
"field1": "field2"
}
}
}
--------------------------------------------------
==== Convert processor
Converts one or more field value to a different type, like turning a string to an integer.
If the field value is an array, all members will be converted.
The supported types include: `integer`, `float`, `string`, and `boolean`.
`boolean` will set a field to true if its string value is equal to `true` (ignore case), to
false if its string value is equal to `false` (ignore case) and it will throw exception otherwise.
[source,js]
--------------------------------------------------
{
"convert": {
"fields": {
"field1": "integer",
"field2": "float"
}
@ -30,132 +68,95 @@ The supported types include: `integer`, `float`, `string`, and `boolean`.
}
--------------------------------------------------
===== Gsub
Convert a string field by applying a regular expression and a replacement.
If the field is not a string, no action will be taken.
==== Gsub processor
Converts a string field by applying a regular expression and a replacement.
If the field is not a string, the processor will throw an exception.
This configuration takes an array consisting of two elements per field/substition. One for the
pattern to be replaced, and the second for the pattern to replace with.
This configuration takes an `expression` array consisting of objects. Each object
holds three elements: `field` for the field name, `pattern` for the
pattern to be replaced, and `replacement` for the string to replace the matching patterns with.
[source,js]
--------------------------------------------------
{
"mutate": {
"gsub": {
"field1": ["\.", "-"]
}
"gsub": {
"expressions": [
{
"field": "field1",
"pattern": "\.",
"replacement": "-"
}
]
}
}
--------------------------------------------------
===== Join
Join an array with a separator character. Does nothing on non-array fields.
==== Join processor
Joins each element of an array into a single string using a separator character between each element.
Throws error when the field is not an array.
[source,js]
--------------------------------------------------
{
"mutate": {
"join": {
"join": {
"fields": {
"joined_array_field": "other_array_field"
}
}
}
--------------------------------------------------
===== Lowercase
Convert a string to its lowercase equivalent.
[source,js]
--------------------------------------------------
{
"mutate": {
"lowercase": ["foo", "bar"]
}
}
--------------------------------------------------
===== Remove
Remove one or more fields.
[source,js]
--------------------------------------------------
{
"mutate": {
"remove": ["foo", "bar"]
}
}
--------------------------------------------------
===== Rename
Renames one or more fields.
[source,js]
--------------------------------------------------
{
"mutate": {
"rename": {
"foo": "update_foo",
"bar": "new_bar"
}
}
}
--------------------------------------------------
===== Split
==== Split processor
Split a field to an array using a separator character. Only works on string fields.
[source,js]
--------------------------------------------------
{
"mutate": {
"split": {
"split": {
"fields": {
"message": ","
}
}
}
--------------------------------------------------
===== Strip
Strip whitespace from field. NOTE: this only works on leading and trailing whitespace.
==== Lowercase processor
Converts a string to its lowercase equivalent.
[source,js]
--------------------------------------------------
{
"mutate": {
"strip": ["foo", "bar"]
"lowercase": {
"fields": ["foo", "bar"]
}
}
--------------------------------------------------
===== Update
Update an existing field with a new value. If the field does not exist, then no action will be taken.
==== Uppercase processor
Converts a string to its uppercase equivalent.
[source,js]
--------------------------------------------------
{
"mutate": {
"update": {
"field": 582.1
}
"uppercase": {
"fields": ["foo", "bar"]
}
}
--------------------------------------------------
===== Uppercase
Convert a string to its uppercase equivalent.
==== Trim processor
Trims whitespace from field. NOTE: this only works on leading and trailing whitespaces.
[source,js]
--------------------------------------------------
{
"mutate": {
"uppercase": ["foo", "bar"]
"trim": {
"fields": ["foo", "bar"]
}
}
--------------------------------------------------
=== Processors
==== Grok Processor
The Grok Processor extracts structured fields out of a single text field within a document. You choose which field to

View File

@ -86,7 +86,7 @@ public final class Pipeline {
Map<String, Object> processorConfig = entry.getValue();
processors.add(factory.create(processorConfig));
if (processorConfig.isEmpty() == false) {
throw new IllegalArgumentException("processor [" + entry.getKey() + "] doesn't support one or more provided configuration parameters [" + Arrays.toString(processorConfig.keySet().toArray()) + "]");
throw new IllegalArgumentException("processor [" + entry.getKey() + "] doesn't support one or more provided configuration parameters " + Arrays.toString(processorConfig.keySet().toArray()));
}
} else {
throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]");

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Base class for processors that manipulate strings and require a single "fields" array config value, which
* holds a list of field names in string format.
*/
public abstract class AbstractStringProcessor implements Processor {
private final Collection<String> fields;
protected AbstractStringProcessor(Collection<String> fields) {
this.fields = fields;
}
public Collection<String> getFields() {
return fields;
}
@Override
public final void execute(IngestDocument document) {
for(String field : fields) {
String val = document.getPropertyValue(field, String.class);
if (val == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}
document.setPropertyValue(field, process(val));
}
}
protected abstract String process(String value);
public static abstract class Factory<T extends AbstractStringProcessor> implements Processor.Factory<T> {
@Override
public T create(Map<String, Object> config) throws IOException {
List<String> fields = ConfigurationUtils.readList(config, "fields");
return newProcessor(Collections.unmodifiableList(fields));
}
protected abstract T newProcessor(Collection<String> fields);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.add;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
* Processor that adds new fields with their corresponding values. If the field is already present, its value
* will be replaced with the provided one.
*/
public class AddProcessor implements Processor {
public static final String TYPE = "add";
private final Map<String, Object> fields;
AddProcessor(Map<String, Object> fields) {
this.fields = fields;
}
Map<String, Object> getFields() {
return fields;
}
@Override
public void execute(IngestDocument document) {
for(Map.Entry<String, Object> entry : fields.entrySet()) {
document.setPropertyValue(entry.getKey(), entry.getValue());
}
}
@Override
public String getType() {
return TYPE;
}
public static final class Factory implements Processor.Factory<AddProcessor> {
@Override
public AddProcessor create(Map<String, Object> config) throws IOException {
Map<String, Object> fields = ConfigurationUtils.readMap(config, "fields");
return new AddProcessor(Collections.unmodifiableMap(fields));
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.convert;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.*;
/**
* Processor that converts fields content to a different type. Supported types are: integer, float, boolean and string.
* Throws exception if the field is not there or the conversion fails.
*/
public class ConvertProcessor implements Processor {
enum Type {
INTEGER {
@Override
public Object convert(Object value) {
try {
return Integer.parseInt(value.toString());
} catch(NumberFormatException e) {
throw new IllegalArgumentException("unable to convert [" + value + "] to integer", e);
}
}
}, FLOAT {
@Override
public Object convert(Object value) {
try {
return Float.parseFloat(value.toString());
} catch(NumberFormatException e) {
throw new IllegalArgumentException("unable to convert [" + value + "] to float", e);
}
}
}, BOOLEAN {
@Override
public Object convert(Object value) {
if (value.toString().equalsIgnoreCase("true")) {
return true;
} else if (value.toString().equalsIgnoreCase("false")) {
return false;
} else {
throw new IllegalArgumentException("[" + value + "] is not a boolean value, cannot convert to boolean");
}
}
}, STRING {
@Override
public Object convert(Object value) {
return value.toString();
}
};
@Override
public final String toString() {
return name().toLowerCase(Locale.ROOT);
}
public abstract Object convert(Object value);
public static Type fromString(String type) {
try {
return Type.valueOf(type.toUpperCase(Locale.ROOT));
} catch(IllegalArgumentException e) {
throw new IllegalArgumentException("type [" + type + "] not supported, cannot convert field.", e);
}
}
}
public static final String TYPE = "convert";
private final Map<String, Type> fields;
ConvertProcessor(Map<String, Type> fields) {
this.fields = fields;
}
Map<String, Type> getFields() {
return fields;
}
@Override
public void execute(IngestDocument document) {
for(Map.Entry<String, Type> entry : fields.entrySet()) {
Type type = entry.getValue();
Object oldValue = document.getPropertyValue(entry.getKey(), Object.class);
Object newValue;
if (oldValue == null) {
throw new IllegalArgumentException("Field [" + entry.getKey() + "] is null, cannot be converted to type [" + type + "]");
}
if (oldValue instanceof List) {
List<?> list = (List<?>) oldValue;
List<Object> newList = new ArrayList<>();
for (Object value : list) {
newList.add(type.convert(value));
}
newValue = newList;
} else {
newValue = type.convert(oldValue);
}
document.setPropertyValue(entry.getKey(), newValue);
}
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<ConvertProcessor> {
@Override
public ConvertProcessor create(Map<String, Object> config) throws IOException {
Map<String, String> fields = ConfigurationUtils.readMap(config, "fields");
Map<String, Type> convertFields = new HashMap<>();
for (Map.Entry<String, String> entry : fields.entrySet()) {
convertFields.put(entry.getKey(), Type.fromString(entry.getValue()));
}
return new ConvertProcessor(Collections.unmodifiableMap(convertFields));
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.ingest.processor.mutate;
package org.elasticsearch.ingest.processor.gsub;
import java.util.Objects;
import java.util.regex.Pattern;

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.gsub;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Processor that allows to search for patterns in field content and replace them with corresponding string replacement.
* Support fields of string type only, throws exception if a field is of a different type.
*/
public class GsubProcessor implements Processor {
public static final String TYPE = "gsub";
private final List<GsubExpression> gsubExpressions;
GsubProcessor(List<GsubExpression> gsubExpressions) {
this.gsubExpressions = gsubExpressions;
}
List<GsubExpression> getGsubExpressions() {
return gsubExpressions;
}
@Override
public void execute(IngestDocument document) {
for (GsubExpression gsubExpression : gsubExpressions) {
String oldVal = document.getPropertyValue(gsubExpression.getFieldName(), String.class);
if (oldVal == null) {
throw new IllegalArgumentException("field [" + gsubExpression.getFieldName() + "] is null, cannot match pattern.");
}
Matcher matcher = gsubExpression.getPattern().matcher(oldVal);
String newVal = matcher.replaceAll(gsubExpression.getReplacement());
document.setPropertyValue(gsubExpression.getFieldName(), newVal);
}
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<GsubProcessor> {
@Override
public GsubProcessor create(Map<String, Object> config) throws IOException {
List<Map<String, String>> gsubConfig = ConfigurationUtils.readList(config, "expressions");
List<GsubExpression> gsubExpressions = new ArrayList<>();
for (Map<String, String> stringObjectMap : gsubConfig) {
String field = stringObjectMap.get("field");
if (field == null) {
throw new IllegalArgumentException("no [field] specified for gsub expression");
}
String pattern = stringObjectMap.get("pattern");
if (pattern == null) {
throw new IllegalArgumentException("no [pattern] specified for gsub expression");
}
String replacement = stringObjectMap.get("replacement");
if (replacement == null) {
throw new IllegalArgumentException("no [replacement] specified for gsub expression");
}
Pattern searchPattern = Pattern.compile(pattern);
gsubExpressions.add(new GsubExpression(field, searchPattern, replacement));
}
return new GsubProcessor(gsubExpressions);
}
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.join;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Processor that joins the different items of an array into a single string value using a separator between each item.
* Throws exception is the specified field is not an array.
*/
public class JoinProcessor implements Processor {
public static final String TYPE = "join";
private final Map<String, String> fields;
JoinProcessor(Map<String, String> fields) {
this.fields = fields;
}
Map<String, String> getFields() {
return fields;
}
@Override
public void execute(IngestDocument document) {
for(Map.Entry<String, String> entry : fields.entrySet()) {
List<?> list = document.getPropertyValue(entry.getKey(), List.class);
if (list == null) {
throw new IllegalArgumentException("field [" + entry.getKey() + "] is null, cannot join.");
}
String joined = list.stream()
.map(Object::toString)
.collect(Collectors.joining(entry.getValue()));
document.setPropertyValue(entry.getKey(), joined);
}
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<JoinProcessor> {
@Override
public JoinProcessor create(Map<String, Object> config) throws IOException {
Map<String, String> fields = ConfigurationUtils.readMap(config, "fields");
return new JoinProcessor(Collections.unmodifiableMap(fields));
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.lowercase;
import org.elasticsearch.ingest.processor.AbstractStringProcessor;
import java.util.Collection;
import java.util.Locale;
/**
* Processor that converts the content of string fields to lowercase.
* Throws exception is the field is not of type string.
*/
public class LowercaseProcessor extends AbstractStringProcessor {
public static final String TYPE = "lowercase";
LowercaseProcessor(Collection<String> fields) {
super(fields);
}
@Override
protected String process(String value) {
return value.toLowerCase(Locale.ROOT);
}
@Override
public String getType() {
return TYPE;
}
public static class Factory extends AbstractStringProcessor.Factory<LowercaseProcessor> {
@Override
protected LowercaseProcessor newProcessor(Collection<String> fields) {
return new LowercaseProcessor(fields);
}
}
}

View File

@ -1,329 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.mutate;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public final class MutateProcessor implements Processor {
public static final String TYPE = "mutate";
private final Map<String, Object> update;
private final Map<String, String> rename;
private final Map<String, String> convert;
private final Map<String, String> split;
private final List<GsubExpression> gsub;
private final Map<String, String> join;
private final List<String> remove;
private final List<String> trim;
private final List<String> uppercase;
private final List<String> lowercase;
MutateProcessor(Map<String, Object> update, Map<String, String> rename, Map<String, String> convert,
Map<String, String> split, List<GsubExpression> gsub, Map<String, String> join,
List<String> remove, List<String> trim, List<String> uppercase, List<String> lowercase) {
this.update = update;
this.rename = rename;
this.convert = convert;
this.split = split;
this.gsub = gsub;
this.join = join;
this.remove = remove;
this.trim = trim;
this.uppercase = uppercase;
this.lowercase = lowercase;
}
Map<String, Object> getUpdate() {
return update;
}
Map<String, String> getRename() {
return rename;
}
Map<String, String> getConvert() {
return convert;
}
Map<String, String> getSplit() {
return split;
}
List<GsubExpression> getGsub() {
return gsub;
}
Map<String, String> getJoin() {
return join;
}
List<String> getRemove() {
return remove;
}
List<String> getTrim() {
return trim;
}
List<String> getUppercase() {
return uppercase;
}
List<String> getLowercase() {
return lowercase;
}
@Override
public void execute(IngestDocument ingestDocument) {
if (update != null) {
doUpdate(ingestDocument);
}
if (rename != null) {
doRename(ingestDocument);
}
if (convert != null) {
doConvert(ingestDocument);
}
if (split != null) {
doSplit(ingestDocument);
}
if (gsub != null) {
doGsub(ingestDocument);
}
if (join != null) {
doJoin(ingestDocument);
}
if (remove != null) {
doRemove(ingestDocument);
}
if (trim != null) {
doTrim(ingestDocument);
}
if (uppercase != null) {
doUppercase(ingestDocument);
}
if (lowercase != null) {
doLowercase(ingestDocument);
}
}
@Override
public String getType() {
return TYPE;
}
private void doUpdate(IngestDocument ingestDocument) {
for(Map.Entry<String, Object> entry : update.entrySet()) {
ingestDocument.setPropertyValue(entry.getKey(), entry.getValue());
}
}
private void doRename(IngestDocument ingestDocument) {
for(Map.Entry<String, String> entry : rename.entrySet()) {
if (ingestDocument.hasPropertyValue(entry.getKey())) {
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
ingestDocument.getSource().remove(entry.getKey());
ingestDocument.setPropertyValue(entry.getValue(), oldVal);
}
}
}
private Object parseValueAsType(Object oldVal, String toType) {
switch (toType) {
case "integer":
oldVal = Integer.parseInt(oldVal.toString());
break;
case "float":
oldVal = Float.parseFloat(oldVal.toString());
break;
case "string":
oldVal = oldVal.toString();
break;
case "boolean":
// TODO(talevy): Booleans#parseBoolean depends on Elasticsearch, should be moved into dedicated library.
oldVal = Booleans.parseBoolean(oldVal.toString(), false);
}
return oldVal;
}
@SuppressWarnings("unchecked")
private void doConvert(IngestDocument ingestDocument) {
for(Map.Entry<String, String> entry : convert.entrySet()) {
String toType = entry.getValue();
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
Object newVal;
if (oldVal instanceof List) {
newVal = new ArrayList<>();
for (Object e : ((List<Object>) oldVal)) {
((List<Object>) newVal).add(parseValueAsType(e, toType));
}
} else {
if (oldVal == null) {
throw new IllegalArgumentException("Field \"" + entry.getKey() + "\" is null, cannot be converted to a/an " + toType);
}
newVal = parseValueAsType(oldVal, toType);
}
ingestDocument.setPropertyValue(entry.getKey(), newVal);
}
}
private void doSplit(IngestDocument ingestDocument) {
for(Map.Entry<String, String> entry : split.entrySet()) {
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
if (oldVal == null) {
throw new IllegalArgumentException("Cannot split field. [" + entry.getKey() + "] is null.");
} else if (oldVal instanceof String) {
ingestDocument.setPropertyValue(entry.getKey(), Arrays.asList(((String) oldVal).split(entry.getValue())));
} else {
throw new IllegalArgumentException("Cannot split a field that is not a String type");
}
}
}
private void doGsub(IngestDocument ingestDocument) {
for (GsubExpression gsubExpression : gsub) {
String oldVal = ingestDocument.getPropertyValue(gsubExpression.getFieldName(), String.class);
if (oldVal == null) {
throw new IllegalArgumentException("Field \"" + gsubExpression.getFieldName() + "\" is null, cannot match pattern.");
}
Matcher matcher = gsubExpression.getPattern().matcher(oldVal);
String newVal = matcher.replaceAll(gsubExpression.getReplacement());
ingestDocument.setPropertyValue(gsubExpression.getFieldName(), newVal);
}
}
@SuppressWarnings("unchecked")
private void doJoin(IngestDocument ingestDocument) {
for(Map.Entry<String, String> entry : join.entrySet()) {
Object oldVal = ingestDocument.getPropertyValue(entry.getKey(), Object.class);
if (oldVal instanceof List) {
String joined = (String) ((List) oldVal)
.stream()
.map(Object::toString)
.collect(Collectors.joining(entry.getValue()));
ingestDocument.setPropertyValue(entry.getKey(), joined);
} else {
throw new IllegalArgumentException("Cannot join field:" + entry.getKey() + " with type: " + oldVal.getClass());
}
}
}
private void doRemove(IngestDocument ingestDocument) {
for(String field : remove) {
ingestDocument.getSource().remove(field);
}
}
private void doTrim(IngestDocument ingestDocument) {
for(String field : trim) {
Object val = ingestDocument.getPropertyValue(field, Object.class);
if (val == null) {
throw new IllegalArgumentException("Cannot trim field. [" + field + "] is null.");
} else if (val instanceof String) {
ingestDocument.setPropertyValue(field, ((String) val).trim());
} else {
throw new IllegalArgumentException("Cannot trim field:" + field + " with type: " + val.getClass());
}
}
}
private void doUppercase(IngestDocument ingestDocument) {
for(String field : uppercase) {
Object val = ingestDocument.getPropertyValue(field, Object.class);
if (val == null) {
throw new IllegalArgumentException("Cannot uppercase field. [" + field + "] is null.");
} else if (val instanceof String) {
ingestDocument.setPropertyValue(field, ((String) val).toUpperCase(Locale.ROOT));
} else {
throw new IllegalArgumentException("Cannot uppercase field:" + field + " with type: " + val.getClass());
}
}
}
private void doLowercase(IngestDocument ingestDocument) {
for(String field : lowercase) {
Object val = ingestDocument.getPropertyValue(field, Object.class);
if (val == null) {
throw new IllegalArgumentException("Cannot lowercase field. [" + field + "] is null.");
} else if (val instanceof String) {
ingestDocument.setPropertyValue(field, ((String) val).toLowerCase(Locale.ROOT));
} else {
throw new IllegalArgumentException("Cannot lowercase field:" + field + " with type: " + val.getClass());
}
}
}
public static final class Factory implements Processor.Factory<MutateProcessor> {
@Override
public MutateProcessor create(Map<String, Object> config) throws IOException {
Map<String, Object> update = ConfigurationUtils.readOptionalMap(config, "update");
Map<String, String> rename = ConfigurationUtils.readOptionalMap(config, "rename");
Map<String, String> convert = ConfigurationUtils.readOptionalMap(config, "convert");
Map<String, String> split = ConfigurationUtils.readOptionalMap(config, "split");
Map<String, List<String>> gsubConfig = ConfigurationUtils.readOptionalMap(config, "gsub");
Map<String, String> join = ConfigurationUtils.readOptionalMap(config, "join");
List<String> remove = ConfigurationUtils.readOptionalList(config, "remove");
List<String> trim = ConfigurationUtils.readOptionalList(config, "trim");
List<String> uppercase = ConfigurationUtils.readOptionalList(config, "uppercase");
List<String> lowercase = ConfigurationUtils.readOptionalList(config, "lowercase");
// pre-compile regex patterns
List<GsubExpression> gsubExpressions = null;
if (gsubConfig != null) {
gsubExpressions = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : gsubConfig.entrySet()) {
List<String> searchAndReplace = entry.getValue();
if (searchAndReplace.size() != 2) {
throw new IllegalArgumentException("Invalid search and replace values " + searchAndReplace + " for field: " + entry.getKey());
}
Pattern searchPattern = Pattern.compile(searchAndReplace.get(0));
gsubExpressions.add(new GsubExpression(entry.getKey(), searchPattern, searchAndReplace.get(1)));
}
}
return new MutateProcessor(
(update == null) ? null : Collections.unmodifiableMap(update),
(rename == null) ? null : Collections.unmodifiableMap(rename),
(convert == null) ? null : Collections.unmodifiableMap(convert),
(split == null) ? null : Collections.unmodifiableMap(split),
(gsubExpressions == null) ? null : Collections.unmodifiableList(gsubExpressions),
(join == null) ? null : Collections.unmodifiableMap(join),
(remove == null) ? null : Collections.unmodifiableList(remove),
(trim == null) ? null : Collections.unmodifiableList(trim),
(uppercase == null) ? null : Collections.unmodifiableList(uppercase),
(lowercase == null) ? null : Collections.unmodifiableList(lowercase));
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.remove;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Processor that removes existing fields. Nothing happens if the field is not present.
*/
public class RemoveProcessor implements Processor {
public static final String TYPE = "remove";
private final Collection<String> fields;
RemoveProcessor(Collection<String> fields) {
this.fields = fields;
}
Collection<String> getFields() {
return fields;
}
@Override
public void execute(IngestDocument document) {
for(String field : fields) {
document.removeProperty(field);
}
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<RemoveProcessor> {
@Override
public RemoveProcessor create(Map<String, Object> config) throws IOException {
List<String> fields = ConfigurationUtils.readList(config, "fields");
return new RemoveProcessor(Collections.unmodifiableList(fields));
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.rename;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
* Processor that allows to rename existing fields. Will throw exception if the field is not present.
*/
public class RenameProcessor implements Processor {
public static final String TYPE = "rename";
private final Map<String, String> fields;
RenameProcessor(Map<String, String> fields) {
this.fields = fields;
}
Map<String, String> getFields() {
return fields;
}
@Override
public void execute(IngestDocument document) {
for(Map.Entry<String, String> entry : fields.entrySet()) {
if (document.hasPropertyValue(entry.getKey())) {
if (document.hasPropertyValue(entry.getKey()) == false) {
throw new IllegalArgumentException("field [" + entry.getKey() + "] doesn't exist");
}
Object oldValue = document.getPropertyValue(entry.getKey(), Object.class);
document.removeProperty(entry.getKey());
document.setPropertyValue(entry.getValue(), oldValue);
}
}
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<RenameProcessor> {
@Override
public RenameProcessor create(Map<String, Object> config) throws IOException {
Map<String, String> fields = ConfigurationUtils.readMap(config, "fields");
return new RenameProcessor(Collections.unmodifiableMap(fields));
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.split;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
/**
* Processor that splits fields content into different items based on the occurrence of a specified separator.
* New field value will be an array containing all of the different extracted items.
* Throws exception if the field is null or a type other than string.
*/
public class SplitProcessor implements Processor {
public static final String TYPE = "split";
private final Map<String, String> fields;
SplitProcessor(Map<String, String> fields) {
this.fields = fields;
}
Map<String, String> getFields() {
return fields;
}
@Override
public void execute(IngestDocument document) {
for(Map.Entry<String, String> entry : fields.entrySet()) {
String oldVal = document.getPropertyValue(entry.getKey(), String.class);
if (oldVal == null) {
throw new IllegalArgumentException("field [" + entry.getKey() + "] is null, cannot split.");
}
document.setPropertyValue(entry.getKey(), Arrays.asList(oldVal.split(entry.getValue())));
}
}
@Override
public String getType() {
return TYPE;
}
public static class Factory implements Processor.Factory<SplitProcessor> {
@Override
public SplitProcessor create(Map<String, Object> config) throws IOException {
Map<String, String> fields = ConfigurationUtils.readMap(config, "fields");
return new SplitProcessor(Collections.unmodifiableMap(fields));
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.trim;
import org.elasticsearch.ingest.processor.AbstractStringProcessor;
import java.util.Collection;
/**
* Processor that trims the content of string fields.
* Throws exception is the field is not of type string.
*/
public class TrimProcessor extends AbstractStringProcessor {
public static final String TYPE = "trim";
TrimProcessor(Collection<String> fields) {
super(fields);
}
@Override
protected String process(String value) {
return value.trim();
}
@Override
public String getType() {
return TYPE;
}
public static class Factory extends AbstractStringProcessor.Factory<TrimProcessor> {
@Override
protected TrimProcessor newProcessor(Collection<String> fields) {
return new TrimProcessor(fields);
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.uppercase;
import org.elasticsearch.ingest.processor.AbstractStringProcessor;
import java.util.Collection;
import java.util.Locale;
/**
* Processor that converts the content of string fields to uppercase.
* Throws exception is the field is not of type string.
*/
public class UppercaseProcessor extends AbstractStringProcessor {
public static final String TYPE = "uppercase";
UppercaseProcessor(Collection<String> fields) {
super(fields);
}
@Override
protected String process(String value) {
return value.toUpperCase(Locale.ROOT);
}
@Override
public String getType() {
return TYPE;
}
public static class Factory extends AbstractStringProcessor.Factory<UppercaseProcessor> {
@Override
protected UppercaseProcessor newProcessor(Collection<String> fields) {
return new UppercaseProcessor(fields);
}
}
}

View File

@ -22,10 +22,19 @@ package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.add.AddProcessor;
import org.elasticsearch.ingest.processor.convert.ConvertProcessor;
import org.elasticsearch.ingest.processor.date.DateProcessor;
import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
import org.elasticsearch.ingest.processor.grok.GrokProcessor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.ingest.processor.gsub.GsubProcessor;
import org.elasticsearch.ingest.processor.join.JoinProcessor;
import org.elasticsearch.ingest.processor.lowercase.LowercaseProcessor;
import org.elasticsearch.ingest.processor.remove.RemoveProcessor;
import org.elasticsearch.ingest.processor.rename.RenameProcessor;
import org.elasticsearch.ingest.processor.split.SplitProcessor;
import org.elasticsearch.ingest.processor.trim.TrimProcessor;
import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService;
@ -47,7 +56,16 @@ public class IngestModule extends AbstractModule {
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
addProcessor(DateProcessor.TYPE, new DateProcessor.Factory());
addProcessor(MutateProcessor.TYPE, new MutateProcessor.Factory());
addProcessor(AddProcessor.TYPE, new AddProcessor.Factory());
addProcessor(RenameProcessor.TYPE, new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, new RemoveProcessor.Factory());
addProcessor(SplitProcessor.TYPE, new SplitProcessor.Factory());
addProcessor(JoinProcessor.TYPE, new JoinProcessor.Factory());
addProcessor(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory());
addProcessor(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory());
addProcessor(TrimProcessor.TYPE, new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, new GsubProcessor.Factory());
MapBinder<String, Processor.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
for (Map.Entry<String, Processor.Factory> entry : processors.entrySet()) {

View File

@ -20,54 +20,55 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PipelineFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mutate", new MutateProcessor.Factory());
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("uppercase", Arrays.asList("field1"));
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig)));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorRegistry.put("test", processorFactory);
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0), instanceOf(MutateProcessor.class));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor"));
}
public void testCreate_unusedProcessorOptions() throws Exception {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mutate", new MutateProcessor.Factory());
public void testCreateUnusedProcessorOptions() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("uppercase", Arrays.asList("field1"));
processorConfig.put("foo", "bar");
processorConfig.put("unused", "value");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig)));
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorRegistry.put("test", processorFactory);
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("processor [mutate] doesn't support one or more provided configuration parameters [[foo]]"));
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.elasticsearch.common.Strings;
import java.util.*;
public final class RandomDocumentPicks {
private RandomDocumentPicks() {
}
/**
* Returns a random field name. Can be a leaf field name or the
* path to refer to a field name using the dot notation.
*/
public static String randomFieldName(Random random) {
int numLevels = RandomInts.randomIntBetween(random, 1, 5);
String fieldName = "";
for (int i = 0; i < numLevels; i++) {
if (i > 0) {
fieldName += ".";
}
fieldName += randomString(random);
}
return fieldName;
}
/**
* Returns a random leaf field name.
*/
public static String randomLeafFieldName(Random random) {
String fieldName;
do {
fieldName = randomString(random);
} while (fieldName.contains("."));
return fieldName;
}
/**
* Returns a randomly selected existing field name out of the fields that are contained
* in the document provided as an argument.
*/
public static String randomExistingFieldName(Random random, IngestDocument ingestDocument) {
Map<String, Object> source = new TreeMap<>(ingestDocument.getSource());
Map.Entry<String, Object> randomEntry = RandomPicks.randomFrom(random, source.entrySet());
String key = randomEntry.getKey();
while (randomEntry.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) randomEntry.getValue();
Map<String, Object> treeMap = new TreeMap<>(map);
randomEntry = RandomPicks.randomFrom(random, treeMap.entrySet());
key += "." + randomEntry.getKey();
}
assert ingestDocument.getPropertyValue(key, Object.class) != null;
return key;
}
/**
* Adds a random non existing field to the provided document and associates it
* with the provided value. The field will be added at a random position within the document,
* not necessarily at the top level using a leaf field name.
*/
public static String addRandomField(Random random, IngestDocument ingestDocument, Object value) {
String fieldName;
do {
fieldName = randomFieldName(random);
} while (canAddField(fieldName, ingestDocument) == false);
ingestDocument.setPropertyValue(fieldName, value);
return fieldName;
}
/**
* Checks whether the provided field name can be safely added to the provided document.
* When the provided field name holds the path using the dot notation, we have to make sure
* that each node of the tree either doesn't exist or is a map, otherwise new fields cannot be added.
*/
public static boolean canAddField(String path, IngestDocument ingestDocument) {
String[] pathElements = Strings.splitStringToArray(path, '.');
Map<String, Object> innerMap = ingestDocument.getSource();
if (pathElements.length > 1) {
for (int i = 0; i < pathElements.length - 1; i++) {
Object currentLevel = innerMap.get(pathElements[i]);
if (currentLevel == null) {
return true;
}
if (currentLevel instanceof Map == false) {
return false;
}
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) currentLevel;
innerMap = map;
}
}
String leafKey = pathElements[pathElements.length - 1];
return innerMap.containsKey(leafKey) == false;
}
/**
* Generates a random document and random metadata
*/
public static IngestDocument randomIngestDocument(Random random) {
return randomIngestDocument(random, randomDocument(random));
}
/**
* Generates a document that holds random metadata and the document provided as a map argument
*/
public static IngestDocument randomIngestDocument(Random random, Map<String, Object> document) {
String index = randomString(random);
String type = randomString(random);
String id = randomString(random);
return new IngestDocument(index, type, id, document);
}
private static Map<String, Object> randomDocument(Random random) {
Map<String, Object> document = new HashMap<>();
addRandomFields(random, document, 0);
return document;
}
/**
* Generates a random field value, can be a string, a number, a list of an object itself.
*/
public static Object randomFieldValue(Random random) {
return randomFieldValue(random, 0);
}
private static Object randomFieldValue(Random random, int currentDepth) {
switch(RandomInts.randomIntBetween(random, 0, 8)) {
case 0:
return randomString(random);
case 1:
return random.nextInt();
case 2:
return random.nextBoolean();
case 3:
return random.nextDouble();
case 4:
List<String> stringList = new ArrayList<>();
int numStringItems = RandomInts.randomIntBetween(random, 1, 10);
for (int j = 0; j < numStringItems; j++) {
stringList.add(randomString(random));
}
return stringList;
case 5:
List<Integer> intList = new ArrayList<>();
int numIntItems = RandomInts.randomIntBetween(random, 1, 10);
for (int j = 0; j < numIntItems; j++) {
intList.add(random.nextInt());
}
return intList;
case 6:
List<Boolean> booleanList = new ArrayList<>();
int numBooleanItems = RandomInts.randomIntBetween(random, 1, 10);
for (int j = 0; j < numBooleanItems; j++) {
booleanList.add(random.nextBoolean());
}
return booleanList;
case 7:
List<Double> doubleList = new ArrayList<>();
int numDoubleItems = RandomInts.randomIntBetween(random, 1, 10);
for (int j = 0; j < numDoubleItems; j++) {
doubleList.add(random.nextDouble());
}
return doubleList;
case 8:
Map<String, Object> newNode = new HashMap<>();
addRandomFields(random, newNode, ++currentDepth);
return newNode;
default:
throw new UnsupportedOperationException();
}
}
public static String randomString(Random random) {
if (random.nextBoolean()) {
return RandomStrings.randomAsciiOfLengthBetween(random, 1, 10);
}
return RandomStrings.randomUnicodeOfCodepointLengthBetween(random, 1, 10);
}
private static void addRandomFields(Random random, Map<String, Object> parentNode, int currentDepth) {
if (currentDepth > 5) {
return;
}
int numFields = RandomInts.randomIntBetween(random, 1, 10);
for (int i = 0; i < numFields; i++) {
String fieldName = randomLeafFieldName(random);
Object fieldValue = randomFieldValue(random, currentDepth);
parentNode.put(fieldName, fieldValue);
}
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public abstract class AbstractStringProcessorTestCase extends ESTestCase {
protected abstract AbstractStringProcessor newProcessor(Collection<String> fields);
protected String modifyInput(String input) {
return input;
}
protected abstract String expectedResult(String input);
public void testProcessor() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Map<String, String> expected = new HashMap<>();
for (int i = 0; i < numFields; i++) {
String fieldValue = RandomDocumentPicks.randomString(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, modifyInput(fieldValue));
expected.put(fieldName, expectedResult(fieldValue));
}
Processor processor = newProcessor(expected.keySet());
processor.execute(ingestDocument);
for (Map.Entry<String, String> entry : expected.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), String.class), equalTo(entry.getValue()));
}
}
public void testNullValue() throws IOException {
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = newProcessor(Collections.singletonList(fieldName));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
try {
processor.execute(ingestDocument);
fail("processor should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it."));
}
}
public void testNonStringValue() throws IOException {
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = newProcessor(Collections.singletonList(fieldName));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setPropertyValue(fieldName, randomInt());
try {
processor.execute(ingestDocument);
fail("processor should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.add;
import org.elasticsearch.ingest.processor.join.JoinProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class AddProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
AddProcessor.Factory factory = new AddProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, String> fields = Collections.singletonMap("field1", "value1");
config.put("fields", fields);
AddProcessor addProcessor = factory.create(config);
assertThat(addProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
AddProcessor.Factory factory = new AddProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.add;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
public class AddProcessorTests extends ESTestCase {
public void testAddExistingFields() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Map<String, Object> fields = new HashMap<>();
for (int i = 0; i < numFields; i++) {
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
fields.put(fieldName, fieldValue);
}
Processor processor = new AddProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, Object> field : fields.entrySet()) {
assertThat(ingestDocument.hasPropertyValue(field.getKey()), equalTo(true));
assertThat(ingestDocument.getPropertyValue(field.getKey(), Object.class), equalTo(field.getValue()));
}
}
public void testAddNewFields() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
//used to verify that there are no conflicts between subsequent fields going to be added
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
int numFields = randomIntBetween(1, 5);
Map<String, Object> fields = new HashMap<>();
for (int i = 0; i < numFields; i++) {
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
fields.put(fieldName, fieldValue);
}
Processor processor = new AddProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, Object> field : fields.entrySet()) {
assertThat(ingestDocument.hasPropertyValue(field.getKey()), equalTo(true));
assertThat(ingestDocument.getPropertyValue(field.getKey(), Object.class), equalTo(field.getValue()));
}
}
public void testAddFieldsTypeMismatch() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setPropertyValue("field", "value");
Processor processor = new AddProcessor(Collections.singletonMap("field.inner", "value"));
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("cannot add field to parent [field] of type [java.lang.String], [java.util.Map] expected instead."));
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.convert;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class ConvertProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
ConvertProcessor.Factory factory = new ConvertProcessor.Factory();
Map<String, Object> config = new HashMap<>();
ConvertProcessor.Type type = randomFrom(ConvertProcessor.Type.values());
Map<String, String> fields = Collections.singletonMap("field1", type.toString());
config.put("fields", fields);
ConvertProcessor convertProcessor = factory.create(config);
assertThat(convertProcessor.getFields().size(), equalTo(1));
assertThat(convertProcessor.getFields().get("field1"), equalTo(type));
}
public void testCreateMissingFields() throws IOException {
ConvertProcessor.Factory factory = new ConvertProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
public void testCreateUnsupportedType() throws IOException {
ConvertProcessor.Factory factory = new ConvertProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String type = "type-" + randomAsciiOfLengthBetween(1, 10);
Map<String, String> fields = Collections.singletonMap("field1", type);
config.put("fields", fields);
try {
factory.create(config);
fail("factory create should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), Matchers.equalTo("type [" + type + "] not supported, cannot convert field."));
}
}
}

View File

@ -0,0 +1,325 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.convert;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.ingest.processor.convert.ConvertProcessor.*;
import static org.hamcrest.Matchers.equalTo;
public class ConvertProcessorTests extends ESTestCase {
public void testConvertInt() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, ConvertProcessor.Type> fields = new HashMap<>();
Map<String, Integer> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int randomInt = randomInt();
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, randomInt);
fields.put(fieldName, Type.INTEGER);
expectedResult.put(fieldName, randomInt);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, Integer> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), Integer.class), equalTo(entry.getValue()));
}
}
public void testConvertIntList() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, ConvertProcessor.Type> fields = new HashMap<>();
Map<String, List<Integer>> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int numItems = randomIntBetween(1, 10);
List<String> fieldValue = new ArrayList<>();
List<Integer> expectedList = new ArrayList<>();
for (int j = 0; j < numItems; j++) {
int randomInt = randomInt();
fieldValue.add(Integer.toString(randomInt));
expectedList.add(randomInt);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
fields.put(fieldName, Type.INTEGER);
expectedResult.put(fieldName, expectedList);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, List<Integer>> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), List.class), equalTo(entry.getValue()));
}
}
public void testConvertIntError() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
String value = "string-" + randomAsciiOfLengthBetween(1, 10);
ingestDocument.setPropertyValue(fieldName, value);
Map<String, Type> convert = Collections.singletonMap(fieldName, Type.INTEGER);
Processor processor = new ConvertProcessor(convert);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("unable to convert [" + value + "] to integer"));
}
}
public void testConvertFloat() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, Float> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
float randomFloat = randomFloat();
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, randomFloat);
fields.put(fieldName, Type.FLOAT);
expectedResult.put(fieldName, randomFloat);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, Float> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), Float.class), equalTo(entry.getValue()));
}
}
public void testConvertFloatList() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, List<Float>> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int numItems = randomIntBetween(1, 10);
List<String> fieldValue = new ArrayList<>();
List<Float> expectedList = new ArrayList<>();
for (int j = 0; j < numItems; j++) {
float randomFloat = randomFloat();
fieldValue.add(Float.toString(randomFloat));
expectedList.add(randomFloat);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
fields.put(fieldName, Type.FLOAT);
expectedResult.put(fieldName, expectedList);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, List<Float>> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), List.class), equalTo(entry.getValue()));
}
}
public void testConvertFloatError() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
String value = "string-" + randomAsciiOfLengthBetween(1, 10);
ingestDocument.setPropertyValue(fieldName, value);
Map<String, Type> convert = Collections.singletonMap(fieldName, Type.FLOAT);
Processor processor = new ConvertProcessor(convert);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("unable to convert [" + value + "] to float"));
}
}
public void testConvertBoolean() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, Boolean> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
boolean randomBoolean = randomBoolean();
String booleanString = Boolean.toString(randomBoolean);
if (randomBoolean) {
booleanString = booleanString.toUpperCase(Locale.ROOT);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, booleanString);
fields.put(fieldName, Type.BOOLEAN);
expectedResult.put(fieldName, randomBoolean);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, Boolean> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), Boolean.class), equalTo(entry.getValue()));
}
}
public void testConvertBooleanList() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, List<Boolean>> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int numItems = randomIntBetween(1, 10);
List<String> fieldValue = new ArrayList<>();
List<Boolean> expectedList = new ArrayList<>();
for (int j = 0; j < numItems; j++) {
boolean randomBoolean = randomBoolean();
String booleanString = Boolean.toString(randomBoolean);
if (randomBoolean) {
booleanString = booleanString.toUpperCase(Locale.ROOT);
}
fieldValue.add(booleanString);
expectedList.add(randomBoolean);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
fields.put(fieldName, Type.BOOLEAN);
expectedResult.put(fieldName, expectedList);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, List<Boolean>> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), List.class), equalTo(entry.getValue()));
}
}
public void testConvertBooleanError() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
String fieldValue;
if (randomBoolean()) {
fieldValue = "string-" + randomAsciiOfLengthBetween(1, 10);
} else {
//verify that only proper boolean values are supported and we are strict about it
fieldValue = randomFrom("on", "off", "yes", "no", "0", "1");
}
ingestDocument.setPropertyValue(fieldName, fieldValue);
Map<String, Type> convert = Collections.singletonMap(fieldName, Type.BOOLEAN);
Processor processor = new ConvertProcessor(convert);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch(Exception e) {
assertThat(e.getMessage(), equalTo("[" + fieldValue + "] is not a boolean value, cannot convert to boolean"));
}
}
public void testConvertString() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, String> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
Object fieldValue;
String expectedFieldValue;
switch(randomIntBetween(0, 2)) {
case 0:
float randomFloat = randomFloat();
fieldValue = randomFloat;
expectedFieldValue = Float.toString(randomFloat);
break;
case 1:
int randomInt = randomInt();
fieldValue = randomInt;
expectedFieldValue = Integer.toString(randomInt);
break;
case 2:
boolean randomBoolean = randomBoolean();
fieldValue = randomBoolean;
expectedFieldValue = Boolean.toString(randomBoolean);
break;
default:
throw new UnsupportedOperationException();
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
fields.put(fieldName, Type.STRING);
expectedResult.put(fieldName, expectedFieldValue);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, String> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), String.class), equalTo(entry.getValue()));
}
}
public void testConvertStringList() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, List<String>> expectedResult = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int numItems = randomIntBetween(1, 10);
List<Object> fieldValue = new ArrayList<>();
List<String> expectedList = new ArrayList<>();
for (int j = 0; j < numItems; j++) {
Object randomValue;
String randomValueString;
switch(randomIntBetween(0, 2)) {
case 0:
float randomFloat = randomFloat();
randomValue = randomFloat;
randomValueString = Float.toString(randomFloat);
break;
case 1:
int randomInt = randomInt();
randomValue = randomInt;
randomValueString = Integer.toString(randomInt);
break;
case 2:
boolean randomBoolean = randomBoolean();
randomValue = randomBoolean;
randomValueString = Boolean.toString(randomBoolean);
break;
default:
throw new UnsupportedOperationException();
}
fieldValue.add(randomValue);
expectedList.add(randomValueString);
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
fields.put(fieldName, Type.STRING);
expectedResult.put(fieldName, expectedList);
}
Processor processor = new ConvertProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, List<String>> entry : expectedResult.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), List.class), equalTo(entry.getValue()));
}
}
public void testConvertNullField() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Type type = randomFrom(Type.values());
Map<String, Type> convert = Collections.singletonMap(fieldName, type);
Processor processor = new ConvertProcessor(convert);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Field [" + fieldName + "] is null, cannot be converted to type [" + type + "]"));
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.gsub;
import org.elasticsearch.ingest.processor.join.JoinProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class GsubProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
GsubProcessor.Factory factory = new GsubProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<Map<String, String>> expressions = new ArrayList<>();
Map<String, String> expression = new HashMap<>();
expression.put("field", "field1");
expression.put("pattern", "\\.");
expression.put("replacement", "-");
expressions.add(expression);
config.put("expressions", expressions);
GsubProcessor gsubProcessor = factory.create(config);
assertThat(gsubProcessor.getGsubExpressions().size(), equalTo(1));
GsubExpression gsubExpression = gsubProcessor.getGsubExpressions().get(0);
assertThat(gsubExpression.getFieldName(), equalTo("field1"));
assertThat(gsubExpression.getPattern().toString(), equalTo("\\."));
assertThat(gsubExpression.getReplacement(), equalTo("-"));
}
public void testCreateMissingExpressions() throws IOException {
GsubProcessor.Factory factory = new GsubProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [expressions] is missing"));
}
}
public void testCreateNoFieldPresent() throws IOException {
GsubProcessor.Factory factory = new GsubProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<Map<String, String>> expressions = new ArrayList<>();
Map<String, String> expression = new HashMap<>();
expression.put("pattern", "\\.");
expression.put("replacement", "-");
expressions.add(expression);
config.put("expressions", expressions);
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("no [field] specified for gsub expression"));
}
}
public void testCreateNoPatternPresent() throws IOException {
GsubProcessor.Factory factory = new GsubProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<Map<String, String>> expressions = new ArrayList<>();
Map<String, String> expression = new HashMap<>();
expression.put("field", "field1");
expression.put("replacement", "-");
expressions.add(expression);
config.put("expressions", expressions);
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("no [pattern] specified for gsub expression"));
}
}
public void testCreateNoReplacementPresent() throws IOException {
GsubProcessor.Factory factory = new GsubProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<Map<String, String>> expressions = new ArrayList<>();
Map<String, String> expression = new HashMap<>();
expression.put("field", "field1");
expression.put("pattern", "\\.");
expressions.add(expression);
config.put("expressions", expressions);
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("no [replacement] specified for gsub expression"));
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.gsub;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Pattern;
import static org.hamcrest.Matchers.equalTo;
public class GsubProcessorTests extends ESTestCase {
public void testGsub() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
List<GsubExpression> expressions = new ArrayList<>();
for (int i = 0; i < numFields; i++) {
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "127.0.0.1");
expressions.add(new GsubExpression(fieldName, Pattern.compile("\\."), "-"));
}
Processor processor = new GsubProcessor(expressions);
processor.execute(ingestDocument);
for (GsubExpression expression : expressions) {
assertThat(ingestDocument.getPropertyValue(expression.getFieldName(), String.class), equalTo("127-0-0-1"));
}
}
public void testGsubNotAStringValue() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setPropertyValue(fieldName, 123);
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression(fieldName, Pattern.compile("\\."), "-"));
Processor processor = new GsubProcessor(gsubExpressions);
try {
processor.execute(ingestDocument);
fail("processor execution should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
}
}
public void testGsubNullValue() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression(fieldName, Pattern.compile("\\."), "-"));
Processor processor = new GsubProcessor(gsubExpressions);
try {
processor.execute(ingestDocument);
fail("processor execution should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot match pattern."));
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.join;
import org.elasticsearch.ingest.processor.split.SplitProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class JoinProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
JoinProcessor.Factory factory = new JoinProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, String> fields = Collections.singletonMap("field1", "-");
config.put("fields", fields);
JoinProcessor joinProcessor = factory.create(config);
assertThat(joinProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
JoinProcessor.Factory factory = new JoinProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.join;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
public class JoinProcessorTests extends ESTestCase {
private static final String[] SEPARATORS = new String[]{"-", "_", "."};
public void testJoinStrings() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, String> fields = new HashMap<>();
Map<String, String> expectedResultMap = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int numItems = randomIntBetween(1, 10);
String separator = randomFrom(SEPARATORS);
List<String> fieldValue = new ArrayList<>(numItems);
String expectedResult = "";
for (int j = 0; j < numItems; j++) {
String value = randomAsciiOfLengthBetween(1, 10);
fieldValue.add(value);
expectedResult += value;
if (j < numItems - 1) {
expectedResult += separator;
}
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
expectedResultMap.put(fieldName, expectedResult);
fields.put(fieldName, separator);
}
Processor processor = new JoinProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, String> entry : expectedResultMap.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), String.class), equalTo(entry.getValue()));
}
}
public void testJoinIntegers() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, String> fields = new HashMap<>();
Map<String, String> expectedResultMap = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
int numItems = randomIntBetween(1, 10);
String separator = randomFrom(SEPARATORS);
List<Integer> fieldValue = new ArrayList<>(numItems);
String expectedResult = "";
for (int j = 0; j < numItems; j++) {
int value = randomInt();
fieldValue.add(value);
expectedResult += value;
if (j < numItems - 1) {
expectedResult += separator;
}
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
expectedResultMap.put(fieldName, expectedResult);
fields.put(fieldName, separator);
}
Processor processor = new JoinProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, String> entry : expectedResultMap.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), String.class), equalTo(entry.getValue()));
}
}
public void testJoinNonListField() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setPropertyValue(fieldName, randomAsciiOfLengthBetween(1, 10));
Map<String, String> join = Collections.singletonMap(fieldName, "-");
Processor processor = new JoinProcessor(join);
try {
processor.execute(ingestDocument);
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.String] cannot be cast to [java.util.List]"));
}
}
public void testJoinNonExistingField() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new JoinProcessor(Collections.singletonMap(fieldName, "-"));
try {
processor.execute(ingestDocument);
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot join."));
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.lowercase;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class LowercaseProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
LowercaseProcessor.Factory factory = new LowercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> fields = Collections.singletonList("field1");
config.put("fields", fields);
LowercaseProcessor uppercaseProcessor = factory.create(config);
assertThat(uppercaseProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
LowercaseProcessor.Factory factory = new LowercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.lowercase;
import org.elasticsearch.ingest.processor.AbstractStringProcessor;
import org.elasticsearch.ingest.processor.AbstractStringProcessorTestCase;
import java.util.Collection;
import java.util.Locale;
public class LowercaseProcessorTests extends AbstractStringProcessorTestCase {
@Override
protected AbstractStringProcessor newProcessor(Collection<String> fields) {
return new LowercaseProcessor(fields);
}
@Override
protected String expectedResult(String input) {
return input.toLowerCase(Locale.ROOT);
}
}

View File

@ -1,225 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.mutate;
import org.elasticsearch.test.ESTestCase;
import java.util.*;
import java.util.regex.Pattern;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class MutateProcessorFactoryTests extends ESTestCase {
public void testCreateUpdate() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, Object> update = new HashMap<>();
update.put("foo", 123);
config.put("update", update);
MutateProcessor processor = factory.create(config);
assertThat(processor.getRename(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getUpdate(), equalTo(update));
}
public void testCreateRename() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, Object> rename = new HashMap<>();
rename.put("foo", "bar");
config.put("rename", rename);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), equalTo(rename));
}
public void testCreateRemove() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> remove = Collections.singletonList("foo");
config.put("remove", remove);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getRemove(), equalTo(remove));
}
public void testCreateConvert() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, Object> convert = new HashMap<>();
convert.put("foo", "integer");
config.put("convert", convert);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getConvert(), equalTo(convert));
}
public void testCreateJoin() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, Object> join = new HashMap<>();
join.put("foo", "bar");
config.put("join", join);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getJoin(), equalTo(join));
}
public void testCreateSplit() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, Object> split = new HashMap<>();
split.put("foo", "bar");
config.put("split", split);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getSplit(), equalTo(split));
}
public void testCreateLowercase() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> lowercase = Collections.singletonList("foo");
config.put("lowercase", lowercase);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getLowercase(), equalTo(lowercase));
}
public void testCreateUppercase() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> uppercase = Collections.singletonList("foo");
config.put("uppercase", uppercase);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getTrim(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getUppercase(), equalTo(uppercase));
}
public void testCreateTrim() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> trim = Collections.singletonList("foo");
config.put("trim", trim);
MutateProcessor processor = factory.create(config);
assertThat(processor.getUpdate(), nullValue());
assertThat(processor.getGsub(), nullValue());
assertThat(processor.getConvert(), nullValue());
assertThat(processor.getJoin(), nullValue());
assertThat(processor.getRemove(), nullValue());
assertThat(processor.getUppercase(), nullValue());
assertThat(processor.getSplit(), nullValue());
assertThat(processor.getLowercase(), nullValue());
assertThat(processor.getRename(), nullValue());
assertThat(processor.getTrim(), equalTo(trim));
}
public void testCreateGsubPattern() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, List<String>> gsub = new HashMap<>();
gsub.put("foo", Arrays.asList("\\s.*e\\s", "<word_ending_with_e>"));
config.put("gsub", gsub);
MutateProcessor processor = factory.create(config);
assertThat(processor.getGsub().size(), equalTo(1));
assertThat(processor.getGsub().get(0), equalTo(new GsubExpression("foo", Pattern.compile("\\s.*e\\s"), "<word_ending_with_e>")));
}
public void testCreateGsubPatternInvalidFormat() throws Exception {
MutateProcessor.Factory factory = new MutateProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, List<String>> gsub = new HashMap<>();
gsub.put("foo", Collections.singletonList("only_one"));
config.put("gsub", gsub);
try {
factory.create(config);
fail("processor creation should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Invalid search and replace values [only_one] for field: foo"));
}
}
}

View File

@ -1,219 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.mutate;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.*;
import java.util.regex.Pattern;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class MutateProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
@Before
public void setData() {
Map<String, Object> document = new HashMap<>();
document.put("foo", "bar");
document.put("alpha", "aBcD");
document.put("num", "64");
document.put("to_strip", " clean ");
document.put("arr", Arrays.asList("1", "2", "3"));
document.put("ip", "127.0.0.1");
Map<String, Object> fizz = new HashMap<>();
fizz.put("buzz", "hello world");
document.put("fizz", fizz);
ingestDocument = new IngestDocument("index", "type", "id", document);
}
public void testUpdate() throws IOException {
Map<String, Object> update = new HashMap<>();
update.put("foo", 123);
Processor processor = new MutateProcessor(update, null, null, null, null, null, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("foo", Integer.class), equalTo(123));
}
public void testRename() throws IOException {
Map<String, String> rename = new HashMap<>();
rename.put("foo", "bar");
Processor processor = new MutateProcessor(null, rename, null, null, null, null, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("bar", String.class), equalTo("bar"));
assertThat(ingestDocument.hasPropertyValue("foo"), is(false));
}
public void testConvert() throws IOException {
Map<String, String> convert = new HashMap<>();
convert.put("num", "integer");
Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("num", Integer.class), equalTo(64));
}
public void testConvertNullField() throws IOException {
Map<String, String> convert = new HashMap<>();
convert.put("null", "integer");
Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Field \"null\" is null, cannot be converted to a/an integer"));
}
}
public void testConvertList() throws IOException {
Map<String, String> convert = new HashMap<>();
convert.put("arr", "integer");
Processor processor = new MutateProcessor(null, null, convert, null, null, null, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("arr", List.class), equalTo(Arrays.asList(1, 2, 3)));
}
public void testSplit() throws IOException {
Map<String, String> split = new HashMap<>();
split.put("ip", "\\.");
Processor processor = new MutateProcessor(null, null, null, split, null, null, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("ip", List.class), equalTo(Arrays.asList("127", "0", "0", "1")));
}
public void testSplitNullValue() throws IOException {
Map<String, String> split = new HashMap<>();
split.put("not.found", "\\.");
Processor processor = new MutateProcessor(null, null, null, split, null, null, null, null, null, null);
try {
processor.execute(ingestDocument);
fail();
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Cannot split field. [not.found] is null."));
}
}
public void testGsub() throws IOException {
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression("ip", Pattern.compile("\\."), "-"));
Processor processor = new MutateProcessor(null, null, null, null, gsubExpressions, null, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("ip", String.class), equalTo("127-0-0-1"));
}
public void testGsub_NullValue() throws IOException {
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression("null_field", Pattern.compile("\\."), "-"));
Processor processor = new MutateProcessor(null, null, null, null, gsubExpressions, null, null, null, null, null);
try {
processor.execute(ingestDocument);
fail("processor execution should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Field \"null_field\" is null, cannot match pattern."));
}
}
public void testJoin() throws IOException {
HashMap<String, String> join = new HashMap<>();
join.put("arr", "-");
Processor processor = new MutateProcessor(null, null, null, null, null, join, null, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("arr", String.class), equalTo("1-2-3"));
}
public void testRemove() throws IOException {
List<String> remove = Arrays.asList("foo", "ip");
Processor processor = new MutateProcessor(null, null, null, null, null, null, remove, null, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(5));
assertThat(ingestDocument.getPropertyValue("foo", Object.class), nullValue());
assertThat(ingestDocument.getPropertyValue("ip", Object.class), nullValue());
}
public void testTrim() throws IOException {
List<String> trim = Arrays.asList("to_strip", "foo");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, trim, null, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("bar"));
assertThat(ingestDocument.getPropertyValue("to_strip", String.class), equalTo("clean"));
}
public void testTrimNullValue() throws IOException {
List<String> trim = Collections.singletonList("not.found");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, trim, null, null);
try {
processor.execute(ingestDocument);
fail();
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Cannot trim field. [not.found] is null."));
}
}
public void testUppercase() throws IOException {
List<String> uppercase = Collections.singletonList("foo");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("foo", String.class), equalTo("BAR"));
}
public void testUppercaseNullValue() throws IOException {
List<String> uppercase = Collections.singletonList("not.found");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
try {
processor.execute(ingestDocument);
fail();
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Cannot uppercase field. [not.found] is null."));
}
}
public void testLowercase() throws IOException {
List<String> lowercase = Collections.singletonList("alpha");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(7));
assertThat(ingestDocument.getPropertyValue("alpha", String.class), equalTo("abcd"));
}
public void testLowercaseNullValue() throws IOException {
List<String> lowercase = Collections.singletonList("not.found");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase);
try {
processor.execute(ingestDocument);
fail();
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Cannot lowercase field. [not.found] is null."));
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.remove;
import org.elasticsearch.ingest.processor.join.JoinProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class RemoveProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
RemoveProcessor.Factory factory = new RemoveProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> fields = Collections.singletonList("field1");
config.put("fields", fields);
RemoveProcessor removeProcessor = factory.create(config);
assertThat(removeProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
RemoveProcessor.Factory factory = new RemoveProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.remove;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class RemoveProcessorTests extends ESTestCase {
public void testRemoveFields() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Set<String> fields = new HashSet<>();
for (int i = 0; i < numFields; i++) {
fields.add(RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument));
}
Processor processor = new RemoveProcessor(fields);
processor.execute(ingestDocument);
for (String field : fields) {
assertThat(ingestDocument.getPropertyValue(field, Object.class), nullValue());
assertThat(ingestDocument.hasPropertyValue(field), equalTo(false));
}
}
public void testRemoveNonExistingField() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Processor processor = new RemoveProcessor(Collections.singletonList(RandomDocumentPicks.randomFieldName(random())));
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(0));
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.rename;
import org.elasticsearch.ingest.processor.join.JoinProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class RenameProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
RenameProcessor.Factory factory = new RenameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, String> fields = Collections.singletonMap("field1", "value1");
config.put("fields", fields);
RenameProcessor renameProcessor = factory.create(config);
assertThat(renameProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
RenameProcessor.Factory factory = new RenameProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.rename;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class RenameProcessorTests extends ESTestCase {
public void testRename() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Map<String, String> fields = new HashMap<>();
Map<String, Object> newFields = new HashMap<>();
for (int i = 0; i < numFields; i++) {
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
if (fields.containsKey(fieldName)) {
continue;
}
String newFieldName;
do {
newFieldName = RandomDocumentPicks.randomFieldName(random());
} while (RandomDocumentPicks.canAddField(newFieldName, ingestDocument) == false || newFields.containsKey(newFieldName));
newFields.put(newFieldName, ingestDocument.getPropertyValue(fieldName, Object.class));
fields.put(fieldName, newFieldName);
}
Processor processor = new RenameProcessor(fields);
processor.execute(ingestDocument);
for (Map.Entry<String, Object> entry : newFields.entrySet()) {
assertThat(ingestDocument.getPropertyValue(entry.getKey(), Object.class), equalTo(entry.getValue()));
}
}
public void testRenameNonExistingField() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Processor processor = new RenameProcessor(Collections.singletonMap(RandomDocumentPicks.randomFieldName(random()), RandomDocumentPicks.randomFieldName(random())));
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(0));
}
public void testRenameExistingFieldNullValue() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setPropertyValue(fieldName, null);
String newFieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new RenameProcessor(Collections.singletonMap(fieldName, newFieldName));
processor.execute(ingestDocument);
assertThat(ingestDocument.hasPropertyValue(fieldName), equalTo(false));
assertThat(ingestDocument.hasPropertyValue(newFieldName), equalTo(true));
assertThat(ingestDocument.getPropertyValue(newFieldName, Object.class), nullValue());
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.split;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class SplitProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
SplitProcessor.Factory factory = new SplitProcessor.Factory();
Map<String, Object> config = new HashMap<>();
Map<String, String> fields = Collections.singletonMap("field1", "\\.");
config.put("fields", fields);
SplitProcessor splitProcessor = factory.create(config);
assertThat(splitProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
SplitProcessor.Factory factory = new SplitProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.split;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
public class SplitProcessorTests extends ESTestCase {
public void testSplit() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, String> fields = new HashMap<>();
int numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "127.0.0.1");
fields.put(fieldName, "\\.");
}
Processor processor = new SplitProcessor(fields);
processor.execute(ingestDocument);
for (String field : fields.keySet()) {
assertThat(ingestDocument.getPropertyValue(field, List.class), equalTo(Arrays.asList("127", "0", "0", "1")));
}
}
public void testSplitNullValue() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Map<String, String> split = Collections.singletonMap(fieldName, "\\.");
Processor processor = new SplitProcessor(split);
try {
processor.execute(ingestDocument);
fail("split processor should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot split."));
}
}
public void testSplitNonStringValue() throws IOException {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setPropertyValue(fieldName, randomInt());
Processor processor = new SplitProcessor(Collections.singletonMap(fieldName, "\\."));
try {
processor.execute(ingestDocument);
fail("split processor should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.trim;
import org.elasticsearch.ingest.processor.lowercase.LowercaseProcessor;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class TrimProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
TrimProcessor.Factory factory = new TrimProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> fields = Collections.singletonList("field1");
config.put("fields", fields);
TrimProcessor uppercaseProcessor = factory.create(config);
assertThat(uppercaseProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
TrimProcessor.Factory factory = new TrimProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.trim;
import org.elasticsearch.ingest.processor.AbstractStringProcessor;
import org.elasticsearch.ingest.processor.AbstractStringProcessorTestCase;
import java.util.Collection;
public class TrimProcessorTests extends AbstractStringProcessorTestCase {
@Override
protected AbstractStringProcessor newProcessor(Collection<String> fields) {
return new TrimProcessor(fields);
}
@Override
protected String modifyInput(String input) {
String updatedFieldValue = "";
updatedFieldValue = addWhitespaces(updatedFieldValue);
updatedFieldValue += input;
updatedFieldValue = addWhitespaces(updatedFieldValue);
return updatedFieldValue;
}
@Override
protected String expectedResult(String input) {
return input.trim();
}
private static String addWhitespaces(String input) {
int prefixLength = randomIntBetween(0, 10);
for (int i = 0; i < prefixLength; i++) {
input += ' ';
}
return input;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.uppercase;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
public class UppercaseProcessorFactoryTests extends ESTestCase {
public void testCreate() throws IOException {
UppercaseProcessor.Factory factory = new UppercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
List<String> fields = Collections.singletonList("field1");
config.put("fields", fields);
UppercaseProcessor uppercaseProcessor = factory.create(config);
assertThat(uppercaseProcessor.getFields(), equalTo(fields));
}
public void testCreateMissingFields() throws IOException {
UppercaseProcessor.Factory factory = new UppercaseProcessor.Factory();
Map<String, Object> config = new HashMap<>();
try {
factory.create(config);
fail("factory create should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("required property [fields] is missing"));
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.uppercase;
import org.elasticsearch.ingest.processor.AbstractStringProcessor;
import org.elasticsearch.ingest.processor.AbstractStringProcessorTestCase;
import java.util.Collection;
import java.util.Locale;
public class UppercaseProcessorTests extends AbstractStringProcessorTestCase {
@Override
protected AbstractStringProcessor newProcessor(Collection<String> fields) {
return new UppercaseProcessor(fields);
}
@Override
protected String expectedResult(String input) {
return input.toUpperCase(Locale.ROOT);
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.test.ESTestCase;
@ -37,9 +36,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -56,7 +53,7 @@ public class PipelineStoreTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
client = mock(PipelineStoreClient.class);
Environment environment = mock(Environment.class);
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(MutateProcessor.TYPE, new MutateProcessor.Factory()));
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.emptyMap());
store.start();
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
@ -43,7 +42,9 @@ import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.*;
import static org.hamcrest.Matchers.equalTo;
@ -115,7 +116,7 @@ public class IngestActionFilterTests extends ESTestCase {
verifyZeroInteractions(executionService, actionListener);
}
public void testApply_executed() throws Exception {
public void testApplyExecuted() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
@ -139,7 +140,7 @@ public class IngestActionFilterTests extends ESTestCase {
verifyZeroInteractions(actionListener);
}
public void testApply_failed() throws Exception {
public void testApplyFailed() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
@ -163,7 +164,7 @@ public class IngestActionFilterTests extends ESTestCase {
verifyZeroInteractions(actionFilterChain);
}
public void testApply_withBulkRequest() throws Exception {
public void testApplyWithBulkRequest() throws Exception {
ThreadPool threadPool = new ThreadPool(
Settings.builder()
.put("name", "_name")
@ -172,13 +173,18 @@ public class IngestActionFilterTests extends ESTestCase {
);
PipelineStore store = mock(PipelineStore.class);
Map<String, Object> mutateConfig = new HashMap<>();
Map<String, Object> update = new HashMap<>();
update.put("field2", "value2");
mutateConfig.put("update", update);
Processor processor = new Processor() {
@Override
public void execute(IngestDocument ingestDocument) {
ingestDocument.setPropertyValue("field2", "value2");
}
Processor mutateProcessor = (new MutateProcessor.Factory()).create(mutateConfig);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(mutateProcessor)));
@Override
public String getType() {
return null;
}
};
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
executionService = new PipelineExecutionService(store, threadPool);
filter = new IngestActionFilter(Settings.EMPTY, executionService);

View File

@ -12,8 +12,8 @@
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"add" : {
"fields" : {
"field2": "_value"
}
}

View File

@ -0,0 +1,132 @@
---
"Test mutate processors":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"add" : {
"fields" : {
"new_field": "new_value"
}
}
},
{
"rename" : {
"fields" : {
"field_to_rename": "renamed_field"
}
}
},
{
"remove" : {
"fields" : [
"field_to_remove"
]
}
},
{
"lowercase" : {
"fields" : [
"field_to_lowercase"
]
}
},
{
"uppercase" : {
"fields" : [
"field_to_uppercase"
]
}
},
{
"trim" : {
"fields" : [
"field_to_trim"
]
}
},
{
"split" : {
"fields" : {
"field_to_split": "-"
}
}
},
{
"join" : {
"fields" : {
"field_to_join": "-"
}
}
},
{
"convert" : {
"fields" : {
"field_to_convert": "integer"
}
}
},
{
"gsub" : {
"expressions" : [
{
"field": "field_to_gsub",
"pattern" : "-",
"replacement" : "."
}
]
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {
field_to_rename: "value",
field_to_remove: "old_value",
field_to_lowercase: "LOWERCASE",
field_to_uppercase: "uppercase",
field_to_trim: " trimmed ",
field_to_split: "127-0-0-1",
field_to_join: ["127","0","0","1"],
field_to_convert: ["127","0","0","1"],
field_to_gsub: "127-0-0-1"
}
- do:
get:
index: test
type: test
id: 1
- is_false: _source.field_to_rename
- is_false: _source.field_to_remove
- match: { _source.renamed_field: "value" }
- match: { _source.field_to_lowercase: "lowercase" }
- match: { _source.field_to_uppercase: "UPPERCASE" }
- match: { _source.field_to_trim: "trimmed" }
- match: { _source.field_to_split: ["127","0","0","1"] }
- match: { _source.field_to_join: "127-0-0-1" }
- match: { _source.field_to_convert: [127,0,0,1] }
- match: { _source.field_to_gsub: "127.0.0.1" }

View File

@ -1,50 +0,0 @@
---
"Test mutate processor":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"mutate" : {
"rename" : {
"field1": "foo"
},
"update" : {
"field2": "bar"
}
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {field1: "val"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.foo: "val" }
- match: { _source.field2: "bar" }

View File

@ -12,8 +12,8 @@
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"add" : {
"fields" : {
"field2" : "_value"
}
}
@ -66,8 +66,8 @@
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"add" : {
"fields" : {
"field2" : "_value"
}
}
@ -129,15 +129,15 @@
"description": "_description",
"processors": [
{
"mutate" : {
"update" : {
"add" : {
"fields" : {
"field2" : "_value"
}
}
},
{
"mutate" : {
"update" : {
"add" : {
"fields" : {
"field3" : "third_val"
}
}
@ -157,7 +157,7 @@
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.processor_id: "processor[mutate]-0" }
- match: { docs.0.processor_results.0.processor_id: "processor[add]-0" }
- is_true: docs.0.processor_results.0.doc.modified
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo: "bar" }
@ -181,8 +181,8 @@
"description": "_description",
"processors": [
{
"mutate" : {
"uppercase" : ["foo"]
"uppercase" : {
"fields" : ["foo"]
}
}
]
@ -226,15 +226,15 @@
"description": "_description",
"processors": [
{
"mutate" : {
"convert" : {
"convert" : {
"fields" : {
"foo": "integer"
}
}
},
{
"mutate" : {
"uppercase" : ["bar"]
"uppercase" : {
"fields" : ["bar"]
}
}
]
@ -262,7 +262,7 @@
}
- length: { docs: 2 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.error.type: "number_format_exception" }
- match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.doc._index: "index" }
- match: { docs.0.processor_results.1.doc._type: "type" }
- match: { docs.0.processor_results.1.doc._id: "id" }