diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc index 4c0cc6a77c3..42585fe0e90 100644 --- a/docs/plugins/ingest.asciidoc +++ b/docs/plugins/ingest.asciidoc @@ -403,57 +403,110 @@ An example that adds the parsed date to the `timestamp` field based on the `init } -------------------------------------------------- -==== Meta processor +=== Accessing data in pipelines -The `meta` processor allows to modify metadata properties of a document being processed. +Processors in pipelines have read and write access to documents that pass through the pipeline. +The fields in the source of a document and its metadata fields are accessible. -The following example changes the index of a document to `alternative_index` instead of indexing it into an index -that was specified in the index or bulk request: +Accessing a field in the source is straightforward and one can refer to fields by +their name. For example: [source,js] -------------------------------------------------- { - "description" : "...", - "processors" : [ - { - "meta" : { - "_index" : "alternative_index" - } - } - ] + "set": { + "field": "my_field" + "value": 582.1 + } } -------------------------------------------------- -The following metadata attributes can be modified in this processor: `_index`, `_type`, `_id`, `_routing`, `_parent`, -`_timestamp` and `_ttl`. All these metadata attributes can be specified in the body of the `meta` processor. - -Also the metadata settings in this processor are templatable which allows metadata field values to be replaced with -field values in the source of the document being indexed. The mustache template language is used and anything between -`{{` and `}}` can contain a template and point to any field in the source of the document. - -The following example documents being processed end up being indexed into an index based on the resolved city name by -the `geoip` processor. (for example `city-amsterdam`) +On top of this fields from the source are always accessible via the `_source` prefix: [source,js] -------------------------------------------------- { - "description" : "...", - "processors" : [ - { - "geoip" : { - "source" : "ip" - } - }, - { - "meta" : { - "_index" : "city-{{geoip.city_name}}" - } - } - ] + "set": { + "field": "_source.my_field" + "value": 582.1 + } } -------------------------------------------------- -=== Put pipeline API +Metadata fields can also be accessed in the same way as fields from the source. This +is possible because Elasticsearch doesn't allow fields in the source that have the +same name as metadata fields. + +The following example sets the id of a document to `1`: + +[source,js] +-------------------------------------------------- +{ + "set": { + "field": "_id" + "value": "1" + } +} +-------------------------------------------------- + +The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`, `_parent`, +`_timestamp` and `_ttl`. + +Beyond metadata fields and source fields, the ingest plugin also adds ingest metadata to documents being processed. +These metadata properties are accessible under the `_ingest` key. Currently the ingest plugin adds the ingest timestamp +under `_ingest.timestamp` key to the ingest metadata, which is the time the ingest plugin received the index or bulk +request to pre-process. But any processor is free to add more ingest related metadata to it. Ingest metadata is transient +and is lost after a document has been processed by the pipeline and thus ingest metadata won't be indexed. + +The following example adds a field with the name `received` and the value is the ingest timestamp: + +[source,js] +-------------------------------------------------- +{ + "set": { + "field": "received" + "value": "{{_ingest.timestamp}}" + } +} +-------------------------------------------------- + +As opposed to Elasticsearch metadata fields, the ingest metadata field name _ingest can be used as a valid field name +in the source of a document. Use _source._ingest to refer to it, otherwise _ingest will be interpreted as ingest +metadata fields by the ingest plugin. + +A number of processor settings also support templating. Settings that support templating can have zero or more +template snippets. A template snippet begins with `{{` and ends with `}}`. +Accessing fields and metafields in templates is exactly the same as via regular processor field settings. + +In this example a field by the name `field_c` is added and its value is a concatenation of +the values of `field_a` and `field_b`. + +[source,js] +-------------------------------------------------- +{ + "set": { + "field": "field_c" + "value": "{{field_a}} {{field_b}}" + } +} +-------------------------------------------------- + +The following example changes the index a document is going to be indexed into. The index a document will be redirected +to depends on the field in the source with name `geoip.country_iso_code`. + +[source,js] +-------------------------------------------------- +{ + "set": { + "field": "_index" + "value": "{{geoip.country_iso_code}}" + } +} +-------------------------------------------------- + +=== Ingest APIs + +==== Put pipeline API The put pipeline api adds pipelines and updates existing pipelines in the cluster. @@ -477,7 +530,7 @@ PUT _ingest/pipeline/my-pipeline-id NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all nodes to have the latest version of the pipeline. -=== Get pipeline API +==== Get pipeline API The get pipeline api returns pipelines based on id. This api always returns a local reference of the pipeline. @@ -513,7 +566,7 @@ For each returned pipeline the source and the version is returned. The version is useful for knowing what version of the pipeline the node has. Multiple ids can be provided at the same time. Also wildcards are supported. -=== Delete pipeline API +==== Delete pipeline API The delete pipeline api deletes pipelines by id. diff --git a/plugins/ingest/build.gradle b/plugins/ingest/build.gradle index 861115f336d..383ad6c88fa 100644 --- a/plugins/ingest/build.gradle +++ b/plugins/ingest/build.gradle @@ -33,7 +33,6 @@ dependencies { compile('com.fasterxml.jackson.core:jackson-databind:2.5.3') compile('com.maxmind.db:maxmind-db:1.0.0') - compile "com.github.spullara.mustache.java:compiler:0.9.1" compile 'joda-time:joda-time:2.8.2' testCompile 'org.elasticsearch:geolite2-databases:20151029' testCompile 'org.elasticsearch:securemock:1.2' diff --git a/plugins/ingest/licenses/compiler-0.9.1.jar.sha1 b/plugins/ingest/licenses/compiler-0.9.1.jar.sha1 deleted file mode 100644 index d1ef908bb33..00000000000 --- a/plugins/ingest/licenses/compiler-0.9.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -14aec5344639782ee76441401b773946c65eb2b3 \ No newline at end of file diff --git a/plugins/ingest/licenses/compiler-LICENSE.txt b/plugins/ingest/licenses/compiler-LICENSE.txt deleted file mode 100644 index ac68303cc11..00000000000 --- a/plugins/ingest/licenses/compiler-LICENSE.txt +++ /dev/null @@ -1,14 +0,0 @@ -Copyright 2010 RightTime, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - diff --git a/plugins/ingest/licenses/compiler-NOTICE.txt b/plugins/ingest/licenses/compiler-NOTICE.txt deleted file mode 100644 index 8d1c8b69c3f..00000000000 --- a/plugins/ingest/licenses/compiler-NOTICE.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java index ca4d55ee2db..a14a2647fd4 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -30,6 +30,9 @@ import java.util.*; */ public final class IngestDocument { + public final static String INGEST_KEY = "_ingest"; + public final static String SOURCE_KEY = "_source"; + static final String TIMESTAMP = "timestamp"; private final Map sourceAndMetadata; @@ -149,6 +152,16 @@ public final class IngestDocument { return false; } + /** + * Removes the field identified by the provided path. + * @param fieldPathTemplate Resolves to the path with dot-notation within the document + * @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. + */ + public void removeField(TemplateService.Template fieldPathTemplate) { + Map model = createTemplateModel(); + removeField(fieldPathTemplate.execute(model)); + } + /** * Removes the field identified by the provided path. * @param path the path of the field to be removed @@ -246,12 +259,22 @@ public final class IngestDocument { setFieldValue(path, value, false); } + /** + * Sets the provided value to the provided path in the document. + * Any non existing path element will be created. If the last element is a list, + * the value will replace the existing list. + * @param fieldPathTemplate Resolves to the path with dot-notation within the document + * @param valueSource The value source that will produce the value to put in for the path key + * @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist. + */ + public void setFieldValue(TemplateService.Template fieldPathTemplate, ValueSource valueSource) { + Map model = createTemplateModel(); + setFieldValue(fieldPathTemplate.execute(model), valueSource.copyAndResolve(model), false); + } + private void setFieldValue(String path, Object value, boolean append) { FieldPath fieldPath = new FieldPath(path); Object context = fieldPath.initialContext; - - value = deepCopy(value); - for (int i = 0; i < fieldPath.pathElements.length - 1; i++) { String pathElement = fieldPath.pathElements[i]; if (context == null) { @@ -332,6 +355,15 @@ public final class IngestDocument { throw new IllegalArgumentException("field [" + path + "] of type [" + object.getClass().getName() + "] cannot be cast to [" + clazz.getName() + "]"); } + private Map createTemplateModel() { + Map model = new HashMap<>(sourceAndMetadata); + model.put(SOURCE_KEY, sourceAndMetadata); + // If there is a field in the source with the name '_ingest' it gets overwritten here, + // if access to that field is required then it get accessed via '_source._ingest' + model.put(INGEST_KEY, ingestMetadata); + return model; + } + /** * one time operation that extracts the metadata fields from the ingest document and returns them. * Metadata fields that used to be accessible as ordinary top level fields will be removed as part of this call. @@ -361,32 +393,6 @@ public final class IngestDocument { return this.sourceAndMetadata; } - static Object deepCopy(Object value) { - if (value instanceof Map) { - @SuppressWarnings("unchecked") - Map mapValue = (Map) value; - Map copy = new HashMap<>(mapValue.size()); - for (Map.Entry entry : mapValue.entrySet()) { - copy.put(entry.getKey(), deepCopy(entry.getValue())); - } - return copy; - } else if (value instanceof List) { - @SuppressWarnings("unchecked") - List listValue = (List) value; - List copy = new ArrayList<>(listValue.size()); - for (Object itemValue : listValue) { - copy.add(deepCopy(itemValue)); - } - return copy; - } else if (value == null || value instanceof String || value instanceof Integer || - value instanceof Long || value instanceof Float || - value instanceof Double || value instanceof Boolean) { - return value; - } else { - throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]"); - } - } - @Override public boolean equals(Object obj) { if (obj == this) { return true; } @@ -431,26 +437,6 @@ public final class IngestDocument { return fieldName; } - public static MetaData fromString(String value) { - switch (value) { - case "_index": - return INDEX; - case "_type": - return TYPE; - case "_id": - return ID; - case "_routing": - return ROUTING; - case "_parent": - return PARENT; - case "_timestamp": - return TIMESTAMP; - case "_ttl": - return TTL; - default: - throw new IllegalArgumentException("no valid metadata field name [" + value + "]"); - } - } } private class FieldPath { @@ -462,12 +448,12 @@ public final class IngestDocument { throw new IllegalArgumentException("path cannot be null nor empty"); } String newPath; - if (path.startsWith("_ingest.")) { + if (path.startsWith(INGEST_KEY + ".")) { initialContext = ingestMetadata; newPath = path.substring(8, path.length()); } else { initialContext = sourceAndMetadata; - if (path.startsWith("_source.")) { + if (path.startsWith(SOURCE_KEY + ".")) { newPath = path.substring(8, path.length()); } else { newPath = path; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/TemplateService.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/TemplateService.java new file mode 100644 index 00000000000..c0505365e0a --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/TemplateService.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest; + +import java.util.Map; + +/** + * Abstraction for the template engine. + */ +public interface TemplateService { + + Template compile(String template); + + interface Template { + + String execute(Map model); + + String getKey(); + + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/ValueSource.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/ValueSource.java new file mode 100644 index 00000000000..525bb722d60 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/ValueSource.java @@ -0,0 +1,189 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.*; + +/** + * Holds a value. If the value is requested a copy is made and optionally template snippets are resolved too. + */ +public interface ValueSource { + + /** + * Returns a copy of the value this ValueSource holds and resolves templates if there're any. + * + * For immutable values only a copy of the reference to the value is made. + * + * @param model The model to be used when resolving any templates + * @return copy of the wrapped value + */ + Object copyAndResolve(Map model); + + static ValueSource wrap(Object value, TemplateService templateService) { + if (value instanceof Map) { + @SuppressWarnings("unchecked") + Map mapValue = (Map) value; + Map valueTypeMap = new HashMap<>(mapValue.size()); + for (Map.Entry entry : mapValue.entrySet()) { + valueTypeMap.put(wrap(entry.getKey(), templateService), wrap(entry.getValue(), templateService)); + } + return new MapValue(valueTypeMap); + } else if (value instanceof List) { + @SuppressWarnings("unchecked") + List listValue = (List) value; + List valueSourceList = new ArrayList<>(listValue.size()); + for (Object item : listValue) { + valueSourceList.add(wrap(item, templateService)); + } + return new ListValue(valueSourceList); + } else if (value == null || value instanceof Integer || + value instanceof Long || value instanceof Float || + value instanceof Double || value instanceof Boolean) { + return new ObjectValue(value); + } else if (value instanceof String) { + return new TemplatedValue(templateService.compile((String) value)); + } else { + throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]"); + } + } + + final class MapValue implements ValueSource { + + private final Map map; + + MapValue(Map map) { + this.map = map; + } + + @Override + public Object copyAndResolve(Map model) { + Map copy = new HashMap<>(); + for (Map.Entry entry : this.map.entrySet()) { + copy.put(entry.getKey().copyAndResolve(model), entry.getValue().copyAndResolve(model)); + } + return copy; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MapValue mapValue = (MapValue) o; + return map.equals(mapValue.map); + + } + + @Override + public int hashCode() { + return map.hashCode(); + } + } + + final class ListValue implements ValueSource { + + private final List values; + + ListValue(List values) { + this.values = values; + } + + @Override + public Object copyAndResolve(Map model) { + List copy = new ArrayList<>(values.size()); + for (ValueSource value : values) { + copy.add(value.copyAndResolve(model)); + } + return copy; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ListValue listValue = (ListValue) o; + return values.equals(listValue.values); + + } + + @Override + public int hashCode() { + return values.hashCode(); + } + } + + final class ObjectValue implements ValueSource { + + private final Object value; + + ObjectValue(Object value) { + this.value = value; + } + + @Override + public Object copyAndResolve(Map model) { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ObjectValue objectValue = (ObjectValue) o; + return Objects.equals(value, objectValue.value); + } + + @Override + public int hashCode() { + return Objects.hashCode(value); + } + } + + final class TemplatedValue implements ValueSource { + + private final TemplateService.Template template; + + TemplatedValue(TemplateService.Template template) { + this.template = template; + } + + @Override + public Object copyAndResolve(Map model) { + return template.execute(model); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TemplatedValue templatedValue = (TemplatedValue) o; + return Objects.equals(template.getKey(), templatedValue.template.getKey()); + } + + @Override + public int hashCode() { + return Objects.hashCode(template.getKey()); + } + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessor.java deleted file mode 100644 index 4de13f5b107..00000000000 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessor.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.elasticsearch.ingest.processor.meta; - -import com.github.mustachejava.DefaultMustacheFactory; -import com.github.mustachejava.Mustache; -import com.github.mustachejava.MustacheFactory; -import org.elasticsearch.common.io.FastStringReader; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestDocument.MetaData; -import org.elasticsearch.ingest.processor.Processor; - -import java.io.StringWriter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - - -//TODO this processor needs to be removed, as the set processor allows now to set any field, including metadata ones. -//The only reason for it to be still here is that it supports templating, we will remove once any processor supports templating. -public final class MetaDataProcessor implements Processor { - - public final static String TYPE = "meta"; - - private final Map templates; - - public MetaDataProcessor(Map templates) { - this.templates = templates; - } - - @Override - public void execute(IngestDocument ingestDocument) { - Map model = ingestDocument.getSourceAndMetadata(); - for (Map.Entry entry : templates.entrySet()) { - StringWriter writer = new StringWriter(); - entry.getValue().execute(writer, model); - ingestDocument.setFieldValue(entry.getKey().getFieldName(), writer.toString()); - } - } - - @Override - public String getType() { - return TYPE; - } - - Map getTemplates() { - return templates; - } - - public final static class Factory implements Processor.Factory { - - private final MustacheFactory mustacheFactory = new DefaultMustacheFactory(); - - @Override - public MetaDataProcessor create(Map config) throws Exception { - Map templates = new HashMap<>(); - Iterator> iterator = config.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - MetaData metaData = MetaData.fromString(entry.getKey()); - Mustache mustache = mustacheFactory.compile(new FastStringReader(entry.getValue().toString()), ""); - templates.put(metaData, mustache); - iterator.remove(); - } - - if (templates.isEmpty()) { - throw new IllegalArgumentException("no meta fields specified"); - } - - return new MetaDataProcessor(Collections.unmodifiableMap(templates)); - } - } - -} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/remove/RemoveProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/remove/RemoveProcessor.java index 80cd017ef78..9bdde91f38c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/remove/RemoveProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/remove/RemoveProcessor.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor.remove; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.Processor; @@ -32,13 +33,13 @@ public class RemoveProcessor implements Processor { public static final String TYPE = "remove"; - private final String field; + private final TemplateService.Template field; - RemoveProcessor(String field) { + RemoveProcessor(TemplateService.Template field) { this.field = field; } - String getField() { + public TemplateService.Template getField() { return field; } @@ -53,10 +54,17 @@ public class RemoveProcessor implements Processor { } public static class Factory implements Processor.Factory { + + private final TemplateService templateService; + + public Factory(TemplateService templateService) { + this.templateService = templateService; + } + @Override public RemoveProcessor create(Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(config, "field"); - return new RemoveProcessor(field); + return new RemoveProcessor(templateService.compile(field)); } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/set/SetProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/set/SetProcessor.java index f14be2a3217..c872c479f2b 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/set/SetProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/set/SetProcessor.java @@ -20,11 +20,11 @@ package org.elasticsearch.ingest.processor.set; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.ingest.ValueSource; import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.Processor; -import java.util.Arrays; -import java.util.Collections; import java.util.Map; /** @@ -35,19 +35,19 @@ public class SetProcessor implements Processor { public static final String TYPE = "set"; - private final String field; - private final Object value; + private final TemplateService.Template field; + private final ValueSource value; - SetProcessor(String field, Object value) { + SetProcessor(TemplateService.Template field, ValueSource value) { this.field = field; this.value = value; } - String getField() { + public TemplateService.Template getField() { return field; } - Object getValue() { + public ValueSource getValue() { return value; } @@ -62,11 +62,18 @@ public class SetProcessor implements Processor { } public static final class Factory implements Processor.Factory { + + private final TemplateService templateService; + + public Factory(TemplateService templateService) { + this.templateService = templateService; + } + @Override public SetProcessor create(Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(config, "field"); Object value = ConfigurationUtils.readObject(config, "value"); - return new SetProcessor(field, value); + return new SetProcessor(templateService.compile(field), ValueSource.wrap(value, templateService)); } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index 5c6961b8670..a50bed33ea0 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -34,15 +34,12 @@ import org.elasticsearch.ingest.processor.rename.RenameProcessor; import org.elasticsearch.ingest.processor.split.SplitProcessor; import org.elasticsearch.ingest.processor.trim.TrimProcessor; import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor; -import org.elasticsearch.ingest.processor.meta.MetaDataProcessor; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService; import java.util.HashMap; import java.util.Map; -import static org.elasticsearch.plugin.ingest.PipelineStore.ProcessorFactoryProvider; - public class IngestModule extends AbstractModule { private final Map processorFactoryProviders = new HashMap<>(); @@ -54,20 +51,19 @@ public class IngestModule extends AbstractModule { binder().bind(PipelineStore.class).asEagerSingleton(); binder().bind(SimulateExecutionService.class).asEagerSingleton(); - addProcessor(GeoIpProcessor.TYPE, environment -> new GeoIpProcessor.Factory(environment.configFile())); - addProcessor(GrokProcessor.TYPE, environment -> new GrokProcessor.Factory(environment.configFile())); - addProcessor(DateProcessor.TYPE, environment -> new DateProcessor.Factory()); - addProcessor(SetProcessor.TYPE, environment -> new SetProcessor.Factory()); - addProcessor(RenameProcessor.TYPE, environment -> new RenameProcessor.Factory()); - addProcessor(RemoveProcessor.TYPE, environment -> new RemoveProcessor.Factory()); - addProcessor(SplitProcessor.TYPE, environment -> new SplitProcessor.Factory()); - addProcessor(JoinProcessor.TYPE, environment -> new JoinProcessor.Factory()); - addProcessor(UppercaseProcessor.TYPE, environment -> new UppercaseProcessor.Factory()); - addProcessor(LowercaseProcessor.TYPE, environment -> new LowercaseProcessor.Factory()); - addProcessor(TrimProcessor.TYPE, environment -> new TrimProcessor.Factory()); - addProcessor(ConvertProcessor.TYPE, environment -> new ConvertProcessor.Factory()); - addProcessor(GsubProcessor.TYPE, environment -> new GsubProcessor.Factory()); - addProcessor(MetaDataProcessor.TYPE, environment -> new MetaDataProcessor.Factory()); + addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); + addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); + addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); + addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); + addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); + addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); + addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); + addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); + addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); + addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory()); + addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); + addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); + addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); for (Map.Entry entry : processorFactoryProviders.entrySet()) { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 2474d64c067..1bd68efd87c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -41,6 +41,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction; import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransportAction; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptModule; import java.util.Collection; import java.util.Collections; @@ -115,4 +116,8 @@ public class IngestPlugin extends Plugin { networkModule.registerRestHandler(RestSimulatePipelineAction.class); } } + + public void onModule(ScriptModule module) { + module.registerScriptContext(InternalTemplateService.INGEST_SCRIPT_CONTEXT); + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/InternalTemplateService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/InternalTemplateService.java new file mode 100644 index 00000000000..cec07f40459 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/InternalTemplateService.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.script.*; + +import java.util.Collections; +import java.util.Map; + +class InternalTemplateService implements TemplateService { + + public static final ScriptContext.Plugin INGEST_SCRIPT_CONTEXT = new ScriptContext.Plugin("elasticsearch-ingest", "ingest"); + + private final ScriptService scriptService; + + InternalTemplateService(ScriptService scriptService) { + this.scriptService = scriptService; + } + + @Override + public Template compile(String template) { + int mustacheStart = template.indexOf("{{"); + int mustacheEnd = template.indexOf("}}"); + if (mustacheStart != -1 && mustacheEnd != -1 && mustacheStart < mustacheEnd) { + Script script = new Script(template, ScriptService.ScriptType.INLINE, "mustache", Collections.emptyMap()); + CompiledScript compiledScript = scriptService.compile( + script, + INGEST_SCRIPT_CONTEXT, + null /* we can supply null here, because ingest doesn't use indexed scripts */, + Collections.emptyMap() + ); + return new Template() { + @Override + public String execute(Map model) { + ExecutableScript executableScript = scriptService.executable(compiledScript, model); + Object result = executableScript.run(); + if (result instanceof BytesReference) { + return ((BytesReference) result).toUtf8(); + } + return String.valueOf(result); + } + + @Override + public String getKey() { + return template; + } + }; + } else { + return new StringTemplate(template); + } + } + + class StringTemplate implements Template { + + private final String value; + + public StringTemplate(String value) { + this.value = value; + } + + @Override + public String execute(Map model) { + return value; + } + + @Override + public String getKey() { + return value; + } + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index b3e30b51ff9..9e36cf17df2 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -35,10 +35,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.SearchScrollIterator; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.regex.Regex; @@ -48,9 +45,11 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest; +import org.elasticsearch.script.*; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; @@ -66,35 +65,46 @@ public class PipelineStore extends AbstractLifecycleComponent { public final static String TYPE = "pipeline"; private final ThreadPool threadPool; + private final Environment environment; private final TimeValue scrollTimeout; private final ClusterService clusterService; private final Provider clientProvider; private final TimeValue pipelineUpdateInterval; + private final Provider scriptServiceProvider; private final Pipeline.Factory factory = new Pipeline.Factory(); - private final Map processorFactoryRegistry; + private volatile Map processorFactoryRegistry; + private final Map processorFactoryProviders; private volatile Client client; private volatile Map pipelines = new HashMap<>(); @Inject - public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processorFactoryProviders) { + public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, + Environment environment, ClusterService clusterService, Provider scriptServiceProvider, + Map processorFactoryProviders) { super(settings); this.threadPool = threadPool; + this.environment = environment; this.clusterService = clusterService; this.clientProvider = clientProvider; + this.scriptServiceProvider = scriptServiceProvider; this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1)); - Map processorFactories = new HashMap<>(); - for (Map.Entry entry : processorFactoryProviders.entrySet()) { - Processor.Factory processorFactory = entry.getValue().get(environment); - processorFactories.put(entry.getKey(), processorFactory); - } - this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); + this.processorFactoryProviders = processorFactoryProviders; + clusterService.add(new PipelineStoreListener()); } @Override protected void doStart() { + // TODO this will be better when #15203 gets in: + Map processorFactories = new HashMap<>(); + TemplateService templateService = new InternalTemplateService(scriptServiceProvider.get()); + for (Map.Entry entry : processorFactoryProviders.entrySet()) { + Processor.Factory processorFactory = entry.getValue().get(environment, templateService); + processorFactories.put(entry.getKey(), processorFactory); + } + this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); } @Override @@ -249,7 +259,6 @@ public class PipelineStore extends AbstractLifecycleComponent { return SearchScrollIterator.createIterator(client(), scrollTimeout, searchRequest); } - private Client client() { if (client == null) { client = clientProvider.get(); @@ -257,19 +266,6 @@ public class PipelineStore extends AbstractLifecycleComponent { return client; } - /** - * The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some - * processors rely on reading files from the config directory. We can't add Environment as a constructor parameter, - * so we need some code that provides the physical location of the configuration directory to the processor factories - * that need this and this is what this processor factory provider does. - */ - @FunctionalInterface - interface ProcessorFactoryProvider { - - Processor.Factory get(Environment environment); - - } - class Updater implements Runnable { @Override diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/ProcessorFactoryProvider.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/ProcessorFactoryProvider.java new file mode 100644 index 00000000000..e99261e6408 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/ProcessorFactoryProvider.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.ingest.processor.Processor; + +/** + * The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some + * processors rely on reading files from the config directory. We can't add Environment as a constructor parameter, + * so we need some code that provides the physical location of the configuration directory to the processor factories + * that need this and this is what this processor factory provider does. + */ +@FunctionalInterface +interface ProcessorFactoryProvider { + + Processor.Factory get(Environment environment, TemplateService templateService); + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java index 11ac560fb3e..011d47c67ed 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java @@ -22,8 +22,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.test.ESTestCase; import org.junit.Before; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.*; import static org.hamcrest.Matchers.*; @@ -381,7 +379,7 @@ public class IngestDocumentTests extends ESTestCase { } try { - ingestDocument.setFieldValue("_ingest.", Object.class); + ingestDocument.setFieldValue("_ingest.", "_value"); fail("set field value should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("path [_ingest.] is not valid")); @@ -605,7 +603,7 @@ public class IngestDocumentTests extends ESTestCase { public void testRemoveNullField() { try { - ingestDocument.removeField(null); + ingestDocument.removeField((String) null); fail("remove field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("path cannot be null nor empty")); @@ -677,57 +675,4 @@ public class IngestDocumentTests extends ESTestCase { } } - public void testDeepCopy() { - int iterations = scaledRandomIntBetween(8, 64); - for (int i = 0; i < iterations; i++) { - Map map = RandomDocumentPicks.randomSource(random()); - Object copy = IngestDocument.deepCopy(map); - assertThat("iteration: " + i, copy, equalTo(map)); - assertThat("iteration: " + i, copy, not(sameInstance(map))); - } - } - - public void testDeepCopyDoesNotChangeProvidedMap() { - Map myPreciousMap = new HashMap<>(); - myPreciousMap.put("field2", "value2"); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, new HashMap<>()); - ingestDocument.setFieldValue("field1", myPreciousMap); - ingestDocument.removeField("field1.field2"); - - assertThat(myPreciousMap.size(), equalTo(1)); - assertThat(myPreciousMap.get("field2"), equalTo("value2")); - } - - public void testDeepCopyDoesNotChangeProvidedList() { - List myPreciousList = new ArrayList<>(); - myPreciousList.add("value"); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, new HashMap<>()); - ingestDocument.setFieldValue("field1", myPreciousList); - ingestDocument.removeField("field1.0"); - - assertThat(myPreciousList.size(), equalTo(1)); - assertThat(myPreciousList.get(0), equalTo("value")); - } - - public void testIngestMetadataTimestamp() throws Exception { - long before = System.currentTimeMillis(); - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - long after = System.currentTimeMillis(); - String timestampString = ingestDocument.getIngestMetadata().get("timestamp"); - assertThat(timestampString, notNullValue()); - assertThat(timestampString, endsWith("+0000")); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT); - Date timestamp = df.parse(timestampString); - assertThat(timestamp.getTime(), greaterThanOrEqualTo(before)); - assertThat(timestamp.getTime(), lessThanOrEqualTo(after)); - } - - public void testCopyConstructor() { - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - IngestDocument copy = new IngestDocument(ingestDocument); - assertThat(ingestDocument.getSourceAndMetadata(), not(sameInstance(copy.getSourceAndMetadata()))); - assertThat(ingestDocument.getSourceAndMetadata(), equalTo(copy.getSourceAndMetadata())); - } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/TestTemplateService.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/TestTemplateService.java new file mode 100644 index 00000000000..5ef2c8e4bdd --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/TestTemplateService.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.Map; + +public class TestTemplateService implements TemplateService { + + public static TemplateService instance() { + return new TestTemplateService(); + } + + private TestTemplateService() { + } + + @Override + public Template compile(String template) { + return new MockTemplate(template); + } + + public static class MockTemplate implements TemplateService.Template { + + private final String expected; + + public MockTemplate(String expected) { + this.expected = expected; + } + + @Override + public String execute(Map model) { + return expected; + } + + @Override + public String getKey() { + return expected; + } + + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java new file mode 100644 index 00000000000..1c3f7dc3120 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.test.ESTestCase; + +import java.util.*; + +import static org.hamcrest.Matchers.*; + +public class ValueSourceTests extends ESTestCase { + + public void testDeepCopy() { + int iterations = scaledRandomIntBetween(8, 64); + for (int i = 0; i < iterations; i++) { + Map map = RandomDocumentPicks.randomSource(random()); + ValueSource valueSource = ValueSource.wrap(map, TestTemplateService.instance()); + Object copy = valueSource.copyAndResolve(Collections.emptyMap()); + assertThat("iteration: " + i, copy, equalTo(map)); + assertThat("iteration: " + i, copy, not(sameInstance(map))); + } + } + + public void testCopyDoesNotChangeProvidedMap() { + Map myPreciousMap = new HashMap<>(); + myPreciousMap.put("field2", "value2"); + + IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousMap, TestTemplateService.instance())); + ingestDocument.removeField("field1.field2"); + + assertThat(myPreciousMap.size(), equalTo(1)); + assertThat(myPreciousMap.get("field2"), equalTo("value2")); + } + + public void testCopyDoesNotChangeProvidedList() { + List myPreciousList = new ArrayList<>(); + myPreciousList.add("value"); + + IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousList, TestTemplateService.instance())); + ingestDocument.removeField("field1.0"); + + assertThat(myPreciousList.size(), equalTo(1)); + assertThat(myPreciousList.get(0), equalTo("value")); + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorFactoryTests.java deleted file mode 100644 index ee4cb0228a8..00000000000 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorFactoryTests.java +++ /dev/null @@ -1,65 +0,0 @@ -package org.elasticsearch.ingest.processor.meta; - -import com.github.mustachejava.DefaultMustacheFactory; -import com.github.mustachejava.Mustache; -import com.github.mustachejava.MustacheException; -import org.elasticsearch.common.io.FastStringReader; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.ingest.IngestDocument.MetaData; - -public class MetaDataProcessorFactoryTests extends ESTestCase { - - public void testCreate() throws Exception { - MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory(); - Map config = new HashMap<>(); - for (MetaData metaData : MetaData.values()) { - config.put(metaData.getFieldName(), randomBoolean() ? "static text" : "{{expression}}"); - } - MetaDataProcessor processor = factory.create(config); - assertThat(processor.getTemplates().size(), Matchers.equalTo(7)); - assertThat(processor.getTemplates().get(MetaData.INDEX), Matchers.notNullValue()); - assertThat(processor.getTemplates().get(MetaData.TIMESTAMP), Matchers.notNullValue()); - assertThat(processor.getTemplates().get(MetaData.ID), Matchers.notNullValue()); - assertThat(processor.getTemplates().get(MetaData.ROUTING), Matchers.notNullValue()); - assertThat(processor.getTemplates().get(MetaData.PARENT), Matchers.notNullValue()); - assertThat(processor.getTemplates().get(MetaData.TIMESTAMP), Matchers.notNullValue()); - assertThat(processor.getTemplates().get(MetaData.TTL), Matchers.notNullValue()); - } - - public void testCreateIllegalMetaData() throws Exception { - MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory(); - try { - factory.create(Collections.singletonMap("_field", "text {{expression}}")); - fail("exception should have been thrown"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), Matchers.equalTo("no valid metadata field name [_field]")); - } - } - - public void testCreateIllegalEmpty() throws Exception { - MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory(); - try { - factory.create(Collections.emptyMap()); - fail("exception should have been thrown"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), Matchers.equalTo("no meta fields specified")); - } - } - - public void testIlegalMustacheExpression() throws Exception { - try { - new MetaDataProcessor.Factory().create(Collections.singletonMap("_index", "text {{var")); - fail("exception expected"); - } catch (MustacheException e) { - assertThat(e.getMessage(), Matchers.equalTo("Improperly closed variable in :1")); - } - } - -} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorTests.java deleted file mode 100644 index c102849fdc4..00000000000 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorTests.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.elasticsearch.ingest.processor.meta; - -import com.github.mustachejava.DefaultMustacheFactory; -import com.github.mustachejava.Mustache; -import org.elasticsearch.common.io.FastStringReader; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.ingest.IngestDocument.*; - -public class MetaDataProcessorTests extends ESTestCase { - - public void testExecute() throws Exception { - Map templates = new HashMap<>(); - for (MetaData metaData : MetaData.values()) { - templates.put(metaData, new DefaultMustacheFactory().compile(new FastStringReader("some {{field}}"), "noname")); - } - - MetaDataProcessor processor = new MetaDataProcessor(templates); - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", "value")); - processor.execute(ingestDocument); - - Map metadataMap = ingestDocument.extractMetadata(); - for (MetaData metaData : MetaData.values()) { - assertThat(metadataMap.get(metaData), Matchers.equalTo("some value")); - } - } -} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorFactoryTests.java index f45f3bc59d0..3a370223813 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorFactoryTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorFactoryTests.java @@ -19,27 +19,33 @@ package org.elasticsearch.ingest.processor.remove; +import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; public class RemoveProcessorFactoryTests extends ESTestCase { + private RemoveProcessor.Factory factory; + + @Before + public void init() { + factory = new RemoveProcessor.Factory(TestTemplateService.instance()); + } + public void testCreate() throws Exception { - RemoveProcessor.Factory factory = new RemoveProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); RemoveProcessor removeProcessor = factory.create(config); - assertThat(removeProcessor.getField(), equalTo("field1")); + assertThat(removeProcessor.getField().execute(Collections.emptyMap()), equalTo("field1")); } public void testCreateMissingField() throws Exception { - RemoveProcessor.Factory factory = new RemoveProcessor.Factory(); Map config = new HashMap<>(); try { factory.create(config); @@ -48,4 +54,5 @@ public class RemoveProcessorFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("required property [field] is missing")); } } + } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java index 2ccfd5add93..9800c48702a 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java @@ -21,13 +21,11 @@ package org.elasticsearch.ingest.processor.remove; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -37,7 +35,7 @@ public class RemoveProcessorTests extends ESTestCase { public void testRemoveFields() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String field = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument); - Processor processor = new RemoveProcessor(field); + Processor processor = new RemoveProcessor(new TestTemplateService.MockTemplate(field)); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(field), equalTo(false)); } @@ -45,7 +43,7 @@ public class RemoveProcessorTests extends ESTestCase { public void testRemoveNonExistingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); - Processor processor = new RemoveProcessor(fieldName); + Processor processor = new RemoveProcessor(new TestTemplateService.MockTemplate(fieldName)); try { processor.execute(ingestDocument); fail("remove field should have failed"); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorFactoryTests.java index 9eb6b2a4907..ddbec7c8546 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorFactoryTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorFactoryTests.java @@ -19,7 +19,9 @@ package org.elasticsearch.ingest.processor.set; +import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.util.Collections; import java.util.HashMap; @@ -29,18 +31,23 @@ import static org.hamcrest.CoreMatchers.equalTo; public class SetProcessorFactoryTests extends ESTestCase { + private SetProcessor.Factory factory; + + @Before + public void init() { + factory = new SetProcessor.Factory(TestTemplateService.instance()); + } + public void testCreate() throws Exception { - SetProcessor.Factory factory = new SetProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("value", "value1"); SetProcessor setProcessor = factory.create(config); - assertThat(setProcessor.getField(), equalTo("field1")); - assertThat(setProcessor.getValue(), equalTo("value1")); + assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1")); + assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo("value1")); } public void testCreateNoFieldPresent() throws Exception { - SetProcessor.Factory factory = new SetProcessor.Factory(); Map config = new HashMap<>(); config.put("value", "value1"); try { @@ -52,7 +59,6 @@ public class SetProcessorFactoryTests extends ESTestCase { } public void testCreateNoValuePresent() throws Exception { - SetProcessor.Factory factory = new SetProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); try { @@ -64,7 +70,6 @@ public class SetProcessorFactoryTests extends ESTestCase { } public void testCreateNullValue() throws Exception { - SetProcessor.Factory factory = new SetProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("value", null); @@ -75,4 +80,5 @@ public class SetProcessorFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("required property [value] is missing")); } } + } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorTests.java index 7d693066595..9d772602f78 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/set/SetProcessorTests.java @@ -19,10 +19,10 @@ package org.elasticsearch.ingest.processor.set; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.*; import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; import java.util.*; @@ -34,7 +34,7 @@ public class SetProcessorTests extends ESTestCase { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument); Object fieldValue = RandomDocumentPicks.randomFieldValue(random()); - Processor processor = new SetProcessor(fieldName, fieldValue); + Processor processor = createSetProcessor(fieldName, fieldValue); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue)); @@ -46,7 +46,7 @@ public class SetProcessorTests extends ESTestCase { IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Object fieldValue = RandomDocumentPicks.randomFieldValue(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue); - Processor processor = new SetProcessor(fieldName, fieldValue); + Processor processor = createSetProcessor(fieldName, fieldValue); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue)); @@ -55,7 +55,7 @@ public class SetProcessorTests extends ESTestCase { public void testSetFieldsTypeMismatch() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); ingestDocument.setFieldValue("field", "value"); - Processor processor = new SetProcessor("field.inner", "value"); + Processor processor = createSetProcessor("field.inner", "value"); try { processor.execute(ingestDocument); fail("processor execute should have failed"); @@ -63,4 +63,18 @@ public class SetProcessorTests extends ESTestCase { assertThat(e.getMessage(), equalTo("cannot set [inner] with parent object of type [java.lang.String] as part of path [field.inner]")); } } -} \ No newline at end of file + + public void testSetMetadata() throws Exception { + IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values()); + Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); + } + + private Processor createSetProcessor(String fieldName, Object fieldValue) { + TemplateService templateService = TestTemplateService.instance(); + return new SetProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService)); + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index 8f8a3a3f2ea..a1e3191e16c 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -25,8 +25,9 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.processor.Processor; -import org.elasticsearch.ingest.processor.meta.MetaDataProcessor; +import org.elasticsearch.ingest.processor.set.SetProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; @@ -126,10 +127,11 @@ public class PipelineExecutionServiceTests extends ESTestCase { @SuppressWarnings("unchecked") public void testExecuteTTL() throws Exception { // test with valid ttl - MetaDataProcessor.Factory metaProcessorFactory = new MetaDataProcessor.Factory(); + SetProcessor.Factory metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance()); Map config = new HashMap<>(); - config.put("_ttl", "5d"); - MetaDataProcessor processor = metaProcessorFactory.create(config); + config.put("field", "_ttl"); + config.put("value", "5d"); + Processor processor = metaProcessorFactory.create(config); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); @@ -141,9 +143,10 @@ public class PipelineExecutionServiceTests extends ESTestCase { verify(listener, never()).onFailure(any()); // test with invalid ttl - metaProcessorFactory = new MetaDataProcessor.Factory(); + metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance()); config = new HashMap<>(); - config.put("_ttl", "abc"); + config.put("field", "_ttl"); + config.put("value", "abc"); processor = metaProcessorFactory.create(config); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java index 2f1409b4528..51c1e877de4 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.env.Environment; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; @@ -68,9 +69,10 @@ public class PipelineStoreTests extends ESTestCase { client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); + ScriptService scriptService = mock(ScriptService.class); when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); Environment environment = mock(Environment.class); - store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, Collections.emptyMap()); + store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, () -> scriptService, Collections.emptyMap()); } @After diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml index eb59cada2d0..ca0f58435df 100644 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/60_mutate.yaml @@ -113,3 +113,51 @@ - match: { _source.field_to_join: "127-0-0-1" } - match: { _source.field_to_convert: [127,0,0,1] } - match: { _source.field_to_gsub: "127.0.0.1" } + +--- +"Test metadata": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "_index", + "value" : "surprise" + } + } + ] + } + - match: { _id: "my_pipeline" } + + # Simulate a Thread.sleep(), because pipeline are updated in the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline" + body: {field: "value"} + + - do: + get: + index: surprise + type: test + id: 1 + - length: { _source: 1 } + - match: { _source.field: "value" } + diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_meta_processor.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_meta_processor.yaml deleted file mode 100644 index be13146fb63..00000000000 --- a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_meta_processor.yaml +++ /dev/null @@ -1,45 +0,0 @@ ---- -"Test meta processor": - - do: - cluster.health: - wait_for_status: green - - - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "description": "_description", - "processors": [ - { - "meta" : { - "_index" : "surprise" - } - } - ] - } - - match: { _id: "my_pipeline" } - - # Simulate a Thread.sleep(), because pipeline are updated in the background - - do: - catch: request_timeout - cluster.health: - wait_for_nodes: 99 - timeout: 2s - - match: { "timed_out": true } - - - do: - ingest.index: - index: test - type: test - id: 1 - pipeline_id: "my_pipeline" - body: {field: "value"} - - - do: - get: - index: surprise - type: test - id: 1 - - length: { _source: 1 } - - match: { _source.field: "value" } diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml similarity index 100% rename from plugins/ingest/src/test/resources/rest-api-spec/test/ingest/80_simulate.yaml rename to plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_simulate.yaml diff --git a/qa/ingest-with-mustache/build.gradle b/qa/ingest-with-mustache/build.gradle new file mode 100644 index 00000000000..32ed5f8956f --- /dev/null +++ b/qa/ingest-with-mustache/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testCompile project(path: ':plugins:ingest', configuration: 'runtime') + testCompile project(path: ':modules:lang-mustache', configuration: 'runtime') +} + +integTest { + cluster { + plugin 'ingest', project(':plugins:ingest') + } +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/AbstractMustacheTests.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/AbstractMustacheTests.java new file mode 100644 index 00000000000..bdd37c86d58 --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/AbstractMustacheTests.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.script.ScriptContextRegistry; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.mustache.MustacheScriptEngineService; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Collections; + +public abstract class AbstractMustacheTests extends ESTestCase { + + protected TemplateService templateService; + + @Before + public void init() throws Exception { + Settings settings = Settings.builder() + .put("path.home", createTempDir()) + .put(ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING, false) + .build(); + MustacheScriptEngineService mustache = new MustacheScriptEngineService(settings); + ScriptContextRegistry registry = new ScriptContextRegistry( + Collections.singletonList(InternalTemplateService.INGEST_SCRIPT_CONTEXT) + ); + ScriptService scriptService = new ScriptService( + settings, new Environment(settings), Collections.singleton(mustache), null, registry + ); + templateService = new InternalTemplateService(scriptService); + } + +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestDocumentMustacheIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestDocumentMustacheIT.java new file mode 100644 index 00000000000..8096d3a8d5d --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestDocumentMustacheIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ValueSource; + +import java.util.*; + +import static org.hamcrest.Matchers.equalTo; + +public class IngestDocumentMustacheIT extends AbstractMustacheTests { + + public void testAccessMetaDataViaTemplate() { + Map document = new HashMap<>(); + document.put("foo", "bar"); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); + ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("1 {{foo}}", templateService)); + assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 bar")); + + ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("2 {{_source.foo}}", templateService)); + assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("2 bar")); + } + + public void testAccessMapMetaDataViaTemplate() { + Map document = new HashMap<>(); + Map innerObject = new HashMap<>(); + innerObject.put("bar", "hello bar"); + innerObject.put("baz", "hello baz"); + innerObject.put("qux", Collections.singletonMap("fubar", "hello qux and fubar")); + document.put("foo", innerObject); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); + ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("1 {{foo.bar}} {{foo.baz}} {{foo.qux.fubar}}", templateService)); + assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 hello bar hello baz hello qux and fubar")); + + ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("2 {{_source.foo.bar}} {{_source.foo.baz}} {{_source.foo.qux.fubar}}", templateService)); + assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("2 hello bar hello baz hello qux and fubar")); + } + + public void testAccessListMetaDataViaTemplate() { + Map document = new HashMap<>(); + document.put("list1", Arrays.asList("foo", "bar", null)); + List> list = new ArrayList<>(); + Map value = new HashMap<>(); + value.put("field", "value"); + list.add(value); + list.add(null); + document.put("list2", list); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); + // TODO: fix index based lookups in lists: + ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("1 {{list1}} {{list2}}", templateService)); + assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 [foo, bar, null] [{field=value}, null]")); + + ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("2 {{_source.list1}} {{_source.list2}}", templateService)); + assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("2 [foo, bar, null] [{field=value}, null]")); + } + + public void testAccessIngestMetadataViaTemplate() { + Map document = new HashMap<>(); + Map ingestMap = new HashMap<>(); + ingestMap.put("timestamp", "bogus_timestamp"); + document.put("_ingest", ingestMap); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); + ingestDocument.setFieldValue(templateService.compile("ingest_timestamp"), ValueSource.wrap("{{_ingest.timestamp}} and {{_source._ingest.timestamp}}", templateService)); + assertThat(ingestDocument.getFieldValue("ingest_timestamp", String.class), equalTo(ingestDocument.getIngestMetadata().get("timestamp") + " and bogus_timestamp")); + } + +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheRemoveProcessorIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheRemoveProcessorIT.java new file mode 100644 index 00000000000..9931b15f231 --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheRemoveProcessorIT.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.ingest.processor.remove.RemoveProcessor; +import org.hamcrest.CoreMatchers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class IngestMustacheRemoveProcessorIT extends AbstractMustacheTests { + + public void testRemoveProcessorMustacheExpression() throws Exception { + RemoveProcessor.Factory factory = new RemoveProcessor.Factory(templateService); + Map config = new HashMap<>(); + config.put("field", "field{{var}}"); + RemoveProcessor processor = factory.create(config); + assertThat(processor.getField().execute(Collections.singletonMap("var", "_value")), CoreMatchers.equalTo("field_value")); + } + +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java new file mode 100644 index 00000000000..1cf70543ce1 --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + + +import org.elasticsearch.ingest.*; +import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.processor.set.SetProcessor; +import org.hamcrest.Matchers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; + +public class IngestMustacheSetProcessorIT extends AbstractMustacheTests { + + public void testExpression() throws Exception { + SetProcessor processor = createSetProcessor("_index", "text {{var}}"); + assertThat(processor.getValue(), instanceOf(ValueSource.TemplatedValue.class)); + assertThat(processor.getValue().copyAndResolve(Collections.singletonMap("var", "_value")), equalTo("text _value")); + } + + public void testSetMetadataWithTemplates() throws Exception { + IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values()); + Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value {{field}}"); + IngestDocument ingestDocument = createIngestDocument(Collections.singletonMap("field", "value")); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value value")); + } + + public void testSetWithTemplates() throws Exception { + IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.INDEX, IngestDocument.MetaData.TYPE, IngestDocument.MetaData.ID); + Processor processor = createSetProcessor("field{{_type}}", "_value {{" + randomMetaData.getFieldName() + "}}"); + IngestDocument ingestDocument = createIngestDocument(new HashMap<>()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue("field_type", String.class), Matchers.equalTo("_value " + ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class))); + } + + private SetProcessor createSetProcessor(String fieldName, Object fieldValue) throws Exception { + SetProcessor.Factory factory = new SetProcessor.Factory(templateService); + Map config = new HashMap<>(); + config.put("field", fieldName); + config.put("value", fieldValue); + return factory.create(config); + } + + private IngestDocument createIngestDocument(Map source) { + return new IngestDocument("_index", "_type", "_id", null, null, null, null, source); + } + +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/TemplateServiceIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/TemplateServiceIT.java new file mode 100644 index 00000000000..6925959e710 --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/TemplateServiceIT.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.script.ScriptException; + +import java.util.*; + +import static org.hamcrest.Matchers.*; + +public class TemplateServiceIT extends AbstractMustacheTests { + + public void testTemplates() { + Map model = new HashMap<>(); + model.put("fielda", "value1"); + model.put("fieldb", Collections.singletonMap("fieldc", "value3")); + + TemplateService.Template template = templateService.compile("{{fielda}}/{{fieldb}}/{{fieldb.fieldc}}"); + assertThat(template.execute(model), equalTo("value1/{fieldc=value3}/value3")); + } + + public void testWrongTemplateUsage() { + Map model = Collections.emptyMap(); + TemplateService.Template template = templateService.compile("value"); + assertThat(template.execute(model), equalTo("value")); + + template = templateService.compile("value {{"); + assertThat(template.execute(model), equalTo("value {{")); + template = templateService.compile("value {{abc"); + assertThat(template.execute(model), equalTo("value {{abc")); + template = templateService.compile("value }}"); + assertThat(template.execute(model), equalTo("value }}")); + template = templateService.compile("value }} {{"); + assertThat(template.execute(model), equalTo("value }} {{")); + } + +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/ValueSourceMustacheIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/ValueSourceMustacheIT.java new file mode 100644 index 00000000000..85fd9561dad --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/ValueSourceMustacheIT.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ValueSource; + +import java.util.*; + +import static org.hamcrest.Matchers.*; + +public class ValueSourceMustacheIT extends AbstractMustacheTests { + + public void testValueSourceWithTemplates() { + Map model = new HashMap<>(); + model.put("field1", "value1"); + model.put("field2", Collections.singletonMap("field3", "value3")); + + ValueSource valueSource = ValueSource.wrap("{{field1}}/{{field2}}/{{field2.field3}}", templateService); + assertThat(valueSource, instanceOf(ValueSource.TemplatedValue.class)); + assertThat(valueSource.copyAndResolve(model), equalTo("value1/{field3=value3}/value3")); + + valueSource = ValueSource.wrap(Arrays.asList("_value", "{{field1}}"), templateService); + assertThat(valueSource, instanceOf(ValueSource.ListValue.class)); + List result = (List) valueSource.copyAndResolve(model); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0), equalTo("_value")); + assertThat(result.get(1), equalTo("value1")); + + Map map = new HashMap<>(); + map.put("field1", "{{field1}}"); + map.put("field2", Collections.singletonMap("field3", "{{field2.field3}}")); + map.put("field4", "_value"); + valueSource = ValueSource.wrap(map, templateService); + assertThat(valueSource, instanceOf(ValueSource.MapValue.class)); + Map resultMap = (Map) valueSource.copyAndResolve(model); + assertThat(resultMap.size(), equalTo(3)); + assertThat(resultMap.get("field1"), equalTo("value1")); + assertThat(((Map) resultMap.get("field2")).size(), equalTo(1)); + assertThat(((Map) resultMap.get("field2")).get("field3"), equalTo("value3")); + assertThat(resultMap.get("field4"), equalTo("_value")); + } + + public void testAccessSourceViaTemplate() { + IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, null, null, new HashMap<>()); + assertThat(ingestDocument.hasField("marvel"), is(false)); + ingestDocument.setFieldValue(templateService.compile("{{_index}}"), ValueSource.wrap("{{_index}}", templateService)); + assertThat(ingestDocument.getFieldValue("marvel", String.class), equalTo("marvel")); + ingestDocument.removeField(templateService.compile("{{marvel}}")); + assertThat(ingestDocument.hasField("index"), is(false)); + } + +} diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/smoketest/IngestWithMustacheIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/smoketest/IngestWithMustacheIT.java new file mode 100644 index 00000000000..73f64d4433c --- /dev/null +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/smoketest/IngestWithMustacheIT.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.smoketest; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.RestTestCandidate; +import org.elasticsearch.test.rest.parser.RestTestParseException; + +import java.io.IOException; + +public class IngestWithMustacheIT extends ESRestTestCase { + + public IngestWithMustacheIT(@Name("yaml") RestTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException, RestTestParseException { + return ESRestTestCase.createParameters(0, 1); + } + +} diff --git a/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml b/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml new file mode 100644 index 00000000000..fb0fa9c1083 --- /dev/null +++ b/qa/ingest-with-mustache/src/test/resources/rest-api-spec/test/ingest_mustache/10_pipeline_with_mustache_templates.yaml @@ -0,0 +1,171 @@ +--- +"Test metadata templateing": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline_1" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "index_type_id", + "value": "{{_index}}/{{_type}}/{{_id}}" + } + } + ] + } + - match: { _id: "my_pipeline_1" } + + # Simulate a Thread.sleep(), because pipeline are updated in the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline_1" + body: {} + + - do: + get: + index: test + type: test + id: 1 + - length: { _source: 1 } + - match: { _source.index_type_id: "test/test/1" } + +--- +"Test templateing": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline_1" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field4", + "value": "{{field1}}/{{field2}}/{{field3}}" + } + } + ] + } + - match: { _id: "my_pipeline_1" } + + - do: + ingest.put_pipeline: + id: "my_pipeline_2" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "{{field1}}", + "value": "value" + } + } + ] + } + - match: { _id: "my_pipeline_2" } + + - do: + ingest.put_pipeline: + id: "my_pipeline_3" + body: > + { + "description": "_description", + "processors": [ + { + "remove" : { + "field" : "{{field_to_remove}}" + } + } + ] + } + - match: { _id: "my_pipeline_3" } + + # Simulate a Thread.sleep(), because pipeline are updated in the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline_1" + body: { + field1: "1", + field2: "2", + field3: "3" + } + + - do: + get: + index: test + type: test + id: 1 + - length: { _source: 4 } + - match: { _source.field1: "1" } + - match: { _source.field2: "2" } + - match: { _source.field3: "3" } + - match: { _source.field4: "1/2/3" } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline_2" + body: { + field1: "field2" + } + + - do: + get: + index: test + type: test + id: 1 + - length: { _source: 2 } + - match: { _source.field1: "field2" } + - match: { _source.field2: "value" } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline_3" + body: { + field_to_remove: "field2", + field2: "2", + } + + - do: + get: + index: test + type: test + id: 1 + - length: { _source: 1 } + - match: { _source.field_to_remove: "field2" } diff --git a/settings.gradle b/settings.gradle index 760b4e90c86..9a69d685e66 100644 --- a/settings.gradle +++ b/settings.gradle @@ -39,6 +39,7 @@ List projects = [ 'qa:smoke-test-client', 'qa:smoke-test-multinode', 'qa:smoke-test-plugins', + 'qa:ingest-with-mustache', 'qa:vagrant', ]