Add template infrastructure, removed meta processor and added template support to set and remove processor.
Added ingest wide template infrastructure to IngestDocument Added a TemplateService interface that the ingest framework uses Added a TemplateService implementation that the ingest plugin provides that delegates to the ES' script service Cut SetProcessor over to use the template infrastructure for the `field` and `value` settings. Removed the MetaDataProcessor Removed dependency on mustache library Added qa ingest mustache rest test so that the ingest and mustache integration can be tested.
This commit is contained in:
parent
a56902567e
commit
e8a8e22e09
|
@ -403,57 +403,110 @@ An example that adds the parsed date to the `timestamp` field based on the `init
|
|||
}
|
||||
--------------------------------------------------
|
||||
|
||||
==== Meta processor
|
||||
=== Accessing data in pipelines
|
||||
|
||||
The `meta` processor allows to modify metadata properties of a document being processed.
|
||||
Processors in pipelines have read and write access to documents that pass through the pipeline.
|
||||
The fields in the source of a document and its metadata fields are accessible.
|
||||
|
||||
The following example changes the index of a document to `alternative_index` instead of indexing it into an index
|
||||
that was specified in the index or bulk request:
|
||||
Accessing a field in the source is straightforward and one can refer to fields by
|
||||
their name. For example:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"description" : "...",
|
||||
"processors" : [
|
||||
{
|
||||
"meta" : {
|
||||
"_index" : "alternative_index"
|
||||
}
|
||||
}
|
||||
]
|
||||
"set": {
|
||||
"field": "my_field"
|
||||
"value": 582.1
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
The following metadata attributes can be modified in this processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
|
||||
`_timestamp` and `_ttl`. All these metadata attributes can be specified in the body of the `meta` processor.
|
||||
|
||||
Also the metadata settings in this processor are templatable which allows metadata field values to be replaced with
|
||||
field values in the source of the document being indexed. The mustache template language is used and anything between
|
||||
`{{` and `}}` can contain a template and point to any field in the source of the document.
|
||||
|
||||
The following example documents being processed end up being indexed into an index based on the resolved city name by
|
||||
the `geoip` processor. (for example `city-amsterdam`)
|
||||
On top of this fields from the source are always accessible via the `_source` prefix:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"description" : "...",
|
||||
"processors" : [
|
||||
{
|
||||
"geoip" : {
|
||||
"source" : "ip"
|
||||
}
|
||||
},
|
||||
{
|
||||
"meta" : {
|
||||
"_index" : "city-{{geoip.city_name}}"
|
||||
}
|
||||
}
|
||||
]
|
||||
"set": {
|
||||
"field": "_source.my_field"
|
||||
"value": 582.1
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
=== Put pipeline API
|
||||
Metadata fields can also be accessed in the same way as fields from the source. This
|
||||
is possible because Elasticsearch doesn't allow fields in the source that have the
|
||||
same name as metadata fields.
|
||||
|
||||
The following example sets the id of a document to `1`:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"set": {
|
||||
"field": "_id"
|
||||
"value": "1"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
|
||||
`_timestamp` and `_ttl`.
|
||||
|
||||
Beyond metadata fields and source fields, the ingest plugin also adds ingest metadata to documents being processed.
|
||||
These metadata properties are accessible under the `_ingest` key. Currently the ingest plugin adds the ingest timestamp
|
||||
under `_ingest.timestamp` key to the ingest metadata, which is the time the ingest plugin received the index or bulk
|
||||
request to pre-process. But any processor is free to add more ingest related metadata to it. Ingest metadata is transient
|
||||
and is lost after a document has been processed by the pipeline and thus ingest metadata won't be indexed.
|
||||
|
||||
The following example adds a field with the name `received` and the value is the ingest timestamp:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"set": {
|
||||
"field": "received"
|
||||
"value": "{{_ingest.timestamp}}"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
As opposed to Elasticsearch metadata fields, the ingest metadata field name _ingest can be used as a valid field name
|
||||
in the source of a document. Use _source._ingest to refer to it, otherwise _ingest will be interpreted as ingest
|
||||
metadata fields by the ingest plugin.
|
||||
|
||||
A number of processor settings also support templating. Settings that support templating can have zero or more
|
||||
template snippets. A template snippet begins with `{{` and ends with `}}`.
|
||||
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.
|
||||
|
||||
In this example a field by the name `field_c` is added and its value is a concatenation of
|
||||
the values of `field_a` and `field_b`.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"set": {
|
||||
"field": "field_c"
|
||||
"value": "{{field_a}} {{field_b}}"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
The following example changes the index a document is going to be indexed into. The index a document will be redirected
|
||||
to depends on the field in the source with name `geoip.country_iso_code`.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"set": {
|
||||
"field": "_index"
|
||||
"value": "{{geoip.country_iso_code}}"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
=== Ingest APIs
|
||||
|
||||
==== Put pipeline API
|
||||
|
||||
The put pipeline api adds pipelines and updates existing pipelines in the cluster.
|
||||
|
||||
|
@ -477,7 +530,7 @@ PUT _ingest/pipeline/my-pipeline-id
|
|||
NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all
|
||||
nodes to have the latest version of the pipeline.
|
||||
|
||||
=== Get pipeline API
|
||||
==== Get pipeline API
|
||||
|
||||
The get pipeline api returns pipelines based on id. This api always returns a local reference of the pipeline.
|
||||
|
||||
|
@ -513,7 +566,7 @@ For each returned pipeline the source and the version is returned.
|
|||
The version is useful for knowing what version of the pipeline the node has.
|
||||
Multiple ids can be provided at the same time. Also wildcards are supported.
|
||||
|
||||
=== Delete pipeline API
|
||||
==== Delete pipeline API
|
||||
|
||||
The delete pipeline api deletes pipelines by id.
|
||||
|
||||
|
|
|
@ -33,7 +33,6 @@ dependencies {
|
|||
compile('com.fasterxml.jackson.core:jackson-databind:2.5.3')
|
||||
compile('com.maxmind.db:maxmind-db:1.0.0')
|
||||
|
||||
compile "com.github.spullara.mustache.java:compiler:0.9.1"
|
||||
compile 'joda-time:joda-time:2.8.2'
|
||||
testCompile 'org.elasticsearch:geolite2-databases:20151029'
|
||||
testCompile 'org.elasticsearch:securemock:1.2'
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
14aec5344639782ee76441401b773946c65eb2b3
|
|
@ -1,14 +0,0 @@
|
|||
Copyright 2010 RightTime, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
|
@ -1 +0,0 @@
|
|||
|
|
@ -30,6 +30,9 @@ import java.util.*;
|
|||
*/
|
||||
public final class IngestDocument {
|
||||
|
||||
public final static String INGEST_KEY = "_ingest";
|
||||
public final static String SOURCE_KEY = "_source";
|
||||
|
||||
static final String TIMESTAMP = "timestamp";
|
||||
|
||||
private final Map<String, Object> sourceAndMetadata;
|
||||
|
@ -149,6 +152,16 @@ public final class IngestDocument {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the field identified by the provided path.
|
||||
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
|
||||
* @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist.
|
||||
*/
|
||||
public void removeField(TemplateService.Template fieldPathTemplate) {
|
||||
Map<String, Object> model = createTemplateModel();
|
||||
removeField(fieldPathTemplate.execute(model));
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the field identified by the provided path.
|
||||
* @param path the path of the field to be removed
|
||||
|
@ -246,12 +259,22 @@ public final class IngestDocument {
|
|||
setFieldValue(path, value, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the provided value to the provided path in the document.
|
||||
* Any non existing path element will be created. If the last element is a list,
|
||||
* the value will replace the existing list.
|
||||
* @param fieldPathTemplate Resolves to the path with dot-notation within the document
|
||||
* @param valueSource The value source that will produce the value to put in for the path key
|
||||
* @throws IllegalArgumentException if the path is null, empty, invalid or if the field doesn't exist.
|
||||
*/
|
||||
public void setFieldValue(TemplateService.Template fieldPathTemplate, ValueSource valueSource) {
|
||||
Map<String, Object> model = createTemplateModel();
|
||||
setFieldValue(fieldPathTemplate.execute(model), valueSource.copyAndResolve(model), false);
|
||||
}
|
||||
|
||||
private void setFieldValue(String path, Object value, boolean append) {
|
||||
FieldPath fieldPath = new FieldPath(path);
|
||||
Object context = fieldPath.initialContext;
|
||||
|
||||
value = deepCopy(value);
|
||||
|
||||
for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
|
||||
String pathElement = fieldPath.pathElements[i];
|
||||
if (context == null) {
|
||||
|
@ -332,6 +355,15 @@ public final class IngestDocument {
|
|||
throw new IllegalArgumentException("field [" + path + "] of type [" + object.getClass().getName() + "] cannot be cast to [" + clazz.getName() + "]");
|
||||
}
|
||||
|
||||
private Map<String, Object> createTemplateModel() {
|
||||
Map<String, Object> model = new HashMap<>(sourceAndMetadata);
|
||||
model.put(SOURCE_KEY, sourceAndMetadata);
|
||||
// If there is a field in the source with the name '_ingest' it gets overwritten here,
|
||||
// if access to that field is required then it get accessed via '_source._ingest'
|
||||
model.put(INGEST_KEY, ingestMetadata);
|
||||
return model;
|
||||
}
|
||||
|
||||
/**
|
||||
* one time operation that extracts the metadata fields from the ingest document and returns them.
|
||||
* Metadata fields that used to be accessible as ordinary top level fields will be removed as part of this call.
|
||||
|
@ -361,32 +393,6 @@ public final class IngestDocument {
|
|||
return this.sourceAndMetadata;
|
||||
}
|
||||
|
||||
static Object deepCopy(Object value) {
|
||||
if (value instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<Object, Object> mapValue = (Map<Object, Object>) value;
|
||||
Map<Object, Object> copy = new HashMap<>(mapValue.size());
|
||||
for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
|
||||
copy.put(entry.getKey(), deepCopy(entry.getValue()));
|
||||
}
|
||||
return copy;
|
||||
} else if (value instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Object> listValue = (List<Object>) value;
|
||||
List<Object> copy = new ArrayList<>(listValue.size());
|
||||
for (Object itemValue : listValue) {
|
||||
copy.add(deepCopy(itemValue));
|
||||
}
|
||||
return copy;
|
||||
} else if (value == null || value instanceof String || value instanceof Integer ||
|
||||
value instanceof Long || value instanceof Float ||
|
||||
value instanceof Double || value instanceof Boolean) {
|
||||
return value;
|
||||
} else {
|
||||
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) { return true; }
|
||||
|
@ -431,26 +437,6 @@ public final class IngestDocument {
|
|||
return fieldName;
|
||||
}
|
||||
|
||||
public static MetaData fromString(String value) {
|
||||
switch (value) {
|
||||
case "_index":
|
||||
return INDEX;
|
||||
case "_type":
|
||||
return TYPE;
|
||||
case "_id":
|
||||
return ID;
|
||||
case "_routing":
|
||||
return ROUTING;
|
||||
case "_parent":
|
||||
return PARENT;
|
||||
case "_timestamp":
|
||||
return TIMESTAMP;
|
||||
case "_ttl":
|
||||
return TTL;
|
||||
default:
|
||||
throw new IllegalArgumentException("no valid metadata field name [" + value + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class FieldPath {
|
||||
|
@ -462,12 +448,12 @@ public final class IngestDocument {
|
|||
throw new IllegalArgumentException("path cannot be null nor empty");
|
||||
}
|
||||
String newPath;
|
||||
if (path.startsWith("_ingest.")) {
|
||||
if (path.startsWith(INGEST_KEY + ".")) {
|
||||
initialContext = ingestMetadata;
|
||||
newPath = path.substring(8, path.length());
|
||||
} else {
|
||||
initialContext = sourceAndMetadata;
|
||||
if (path.startsWith("_source.")) {
|
||||
if (path.startsWith(SOURCE_KEY + ".")) {
|
||||
newPath = path.substring(8, path.length());
|
||||
} else {
|
||||
newPath = path;
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Abstraction for the template engine.
|
||||
*/
|
||||
public interface TemplateService {
|
||||
|
||||
Template compile(String template);
|
||||
|
||||
interface Template {
|
||||
|
||||
String execute(Map<String, Object> model);
|
||||
|
||||
String getKey();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Holds a value. If the value is requested a copy is made and optionally template snippets are resolved too.
|
||||
*/
|
||||
public interface ValueSource {
|
||||
|
||||
/**
|
||||
* Returns a copy of the value this ValueSource holds and resolves templates if there're any.
|
||||
*
|
||||
* For immutable values only a copy of the reference to the value is made.
|
||||
*
|
||||
* @param model The model to be used when resolving any templates
|
||||
* @return copy of the wrapped value
|
||||
*/
|
||||
Object copyAndResolve(Map<String, Object> model);
|
||||
|
||||
static ValueSource wrap(Object value, TemplateService templateService) {
|
||||
if (value instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<Object, Object> mapValue = (Map) value;
|
||||
Map<ValueSource, ValueSource> valueTypeMap = new HashMap<>(mapValue.size());
|
||||
for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
|
||||
valueTypeMap.put(wrap(entry.getKey(), templateService), wrap(entry.getValue(), templateService));
|
||||
}
|
||||
return new MapValue(valueTypeMap);
|
||||
} else if (value instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Object> listValue = (List) value;
|
||||
List<ValueSource> valueSourceList = new ArrayList<>(listValue.size());
|
||||
for (Object item : listValue) {
|
||||
valueSourceList.add(wrap(item, templateService));
|
||||
}
|
||||
return new ListValue(valueSourceList);
|
||||
} else if (value == null || value instanceof Integer ||
|
||||
value instanceof Long || value instanceof Float ||
|
||||
value instanceof Double || value instanceof Boolean) {
|
||||
return new ObjectValue(value);
|
||||
} else if (value instanceof String) {
|
||||
return new TemplatedValue(templateService.compile((String) value));
|
||||
} else {
|
||||
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
final class MapValue implements ValueSource {
|
||||
|
||||
private final Map<ValueSource, ValueSource> map;
|
||||
|
||||
MapValue(Map<ValueSource, ValueSource> map) {
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object copyAndResolve(Map<String, Object> model) {
|
||||
Map<Object, Object> copy = new HashMap<>();
|
||||
for (Map.Entry<ValueSource, ValueSource> entry : this.map.entrySet()) {
|
||||
copy.put(entry.getKey().copyAndResolve(model), entry.getValue().copyAndResolve(model));
|
||||
}
|
||||
return copy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
MapValue mapValue = (MapValue) o;
|
||||
return map.equals(mapValue.map);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return map.hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
final class ListValue implements ValueSource {
|
||||
|
||||
private final List<ValueSource> values;
|
||||
|
||||
ListValue(List<ValueSource> values) {
|
||||
this.values = values;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object copyAndResolve(Map<String, Object> model) {
|
||||
List<Object> copy = new ArrayList<>(values.size());
|
||||
for (ValueSource value : values) {
|
||||
copy.add(value.copyAndResolve(model));
|
||||
}
|
||||
return copy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
ListValue listValue = (ListValue) o;
|
||||
return values.equals(listValue.values);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return values.hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
final class ObjectValue implements ValueSource {
|
||||
|
||||
private final Object value;
|
||||
|
||||
ObjectValue(Object value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object copyAndResolve(Map<String, Object> model) {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
ObjectValue objectValue = (ObjectValue) o;
|
||||
return Objects.equals(value, objectValue.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(value);
|
||||
}
|
||||
}
|
||||
|
||||
final class TemplatedValue implements ValueSource {
|
||||
|
||||
private final TemplateService.Template template;
|
||||
|
||||
TemplatedValue(TemplateService.Template template) {
|
||||
this.template = template;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object copyAndResolve(Map<String, Object> model) {
|
||||
return template.execute(model);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
TemplatedValue templatedValue = (TemplatedValue) o;
|
||||
return Objects.equals(template.getKey(), templatedValue.template.getKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(template.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
package org.elasticsearch.ingest.processor.meta;
|
||||
|
||||
import com.github.mustachejava.DefaultMustacheFactory;
|
||||
import com.github.mustachejava.Mustache;
|
||||
import com.github.mustachejava.MustacheFactory;
|
||||
import org.elasticsearch.common.io.FastStringReader;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.IngestDocument.MetaData;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.io.StringWriter;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
//TODO this processor needs to be removed, as the set processor allows now to set any field, including metadata ones.
|
||||
//The only reason for it to be still here is that it supports templating, we will remove once any processor supports templating.
|
||||
public final class MetaDataProcessor implements Processor {
|
||||
|
||||
public final static String TYPE = "meta";
|
||||
|
||||
private final Map<MetaData, Mustache> templates;
|
||||
|
||||
public MetaDataProcessor(Map<MetaData, Mustache> templates) {
|
||||
this.templates = templates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(IngestDocument ingestDocument) {
|
||||
Map<String, Object> model = ingestDocument.getSourceAndMetadata();
|
||||
for (Map.Entry<MetaData, Mustache> entry : templates.entrySet()) {
|
||||
StringWriter writer = new StringWriter();
|
||||
entry.getValue().execute(writer, model);
|
||||
ingestDocument.setFieldValue(entry.getKey().getFieldName(), writer.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
Map<MetaData, Mustache> getTemplates() {
|
||||
return templates;
|
||||
}
|
||||
|
||||
public final static class Factory implements Processor.Factory<MetaDataProcessor> {
|
||||
|
||||
private final MustacheFactory mustacheFactory = new DefaultMustacheFactory();
|
||||
|
||||
@Override
|
||||
public MetaDataProcessor create(Map<String, Object> config) throws Exception {
|
||||
Map<MetaData, Mustache> templates = new HashMap<>();
|
||||
Iterator<Map.Entry<String, Object>> iterator = config.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<String, Object> entry = iterator.next();
|
||||
MetaData metaData = MetaData.fromString(entry.getKey());
|
||||
Mustache mustache = mustacheFactory.compile(new FastStringReader(entry.getValue().toString()), "");
|
||||
templates.put(metaData, mustache);
|
||||
iterator.remove();
|
||||
}
|
||||
|
||||
if (templates.isEmpty()) {
|
||||
throw new IllegalArgumentException("no meta fields specified");
|
||||
}
|
||||
|
||||
return new MetaDataProcessor(Collections.unmodifiableMap(templates));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.ingest.processor.remove;
|
||||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
|
@ -32,13 +33,13 @@ public class RemoveProcessor implements Processor {
|
|||
|
||||
public static final String TYPE = "remove";
|
||||
|
||||
private final String field;
|
||||
private final TemplateService.Template field;
|
||||
|
||||
RemoveProcessor(String field) {
|
||||
RemoveProcessor(TemplateService.Template field) {
|
||||
this.field = field;
|
||||
}
|
||||
|
||||
String getField() {
|
||||
public TemplateService.Template getField() {
|
||||
return field;
|
||||
}
|
||||
|
||||
|
@ -53,10 +54,17 @@ public class RemoveProcessor implements Processor {
|
|||
}
|
||||
|
||||
public static class Factory implements Processor.Factory<RemoveProcessor> {
|
||||
|
||||
private final TemplateService templateService;
|
||||
|
||||
public Factory(TemplateService templateService) {
|
||||
this.templateService = templateService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveProcessor create(Map<String, Object> config) throws Exception {
|
||||
String field = ConfigurationUtils.readStringProperty(config, "field");
|
||||
return new RemoveProcessor(field);
|
||||
return new RemoveProcessor(templateService.compile(field));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,11 @@
|
|||
package org.elasticsearch.ingest.processor.set;
|
||||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.ingest.ValueSource;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -35,19 +35,19 @@ public class SetProcessor implements Processor {
|
|||
|
||||
public static final String TYPE = "set";
|
||||
|
||||
private final String field;
|
||||
private final Object value;
|
||||
private final TemplateService.Template field;
|
||||
private final ValueSource value;
|
||||
|
||||
SetProcessor(String field, Object value) {
|
||||
SetProcessor(TemplateService.Template field, ValueSource value) {
|
||||
this.field = field;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
String getField() {
|
||||
public TemplateService.Template getField() {
|
||||
return field;
|
||||
}
|
||||
|
||||
Object getValue() {
|
||||
public ValueSource getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
|
@ -62,11 +62,18 @@ public class SetProcessor implements Processor {
|
|||
}
|
||||
|
||||
public static final class Factory implements Processor.Factory<SetProcessor> {
|
||||
|
||||
private final TemplateService templateService;
|
||||
|
||||
public Factory(TemplateService templateService) {
|
||||
this.templateService = templateService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SetProcessor create(Map<String, Object> config) throws Exception {
|
||||
String field = ConfigurationUtils.readStringProperty(config, "field");
|
||||
Object value = ConfigurationUtils.readObject(config, "value");
|
||||
return new SetProcessor(field, value);
|
||||
return new SetProcessor(templateService.compile(field), ValueSource.wrap(value, templateService));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,15 +34,12 @@ import org.elasticsearch.ingest.processor.rename.RenameProcessor;
|
|||
import org.elasticsearch.ingest.processor.split.SplitProcessor;
|
||||
import org.elasticsearch.ingest.processor.trim.TrimProcessor;
|
||||
import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor;
|
||||
import org.elasticsearch.ingest.processor.meta.MetaDataProcessor;
|
||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.plugin.ingest.PipelineStore.ProcessorFactoryProvider;
|
||||
|
||||
public class IngestModule extends AbstractModule {
|
||||
|
||||
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders = new HashMap<>();
|
||||
|
@ -54,20 +51,19 @@ public class IngestModule extends AbstractModule {
|
|||
binder().bind(PipelineStore.class).asEagerSingleton();
|
||||
binder().bind(SimulateExecutionService.class).asEagerSingleton();
|
||||
|
||||
addProcessor(GeoIpProcessor.TYPE, environment -> new GeoIpProcessor.Factory(environment.configFile()));
|
||||
addProcessor(GrokProcessor.TYPE, environment -> new GrokProcessor.Factory(environment.configFile()));
|
||||
addProcessor(DateProcessor.TYPE, environment -> new DateProcessor.Factory());
|
||||
addProcessor(SetProcessor.TYPE, environment -> new SetProcessor.Factory());
|
||||
addProcessor(RenameProcessor.TYPE, environment -> new RenameProcessor.Factory());
|
||||
addProcessor(RemoveProcessor.TYPE, environment -> new RemoveProcessor.Factory());
|
||||
addProcessor(SplitProcessor.TYPE, environment -> new SplitProcessor.Factory());
|
||||
addProcessor(JoinProcessor.TYPE, environment -> new JoinProcessor.Factory());
|
||||
addProcessor(UppercaseProcessor.TYPE, environment -> new UppercaseProcessor.Factory());
|
||||
addProcessor(LowercaseProcessor.TYPE, environment -> new LowercaseProcessor.Factory());
|
||||
addProcessor(TrimProcessor.TYPE, environment -> new TrimProcessor.Factory());
|
||||
addProcessor(ConvertProcessor.TYPE, environment -> new ConvertProcessor.Factory());
|
||||
addProcessor(GsubProcessor.TYPE, environment -> new GsubProcessor.Factory());
|
||||
addProcessor(MetaDataProcessor.TYPE, environment -> new MetaDataProcessor.Factory());
|
||||
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
|
||||
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
|
||||
addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
|
||||
addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
|
||||
addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
|
||||
addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
|
||||
addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
|
||||
addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
|
||||
addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
|
||||
addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory());
|
||||
addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
|
||||
addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
|
||||
addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
|
||||
|
||||
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
|
||||
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction;
|
|||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction;
|
||||
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransportAction;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -115,4 +116,8 @@ public class IngestPlugin extends Plugin {
|
|||
networkModule.registerRestHandler(RestSimulatePipelineAction.class);
|
||||
}
|
||||
}
|
||||
|
||||
public void onModule(ScriptModule module) {
|
||||
module.registerScriptContext(InternalTemplateService.INGEST_SCRIPT_CONTEXT);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.script.*;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
class InternalTemplateService implements TemplateService {
|
||||
|
||||
public static final ScriptContext.Plugin INGEST_SCRIPT_CONTEXT = new ScriptContext.Plugin("elasticsearch-ingest", "ingest");
|
||||
|
||||
private final ScriptService scriptService;
|
||||
|
||||
InternalTemplateService(ScriptService scriptService) {
|
||||
this.scriptService = scriptService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Template compile(String template) {
|
||||
int mustacheStart = template.indexOf("{{");
|
||||
int mustacheEnd = template.indexOf("}}");
|
||||
if (mustacheStart != -1 && mustacheEnd != -1 && mustacheStart < mustacheEnd) {
|
||||
Script script = new Script(template, ScriptService.ScriptType.INLINE, "mustache", Collections.emptyMap());
|
||||
CompiledScript compiledScript = scriptService.compile(
|
||||
script,
|
||||
INGEST_SCRIPT_CONTEXT,
|
||||
null /* we can supply null here, because ingest doesn't use indexed scripts */,
|
||||
Collections.emptyMap()
|
||||
);
|
||||
return new Template() {
|
||||
@Override
|
||||
public String execute(Map<String, Object> model) {
|
||||
ExecutableScript executableScript = scriptService.executable(compiledScript, model);
|
||||
Object result = executableScript.run();
|
||||
if (result instanceof BytesReference) {
|
||||
return ((BytesReference) result).toUtf8();
|
||||
}
|
||||
return String.valueOf(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return template;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new StringTemplate(template);
|
||||
}
|
||||
}
|
||||
|
||||
class StringTemplate implements Template {
|
||||
|
||||
private final String value;
|
||||
|
||||
public StringTemplate(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String execute(Map<String, Object> model) {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,10 +35,7 @@ import org.elasticsearch.cluster.ClusterService;
|
|||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.SearchScrollIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Provider;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
|
@ -48,9 +45,11 @@ import org.elasticsearch.common.xcontent.XContentHelper;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
|
||||
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
|
||||
import org.elasticsearch.script.*;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
|
@ -66,35 +65,46 @@ public class PipelineStore extends AbstractLifecycleComponent {
|
|||
public final static String TYPE = "pipeline";
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final Environment environment;
|
||||
private final TimeValue scrollTimeout;
|
||||
private final ClusterService clusterService;
|
||||
private final Provider<Client> clientProvider;
|
||||
private final TimeValue pipelineUpdateInterval;
|
||||
private final Provider<ScriptService> scriptServiceProvider;
|
||||
private final Pipeline.Factory factory = new Pipeline.Factory();
|
||||
private final Map<String, Processor.Factory> processorFactoryRegistry;
|
||||
private volatile Map<String, Processor.Factory> processorFactoryRegistry;
|
||||
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders;
|
||||
|
||||
private volatile Client client;
|
||||
private volatile Map<String, PipelineDefinition> pipelines = new HashMap<>();
|
||||
|
||||
@Inject
|
||||
public PipelineStore(Settings settings, Provider<Client> clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map<String, ProcessorFactoryProvider> processorFactoryProviders) {
|
||||
public PipelineStore(Settings settings, Provider<Client> clientProvider, ThreadPool threadPool,
|
||||
Environment environment, ClusterService clusterService, Provider<ScriptService> scriptServiceProvider,
|
||||
Map<String, ProcessorFactoryProvider> processorFactoryProviders) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.environment = environment;
|
||||
this.clusterService = clusterService;
|
||||
this.clientProvider = clientProvider;
|
||||
this.scriptServiceProvider = scriptServiceProvider;
|
||||
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
|
||||
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
|
||||
Map<String, Processor.Factory> processorFactories = new HashMap<>();
|
||||
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
|
||||
Processor.Factory processorFactory = entry.getValue().get(environment);
|
||||
processorFactories.put(entry.getKey(), processorFactory);
|
||||
}
|
||||
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);
|
||||
this.processorFactoryProviders = processorFactoryProviders;
|
||||
|
||||
clusterService.add(new PipelineStoreListener());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
// TODO this will be better when #15203 gets in:
|
||||
Map<String, Processor.Factory> processorFactories = new HashMap<>();
|
||||
TemplateService templateService = new InternalTemplateService(scriptServiceProvider.get());
|
||||
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
|
||||
Processor.Factory processorFactory = entry.getValue().get(environment, templateService);
|
||||
processorFactories.put(entry.getKey(), processorFactory);
|
||||
}
|
||||
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -249,7 +259,6 @@ public class PipelineStore extends AbstractLifecycleComponent {
|
|||
return SearchScrollIterator.createIterator(client(), scrollTimeout, searchRequest);
|
||||
}
|
||||
|
||||
|
||||
private Client client() {
|
||||
if (client == null) {
|
||||
client = clientProvider.get();
|
||||
|
@ -257,19 +266,6 @@ public class PipelineStore extends AbstractLifecycleComponent {
|
|||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
|
||||
* processors rely on reading files from the config directory. We can't add Environment as a constructor parameter,
|
||||
* so we need some code that provides the physical location of the configuration directory to the processor factories
|
||||
* that need this and this is what this processor factory provider does.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
interface ProcessorFactoryProvider {
|
||||
|
||||
Processor.Factory get(Environment environment);
|
||||
|
||||
}
|
||||
|
||||
class Updater implements Runnable {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
/**
|
||||
* The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
|
||||
* processors rely on reading files from the config directory. We can't add Environment as a constructor parameter,
|
||||
* so we need some code that provides the physical location of the configuration directory to the processor factories
|
||||
* that need this and this is what this processor factory provider does.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
interface ProcessorFactoryProvider {
|
||||
|
||||
Processor.Factory get(Environment environment, TemplateService templateService);
|
||||
|
||||
}
|
|
@ -22,8 +22,6 @@ package org.elasticsearch.ingest;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
@ -381,7 +379,7 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
}
|
||||
|
||||
try {
|
||||
ingestDocument.setFieldValue("_ingest.", Object.class);
|
||||
ingestDocument.setFieldValue("_ingest.", "_value");
|
||||
fail("set field value should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("path [_ingest.] is not valid"));
|
||||
|
@ -605,7 +603,7 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
|
||||
public void testRemoveNullField() {
|
||||
try {
|
||||
ingestDocument.removeField(null);
|
||||
ingestDocument.removeField((String) null);
|
||||
fail("remove field should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("path cannot be null nor empty"));
|
||||
|
@ -677,57 +675,4 @@ public class IngestDocumentTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testDeepCopy() {
|
||||
int iterations = scaledRandomIntBetween(8, 64);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
Map<String, Object> map = RandomDocumentPicks.randomSource(random());
|
||||
Object copy = IngestDocument.deepCopy(map);
|
||||
assertThat("iteration: " + i, copy, equalTo(map));
|
||||
assertThat("iteration: " + i, copy, not(sameInstance(map)));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeepCopyDoesNotChangeProvidedMap() {
|
||||
Map<String, Object> myPreciousMap = new HashMap<>();
|
||||
myPreciousMap.put("field2", "value2");
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, new HashMap<>());
|
||||
ingestDocument.setFieldValue("field1", myPreciousMap);
|
||||
ingestDocument.removeField("field1.field2");
|
||||
|
||||
assertThat(myPreciousMap.size(), equalTo(1));
|
||||
assertThat(myPreciousMap.get("field2"), equalTo("value2"));
|
||||
}
|
||||
|
||||
public void testDeepCopyDoesNotChangeProvidedList() {
|
||||
List<String> myPreciousList = new ArrayList<>();
|
||||
myPreciousList.add("value");
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, new HashMap<>());
|
||||
ingestDocument.setFieldValue("field1", myPreciousList);
|
||||
ingestDocument.removeField("field1.0");
|
||||
|
||||
assertThat(myPreciousList.size(), equalTo(1));
|
||||
assertThat(myPreciousList.get(0), equalTo("value"));
|
||||
}
|
||||
|
||||
public void testIngestMetadataTimestamp() throws Exception {
|
||||
long before = System.currentTimeMillis();
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
long after = System.currentTimeMillis();
|
||||
String timestampString = ingestDocument.getIngestMetadata().get("timestamp");
|
||||
assertThat(timestampString, notNullValue());
|
||||
assertThat(timestampString, endsWith("+0000"));
|
||||
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT);
|
||||
Date timestamp = df.parse(timestampString);
|
||||
assertThat(timestamp.getTime(), greaterThanOrEqualTo(before));
|
||||
assertThat(timestamp.getTime(), lessThanOrEqualTo(after));
|
||||
}
|
||||
|
||||
public void testCopyConstructor() {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
IngestDocument copy = new IngestDocument(ingestDocument);
|
||||
assertThat(ingestDocument.getSourceAndMetadata(), not(sameInstance(copy.getSourceAndMetadata())));
|
||||
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(copy.getSourceAndMetadata()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class TestTemplateService implements TemplateService {
|
||||
|
||||
public static TemplateService instance() {
|
||||
return new TestTemplateService();
|
||||
}
|
||||
|
||||
private TestTemplateService() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Template compile(String template) {
|
||||
return new MockTemplate(template);
|
||||
}
|
||||
|
||||
public static class MockTemplate implements TemplateService.Template {
|
||||
|
||||
private final String expected;
|
||||
|
||||
public MockTemplate(String expected) {
|
||||
this.expected = expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String execute(Map<String, Object> model) {
|
||||
return expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return expected;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
public class ValueSourceTests extends ESTestCase {
|
||||
|
||||
public void testDeepCopy() {
|
||||
int iterations = scaledRandomIntBetween(8, 64);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
Map<String, Object> map = RandomDocumentPicks.randomSource(random());
|
||||
ValueSource valueSource = ValueSource.wrap(map, TestTemplateService.instance());
|
||||
Object copy = valueSource.copyAndResolve(Collections.emptyMap());
|
||||
assertThat("iteration: " + i, copy, equalTo(map));
|
||||
assertThat("iteration: " + i, copy, not(sameInstance(map)));
|
||||
}
|
||||
}
|
||||
|
||||
public void testCopyDoesNotChangeProvidedMap() {
|
||||
Map<String, Object> myPreciousMap = new HashMap<>();
|
||||
myPreciousMap.put("field2", "value2");
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||
ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousMap, TestTemplateService.instance()));
|
||||
ingestDocument.removeField("field1.field2");
|
||||
|
||||
assertThat(myPreciousMap.size(), equalTo(1));
|
||||
assertThat(myPreciousMap.get("field2"), equalTo("value2"));
|
||||
}
|
||||
|
||||
public void testCopyDoesNotChangeProvidedList() {
|
||||
List<String> myPreciousList = new ArrayList<>();
|
||||
myPreciousList.add("value");
|
||||
|
||||
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||
ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousList, TestTemplateService.instance()));
|
||||
ingestDocument.removeField("field1.0");
|
||||
|
||||
assertThat(myPreciousList.size(), equalTo(1));
|
||||
assertThat(myPreciousList.get(0), equalTo("value"));
|
||||
}
|
||||
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
package org.elasticsearch.ingest.processor.meta;
|
||||
|
||||
import com.github.mustachejava.DefaultMustacheFactory;
|
||||
import com.github.mustachejava.Mustache;
|
||||
import com.github.mustachejava.MustacheException;
|
||||
import org.elasticsearch.common.io.FastStringReader;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.ingest.IngestDocument.MetaData;
|
||||
|
||||
public class MetaDataProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
public void testCreate() throws Exception {
|
||||
MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
for (MetaData metaData : MetaData.values()) {
|
||||
config.put(metaData.getFieldName(), randomBoolean() ? "static text" : "{{expression}}");
|
||||
}
|
||||
MetaDataProcessor processor = factory.create(config);
|
||||
assertThat(processor.getTemplates().size(), Matchers.equalTo(7));
|
||||
assertThat(processor.getTemplates().get(MetaData.INDEX), Matchers.notNullValue());
|
||||
assertThat(processor.getTemplates().get(MetaData.TIMESTAMP), Matchers.notNullValue());
|
||||
assertThat(processor.getTemplates().get(MetaData.ID), Matchers.notNullValue());
|
||||
assertThat(processor.getTemplates().get(MetaData.ROUTING), Matchers.notNullValue());
|
||||
assertThat(processor.getTemplates().get(MetaData.PARENT), Matchers.notNullValue());
|
||||
assertThat(processor.getTemplates().get(MetaData.TIMESTAMP), Matchers.notNullValue());
|
||||
assertThat(processor.getTemplates().get(MetaData.TTL), Matchers.notNullValue());
|
||||
}
|
||||
|
||||
public void testCreateIllegalMetaData() throws Exception {
|
||||
MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
|
||||
try {
|
||||
factory.create(Collections.singletonMap("_field", "text {{expression}}"));
|
||||
fail("exception should have been thrown");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), Matchers.equalTo("no valid metadata field name [_field]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testCreateIllegalEmpty() throws Exception {
|
||||
MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
|
||||
try {
|
||||
factory.create(Collections.emptyMap());
|
||||
fail("exception should have been thrown");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), Matchers.equalTo("no meta fields specified"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testIlegalMustacheExpression() throws Exception {
|
||||
try {
|
||||
new MetaDataProcessor.Factory().create(Collections.singletonMap("_index", "text {{var"));
|
||||
fail("exception expected");
|
||||
} catch (MustacheException e) {
|
||||
assertThat(e.getMessage(), Matchers.equalTo("Improperly closed variable in :1"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
package org.elasticsearch.ingest.processor.meta;
|
||||
|
||||
import com.github.mustachejava.DefaultMustacheFactory;
|
||||
import com.github.mustachejava.Mustache;
|
||||
import org.elasticsearch.common.io.FastStringReader;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.ingest.IngestDocument.*;
|
||||
|
||||
public class MetaDataProcessorTests extends ESTestCase {
|
||||
|
||||
public void testExecute() throws Exception {
|
||||
Map<IngestDocument.MetaData, Mustache> templates = new HashMap<>();
|
||||
for (MetaData metaData : MetaData.values()) {
|
||||
templates.put(metaData, new DefaultMustacheFactory().compile(new FastStringReader("some {{field}}"), "noname"));
|
||||
}
|
||||
|
||||
MetaDataProcessor processor = new MetaDataProcessor(templates);
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", "value"));
|
||||
processor.execute(ingestDocument);
|
||||
|
||||
Map<MetaData, String> metadataMap = ingestDocument.extractMetadata();
|
||||
for (MetaData metaData : MetaData.values()) {
|
||||
assertThat(metadataMap.get(metaData), Matchers.equalTo("some value"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,27 +19,33 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.remove;
|
||||
|
||||
import org.elasticsearch.ingest.TestTemplateService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
||||
public class RemoveProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
private RemoveProcessor.Factory factory;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
factory = new RemoveProcessor.Factory(TestTemplateService.instance());
|
||||
}
|
||||
|
||||
public void testCreate() throws Exception {
|
||||
RemoveProcessor.Factory factory = new RemoveProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field1");
|
||||
RemoveProcessor removeProcessor = factory.create(config);
|
||||
assertThat(removeProcessor.getField(), equalTo("field1"));
|
||||
assertThat(removeProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
|
||||
}
|
||||
|
||||
public void testCreateMissingField() throws Exception {
|
||||
RemoveProcessor.Factory factory = new RemoveProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
try {
|
||||
factory.create(config);
|
||||
|
@ -48,4 +54,5 @@ public class RemoveProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(e.getMessage(), equalTo("required property [field] is missing"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,13 +21,11 @@ package org.elasticsearch.ingest.processor.remove;
|
|||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.ingest.TestTemplateService;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -37,7 +35,7 @@ public class RemoveProcessorTests extends ESTestCase {
|
|||
public void testRemoveFields() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String field = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
|
||||
Processor processor = new RemoveProcessor(field);
|
||||
Processor processor = new RemoveProcessor(new TestTemplateService.MockTemplate(field));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(field), equalTo(false));
|
||||
}
|
||||
|
@ -45,7 +43,7 @@ public class RemoveProcessorTests extends ESTestCase {
|
|||
public void testRemoveNonExistingField() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
String fieldName = RandomDocumentPicks.randomFieldName(random());
|
||||
Processor processor = new RemoveProcessor(fieldName);
|
||||
Processor processor = new RemoveProcessor(new TestTemplateService.MockTemplate(fieldName));
|
||||
try {
|
||||
processor.execute(ingestDocument);
|
||||
fail("remove field should have failed");
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.set;
|
||||
|
||||
import org.elasticsearch.ingest.TestTemplateService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -29,18 +31,23 @@ import static org.hamcrest.CoreMatchers.equalTo;
|
|||
|
||||
public class SetProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
private SetProcessor.Factory factory;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
factory = new SetProcessor.Factory(TestTemplateService.instance());
|
||||
}
|
||||
|
||||
public void testCreate() throws Exception {
|
||||
SetProcessor.Factory factory = new SetProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field1");
|
||||
config.put("value", "value1");
|
||||
SetProcessor setProcessor = factory.create(config);
|
||||
assertThat(setProcessor.getField(), equalTo("field1"));
|
||||
assertThat(setProcessor.getValue(), equalTo("value1"));
|
||||
assertThat(setProcessor.getField().execute(Collections.emptyMap()), equalTo("field1"));
|
||||
assertThat(setProcessor.getValue().copyAndResolve(Collections.emptyMap()), equalTo("value1"));
|
||||
}
|
||||
|
||||
public void testCreateNoFieldPresent() throws Exception {
|
||||
SetProcessor.Factory factory = new SetProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("value", "value1");
|
||||
try {
|
||||
|
@ -52,7 +59,6 @@ public class SetProcessorFactoryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testCreateNoValuePresent() throws Exception {
|
||||
SetProcessor.Factory factory = new SetProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field1");
|
||||
try {
|
||||
|
@ -64,7 +70,6 @@ public class SetProcessorFactoryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testCreateNullValue() throws Exception {
|
||||
SetProcessor.Factory factory = new SetProcessor.Factory();
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field1");
|
||||
config.put("value", null);
|
||||
|
@ -75,4 +80,5 @@ public class SetProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(e.getMessage(), equalTo("required property [value] is missing"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,10 +19,10 @@
|
|||
|
||||
package org.elasticsearch.ingest.processor.set;
|
||||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.elasticsearch.ingest.*;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
|
@ -34,7 +34,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
|
||||
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
|
||||
Processor processor = new SetProcessor(fieldName, fieldValue);
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
|
@ -46,7 +46,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
|
||||
Processor processor = new SetProcessor(fieldName, fieldValue);
|
||||
Processor processor = createSetProcessor(fieldName, fieldValue);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
|
||||
|
@ -55,7 +55,7 @@ public class SetProcessorTests extends ESTestCase {
|
|||
public void testSetFieldsTypeMismatch() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||
ingestDocument.setFieldValue("field", "value");
|
||||
Processor processor = new SetProcessor("field.inner", "value");
|
||||
Processor processor = createSetProcessor("field.inner", "value");
|
||||
try {
|
||||
processor.execute(ingestDocument);
|
||||
fail("processor execute should have failed");
|
||||
|
@ -63,4 +63,18 @@ public class SetProcessorTests extends ESTestCase {
|
|||
assertThat(e.getMessage(), equalTo("cannot set [inner] with parent object of type [java.lang.String] as part of path [field.inner]"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSetMetadata() throws Exception {
|
||||
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
|
||||
Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value");
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value"));
|
||||
}
|
||||
|
||||
private Processor createSetProcessor(String fieldName, Object fieldValue) {
|
||||
TemplateService templateService = TestTemplateService.instance();
|
||||
return new SetProcessor(templateService.compile(fieldName), ValueSource.wrap(fieldValue, templateService));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,8 +25,9 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.TestTemplateService;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.ingest.processor.meta.MetaDataProcessor;
|
||||
import org.elasticsearch.ingest.processor.set.SetProcessor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
|
@ -126,10 +127,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
@SuppressWarnings("unchecked")
|
||||
public void testExecuteTTL() throws Exception {
|
||||
// test with valid ttl
|
||||
MetaDataProcessor.Factory metaProcessorFactory = new MetaDataProcessor.Factory();
|
||||
SetProcessor.Factory metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("_ttl", "5d");
|
||||
MetaDataProcessor processor = metaProcessorFactory.create(config);
|
||||
config.put("field", "_ttl");
|
||||
config.put("value", "5d");
|
||||
Processor processor = metaProcessorFactory.create(config);
|
||||
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
|
||||
|
@ -141,9 +143,10 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
verify(listener, never()).onFailure(any());
|
||||
|
||||
// test with invalid ttl
|
||||
metaProcessorFactory = new MetaDataProcessor.Factory();
|
||||
metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
|
||||
config = new HashMap<>();
|
||||
config.put("_ttl", "abc");
|
||||
config.put("field", "_ttl");
|
||||
config.put("value", "abc");
|
||||
processor = metaProcessorFactory.create(config);
|
||||
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.search.internal.InternalSearchHits;
|
||||
|
@ -68,9 +69,10 @@ public class PipelineStoreTests extends ESTestCase {
|
|||
client = mock(Client.class);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
ScriptService scriptService = mock(ScriptService.class);
|
||||
when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
|
||||
Environment environment = mock(Environment.class);
|
||||
store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, Collections.emptyMap());
|
||||
store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, () -> scriptService, Collections.emptyMap());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -113,3 +113,51 @@
|
|||
- match: { _source.field_to_join: "127-0-0-1" }
|
||||
- match: { _source.field_to_convert: [127,0,0,1] }
|
||||
- match: { _source.field_to_gsub: "127.0.0.1" }
|
||||
|
||||
---
|
||||
"Test metadata":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "_index",
|
||||
"value" : "surprise"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
catch: request_timeout
|
||||
cluster.health:
|
||||
wait_for_nodes: 99
|
||||
timeout: 2s
|
||||
- match: { "timed_out": true }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field: "value"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: surprise
|
||||
type: test
|
||||
id: 1
|
||||
- length: { _source: 1 }
|
||||
- match: { _source.field: "value" }
|
||||
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
---
|
||||
"Test meta processor":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"meta" : {
|
||||
"_index" : "surprise"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
catch: request_timeout
|
||||
cluster.health:
|
||||
wait_for_nodes: 99
|
||||
timeout: 2s
|
||||
- match: { "timed_out": true }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field: "value"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: surprise
|
||||
type: test
|
||||
id: 1
|
||||
- length: { _source: 1 }
|
||||
- match: { _source.field: "value" }
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
apply plugin: 'elasticsearch.rest-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: ':plugins:ingest', configuration: 'runtime')
|
||||
testCompile project(path: ':modules:lang-mustache', configuration: 'runtime')
|
||||
}
|
||||
|
||||
integTest {
|
||||
cluster {
|
||||
plugin 'ingest', project(':plugins:ingest')
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.script.ScriptContextRegistry;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.mustache.MustacheScriptEngineService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
public abstract class AbstractMustacheTests extends ESTestCase {
|
||||
|
||||
protected TemplateService templateService;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put("path.home", createTempDir())
|
||||
.put(ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING, false)
|
||||
.build();
|
||||
MustacheScriptEngineService mustache = new MustacheScriptEngineService(settings);
|
||||
ScriptContextRegistry registry = new ScriptContextRegistry(
|
||||
Collections.singletonList(InternalTemplateService.INGEST_SCRIPT_CONTEXT)
|
||||
);
|
||||
ScriptService scriptService = new ScriptService(
|
||||
settings, new Environment(settings), Collections.singleton(mustache), null, registry
|
||||
);
|
||||
templateService = new InternalTemplateService(scriptService);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.ValueSource;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class IngestDocumentMustacheIT extends AbstractMustacheTests {
|
||||
|
||||
public void testAccessMetaDataViaTemplate() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("foo", "bar");
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
|
||||
ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("1 {{foo}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 bar"));
|
||||
|
||||
ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("2 {{_source.foo}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("2 bar"));
|
||||
}
|
||||
|
||||
public void testAccessMapMetaDataViaTemplate() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
Map<String, Object> innerObject = new HashMap<>();
|
||||
innerObject.put("bar", "hello bar");
|
||||
innerObject.put("baz", "hello baz");
|
||||
innerObject.put("qux", Collections.singletonMap("fubar", "hello qux and fubar"));
|
||||
document.put("foo", innerObject);
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
|
||||
ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("1 {{foo.bar}} {{foo.baz}} {{foo.qux.fubar}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 hello bar hello baz hello qux and fubar"));
|
||||
|
||||
ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("2 {{_source.foo.bar}} {{_source.foo.baz}} {{_source.foo.qux.fubar}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("2 hello bar hello baz hello qux and fubar"));
|
||||
}
|
||||
|
||||
public void testAccessListMetaDataViaTemplate() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
document.put("list1", Arrays.asList("foo", "bar", null));
|
||||
List<Map<String, Object>> list = new ArrayList<>();
|
||||
Map<String, Object> value = new HashMap<>();
|
||||
value.put("field", "value");
|
||||
list.add(value);
|
||||
list.add(null);
|
||||
document.put("list2", list);
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
|
||||
// TODO: fix index based lookups in lists:
|
||||
ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("1 {{list1}} {{list2}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 [foo, bar, null] [{field=value}, null]"));
|
||||
|
||||
ingestDocument.setFieldValue(templateService.compile("field1"), ValueSource.wrap("2 {{_source.list1}} {{_source.list2}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("2 [foo, bar, null] [{field=value}, null]"));
|
||||
}
|
||||
|
||||
public void testAccessIngestMetadataViaTemplate() {
|
||||
Map<String, Object> document = new HashMap<>();
|
||||
Map<String, Object> ingestMap = new HashMap<>();
|
||||
ingestMap.put("timestamp", "bogus_timestamp");
|
||||
document.put("_ingest", ingestMap);
|
||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
|
||||
ingestDocument.setFieldValue(templateService.compile("ingest_timestamp"), ValueSource.wrap("{{_ingest.timestamp}} and {{_source._ingest.timestamp}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("ingest_timestamp", String.class), equalTo(ingestDocument.getIngestMetadata().get("timestamp") + " and bogus_timestamp"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.ingest.processor.remove.RemoveProcessor;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class IngestMustacheRemoveProcessorIT extends AbstractMustacheTests {
|
||||
|
||||
public void testRemoveProcessorMustacheExpression() throws Exception {
|
||||
RemoveProcessor.Factory factory = new RemoveProcessor.Factory(templateService);
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "field{{var}}");
|
||||
RemoveProcessor processor = factory.create(config);
|
||||
assertThat(processor.getField().execute(Collections.singletonMap("var", "_value")), CoreMatchers.equalTo("field_value"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
|
||||
import org.elasticsearch.ingest.*;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
import org.elasticsearch.ingest.processor.set.SetProcessor;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
|
||||
public class IngestMustacheSetProcessorIT extends AbstractMustacheTests {
|
||||
|
||||
public void testExpression() throws Exception {
|
||||
SetProcessor processor = createSetProcessor("_index", "text {{var}}");
|
||||
assertThat(processor.getValue(), instanceOf(ValueSource.TemplatedValue.class));
|
||||
assertThat(processor.getValue().copyAndResolve(Collections.singletonMap("var", "_value")), equalTo("text _value"));
|
||||
}
|
||||
|
||||
public void testSetMetadataWithTemplates() throws Exception {
|
||||
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
|
||||
Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value {{field}}");
|
||||
IngestDocument ingestDocument = createIngestDocument(Collections.singletonMap("field", "value"));
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value value"));
|
||||
}
|
||||
|
||||
public void testSetWithTemplates() throws Exception {
|
||||
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.INDEX, IngestDocument.MetaData.TYPE, IngestDocument.MetaData.ID);
|
||||
Processor processor = createSetProcessor("field{{_type}}", "_value {{" + randomMetaData.getFieldName() + "}}");
|
||||
IngestDocument ingestDocument = createIngestDocument(new HashMap<>());
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue("field_type", String.class), Matchers.equalTo("_value " + ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class)));
|
||||
}
|
||||
|
||||
private SetProcessor createSetProcessor(String fieldName, Object fieldValue) throws Exception {
|
||||
SetProcessor.Factory factory = new SetProcessor.Factory(templateService);
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", fieldName);
|
||||
config.put("value", fieldValue);
|
||||
return factory.create(config);
|
||||
}
|
||||
|
||||
private IngestDocument createIngestDocument(Map<String, Object> source) {
|
||||
return new IngestDocument("_index", "_type", "_id", null, null, null, null, source);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.ingest.TemplateService;
|
||||
import org.elasticsearch.script.ScriptException;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
public class TemplateServiceIT extends AbstractMustacheTests {
|
||||
|
||||
public void testTemplates() {
|
||||
Map<String, Object> model = new HashMap<>();
|
||||
model.put("fielda", "value1");
|
||||
model.put("fieldb", Collections.singletonMap("fieldc", "value3"));
|
||||
|
||||
TemplateService.Template template = templateService.compile("{{fielda}}/{{fieldb}}/{{fieldb.fieldc}}");
|
||||
assertThat(template.execute(model), equalTo("value1/{fieldc=value3}/value3"));
|
||||
}
|
||||
|
||||
public void testWrongTemplateUsage() {
|
||||
Map<String, Object> model = Collections.emptyMap();
|
||||
TemplateService.Template template = templateService.compile("value");
|
||||
assertThat(template.execute(model), equalTo("value"));
|
||||
|
||||
template = templateService.compile("value {{");
|
||||
assertThat(template.execute(model), equalTo("value {{"));
|
||||
template = templateService.compile("value {{abc");
|
||||
assertThat(template.execute(model), equalTo("value {{abc"));
|
||||
template = templateService.compile("value }}");
|
||||
assertThat(template.execute(model), equalTo("value }}"));
|
||||
template = templateService.compile("value }} {{");
|
||||
assertThat(template.execute(model), equalTo("value }} {{"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.ValueSource;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
public class ValueSourceMustacheIT extends AbstractMustacheTests {
|
||||
|
||||
public void testValueSourceWithTemplates() {
|
||||
Map<String, Object> model = new HashMap<>();
|
||||
model.put("field1", "value1");
|
||||
model.put("field2", Collections.singletonMap("field3", "value3"));
|
||||
|
||||
ValueSource valueSource = ValueSource.wrap("{{field1}}/{{field2}}/{{field2.field3}}", templateService);
|
||||
assertThat(valueSource, instanceOf(ValueSource.TemplatedValue.class));
|
||||
assertThat(valueSource.copyAndResolve(model), equalTo("value1/{field3=value3}/value3"));
|
||||
|
||||
valueSource = ValueSource.wrap(Arrays.asList("_value", "{{field1}}"), templateService);
|
||||
assertThat(valueSource, instanceOf(ValueSource.ListValue.class));
|
||||
List<String> result = (List<String>) valueSource.copyAndResolve(model);
|
||||
assertThat(result.size(), equalTo(2));
|
||||
assertThat(result.get(0), equalTo("_value"));
|
||||
assertThat(result.get(1), equalTo("value1"));
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("field1", "{{field1}}");
|
||||
map.put("field2", Collections.singletonMap("field3", "{{field2.field3}}"));
|
||||
map.put("field4", "_value");
|
||||
valueSource = ValueSource.wrap(map, templateService);
|
||||
assertThat(valueSource, instanceOf(ValueSource.MapValue.class));
|
||||
Map<String, Object> resultMap = (Map<String, Object>) valueSource.copyAndResolve(model);
|
||||
assertThat(resultMap.size(), equalTo(3));
|
||||
assertThat(resultMap.get("field1"), equalTo("value1"));
|
||||
assertThat(((Map) resultMap.get("field2")).size(), equalTo(1));
|
||||
assertThat(((Map) resultMap.get("field2")).get("field3"), equalTo("value3"));
|
||||
assertThat(resultMap.get("field4"), equalTo("_value"));
|
||||
}
|
||||
|
||||
public void testAccessSourceViaTemplate() {
|
||||
IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, null, null, new HashMap<>());
|
||||
assertThat(ingestDocument.hasField("marvel"), is(false));
|
||||
ingestDocument.setFieldValue(templateService.compile("{{_index}}"), ValueSource.wrap("{{_index}}", templateService));
|
||||
assertThat(ingestDocument.getFieldValue("marvel", String.class), equalTo("marvel"));
|
||||
ingestDocument.removeField(templateService.compile("{{marvel}}"));
|
||||
assertThat(ingestDocument.hasField("index"), is(false));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.smoketest;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.test.rest.RestTestCandidate;
|
||||
import org.elasticsearch.test.rest.parser.RestTestParseException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class IngestWithMustacheIT extends ESRestTestCase {
|
||||
|
||||
public IngestWithMustacheIT(@Name("yaml") RestTestCandidate testCandidate) {
|
||||
super(testCandidate);
|
||||
}
|
||||
|
||||
@ParametersFactory
|
||||
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
|
||||
return ESRestTestCase.createParameters(0, 1);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
---
|
||||
"Test metadata templateing":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline_1"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "index_type_id",
|
||||
"value": "{{_index}}/{{_type}}/{{_id}}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_1" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
catch: request_timeout
|
||||
cluster.health:
|
||||
wait_for_nodes: 99
|
||||
timeout: 2s
|
||||
- match: { "timed_out": true }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline_1"
|
||||
body: {}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- length: { _source: 1 }
|
||||
- match: { _source.index_type_id: "test/test/1" }
|
||||
|
||||
---
|
||||
"Test templateing":
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: green
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline_1"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "field4",
|
||||
"value": "{{field1}}/{{field2}}/{{field3}}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_1" }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline_2"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "{{field1}}",
|
||||
"value": "value"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_2" }
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline_3"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"remove" : {
|
||||
"field" : "{{field_to_remove}}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline_3" }
|
||||
|
||||
# Simulate a Thread.sleep(), because pipeline are updated in the background
|
||||
- do:
|
||||
catch: request_timeout
|
||||
cluster.health:
|
||||
wait_for_nodes: 99
|
||||
timeout: 2s
|
||||
- match: { "timed_out": true }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline_1"
|
||||
body: {
|
||||
field1: "1",
|
||||
field2: "2",
|
||||
field3: "3"
|
||||
}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- length: { _source: 4 }
|
||||
- match: { _source.field1: "1" }
|
||||
- match: { _source.field2: "2" }
|
||||
- match: { _source.field3: "3" }
|
||||
- match: { _source.field4: "1/2/3" }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline_2"
|
||||
body: {
|
||||
field1: "field2"
|
||||
}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- length: { _source: 2 }
|
||||
- match: { _source.field1: "field2" }
|
||||
- match: { _source.field2: "value" }
|
||||
|
||||
- do:
|
||||
ingest.index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline_id: "my_pipeline_3"
|
||||
body: {
|
||||
field_to_remove: "field2",
|
||||
field2: "2",
|
||||
}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- length: { _source: 1 }
|
||||
- match: { _source.field_to_remove: "field2" }
|
|
@ -39,6 +39,7 @@ List projects = [
|
|||
'qa:smoke-test-client',
|
||||
'qa:smoke-test-multinode',
|
||||
'qa:smoke-test-plugins',
|
||||
'qa:ingest-with-mustache',
|
||||
'qa:vagrant',
|
||||
]
|
||||
|
||||
|
|
Loading…
Reference in New Issue