diff --git a/docs/plugins/ingest.asciidoc b/docs/plugins/ingest.asciidoc
index 3532204fee2..9c1877f2a49 100644
--- a/docs/plugins/ingest.asciidoc
+++ b/docs/plugins/ingest.asciidoc
@@ -396,6 +396,55 @@ An example that adds the parsed date to the `timestamp` field based on the `init
 }
 --------------------------------------------------
 
+==== Meta processor
+
+The `meta` processor modifies the metadata properties of a document being processed.
+
+The following example changes the index of a document to `alternative_index`, instead of indexing it into the index
+that was specified in the index or bulk request:
+
+[source,js]
+--------------------------------------------------
+{
+  "description" : "...",
+  "processors" : [
+    {
+      "meta" : {
+        "_index" : "alternative_index"
+      }
+    }
+  ]
+}
+--------------------------------------------------
+
+The following metadata attributes can be modified by this processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
+`_timestamp` and `_ttl`. All of these attributes can be specified in the body of the `meta` processor.
+
+The metadata settings in this processor also support templating, which allows metadata field values to be derived
+from field values in the source of the document being indexed. The mustache template language is used, and anything
+between `{{` and `}}` is treated as a template that can reference any field in the source of the document.
+
+In the following example, documents being processed end up being indexed into an index based on the city name
+resolved by the `geoip` processor (for example `city-amsterdam`):
+
+[source,js]
+--------------------------------------------------
+{
+  "description" : "...",
+  "processors" : [
+    {
+      "geoip" : {
+        "source" : "ip"
+      }
+    },
+    {
+      "meta" : {
+        "_index" : "city-{{geoip.city_name}}"
+      }
+    }
+  ]
+}
+--------------------------------------------------
 
 === Put pipeline API
 
diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index 97274d4cffb..bbb47aed8b1 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -31,13 +31,29 @@ public final class IngestDocument {
 
     private final Map metaData;
     private final Map source;
 
-    private boolean modified = false;
+    private boolean sourceModified = false;
 
     public IngestDocument(String index, String type, String id, Map source) {
+        this(index, type, id, null, null, null, null, source);
+    }
+
+    public IngestDocument(String index, String type, String id, String routing, String parent, String timestamp, String ttl, Map source) {
         this.metaData = new HashMap<>();
         this.metaData.put(MetaData.INDEX.getFieldName(), index);
         this.metaData.put(MetaData.TYPE.getFieldName(), type);
         this.metaData.put(MetaData.ID.getFieldName(), id);
+        if (routing != null) {
+            this.metaData.put(MetaData.ROUTING.getFieldName(), routing);
+        }
+        if (parent != null) {
+            this.metaData.put(MetaData.PARENT.getFieldName(), parent);
+        }
+        if (timestamp != null) {
+            this.metaData.put(MetaData.TIMESTAMP.getFieldName(), timestamp);
+        }
+        if (ttl != null) {
+            this.metaData.put(MetaData.TTL.getFieldName(), ttl);
+        }
         this.source = source;
     }
 
@@ -109,7 +125,7 @@ public final class IngestDocument {
         if (parent != null) {
             String leafKey = pathElements[pathElements.length - 1];
             if (parent.containsKey(leafKey)) {
-                modified = true;
+                sourceModified = true;
                 parent.remove(leafKey);
             }
         }
@@ -166,13 +182,17 @@
public final class IngestDocument { String leafKey = pathElements[pathElements.length - 1]; inner.put(leafKey, value); - modified = true; + sourceModified = true; } public String getMetadata(MetaData metaData) { return this.metaData.get(metaData.getFieldName()); } + public void setMetaData(MetaData metaData, String value) { + this.metaData.put(metaData.getFieldName(), value); + } + /** * Returns the document. Should be used only for reading. Any change made to the returned map will * not be reflected to the modified flag. Modify the document instead using {@link #setFieldValue(String, Object)} @@ -182,8 +202,8 @@ public final class IngestDocument { return source; } - public boolean isModified() { - return modified; + public boolean isSourceModified() { + return sourceModified; } @Override @@ -203,6 +223,14 @@ public final class IngestDocument { return Objects.hash(metaData, source); } + @Override + public String toString() { + return "IngestDocument{" + + "metaData=" + metaData + + ", source=" + source + + '}'; + } + public enum MetaData { INDEX("_index"), @@ -222,6 +250,28 @@ public final class IngestDocument { public String getFieldName() { return fieldName; } + + public static MetaData fromString(String value) { + switch (value) { + case "_index": + return INDEX; + case "_type": + return TYPE; + case "_id": + return ID; + case "_routing": + return ROUTING; + case "_parent": + return PARENT; + case "_timestamp": + return TIMESTAMP; + case "_ttl": + return TTL; + default: + throw new IllegalArgumentException("no valid metadata field name [" + value + "]"); + } + } + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java index 6e7d276876c..67a1cad45a7 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java @@ -49,7 +49,10 @@ public interface Processor { interface Factory

extends Closeable { /** - * Creates a processor based on the specified map of maps config + * Creates a processor based on the specified map of maps config. + * + * Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can + * verify if all configurations settings have been used. */ P create(Map config) throws IOException; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessor.java new file mode 100644 index 00000000000..e2b3b31498d --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessor.java @@ -0,0 +1,71 @@ +package org.elasticsearch.ingest.processor.meta; + +import com.github.mustachejava.DefaultMustacheFactory; +import com.github.mustachejava.Mustache; +import com.github.mustachejava.MustacheFactory; +import org.elasticsearch.common.io.FastStringReader; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestDocument.MetaData; +import org.elasticsearch.ingest.processor.Processor; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public final class MetaDataProcessor implements Processor { + + public final static String TYPE = "meta"; + + private final Map templates; + + public MetaDataProcessor(Map templates) { + this.templates = templates; + } + + @Override + public void execute(IngestDocument ingestDocument) { + Map model = ingestDocument.getSource(); + for (Map.Entry entry : templates.entrySet()) { + StringWriter writer = new StringWriter(); + entry.getValue().execute(writer, model); + ingestDocument.setMetaData(entry.getKey(), writer.toString()); + } + } + + @Override + public String getType() { + return TYPE; + } + + Map getTemplates() { + return templates; + } + + public final static class Factory implements Processor.Factory { + + private final MustacheFactory mustacheFactory = new DefaultMustacheFactory(); + + @Override + public MetaDataProcessor create(Map config) throws IOException { + Map templates = new HashMap<>(); + Iterator> iterator = config.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + MetaData metaData = MetaData.fromString(entry.getKey()); + Mustache mustache = mustacheFactory.compile(new FastStringReader(entry.getValue().toString()), ""); + templates.put(metaData, mustache); + iterator.remove(); + } + + if (templates.isEmpty()) { + throw new IllegalArgumentException("no meta fields specified"); + } + + return new MetaDataProcessor(Collections.unmodifiableMap(templates)); + } + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index dd30334e422..75a962ba272 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -35,6 +35,7 @@ import org.elasticsearch.ingest.processor.rename.RenameProcessor; import org.elasticsearch.ingest.processor.split.SplitProcessor; import org.elasticsearch.ingest.processor.trim.TrimProcessor; import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor; +import org.elasticsearch.ingest.processor.meta.MetaDataProcessor; import 
org.elasticsearch.plugin.ingest.rest.IngestRestFilter; import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService; @@ -65,6 +66,7 @@ public class IngestModule extends AbstractModule { addProcessor(TrimProcessor.TYPE, new TrimProcessor.Factory()); addProcessor(ConvertProcessor.TYPE, new ConvertProcessor.Factory()); addProcessor(GsubProcessor.TYPE, new GsubProcessor.Factory()); + addProcessor(MetaDataProcessor.TYPE, new MetaDataProcessor.Factory()); MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class); for (Map.Entry entry : processors.entrySet()) { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index 4a963beecc8..e66697f02d6 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -19,14 +19,17 @@ package org.elasticsearch.plugin.ingest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Map; + public class PipelineExecutionService { static final String THREAD_POOL_NAME = IngestPlugin.NAME; @@ -40,22 +43,47 @@ public class PipelineExecutionService { this.threadPool = threadPool; } - public void execute(IngestDocument ingestDocument, String pipelineId, Listener listener) { + public void execute(IndexRequest indexRequest, String pipelineId, Listener listener) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { listener.failed(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist")); return; } - threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() { - @Override - public void run() { - try { - pipeline.execute(ingestDocument); - listener.executed(ingestDocument); - } catch (Throwable e) { - listener.failed(e); + threadPool.executor(THREAD_POOL_NAME).execute(() -> { + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + String parent = indexRequest.parent(); + String timestamp = indexRequest.timestamp(); + String ttl = null; + if (indexRequest.ttl() != -1) { + // At this point we don't know the original string ttl that was specified, + // so we covert the ttl which is a long to a string using 'ms' as unit: + ttl = TimeValue.timeValueMillis(indexRequest.ttl()).toString(); + } + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, timestamp, ttl, sourceAsMap); + try { + pipeline.execute(ingestDocument); + if (ingestDocument.isSourceModified()) { + indexRequest.source(ingestDocument.getSource()); } + indexRequest.index(ingestDocument.getMetadata(IngestDocument.MetaData.INDEX)); + indexRequest.type(ingestDocument.getMetadata(IngestDocument.MetaData.TYPE)); + indexRequest.id(ingestDocument.getMetadata(IngestDocument.MetaData.ID)); + 
indexRequest.routing(ingestDocument.getMetadata(IngestDocument.MetaData.ROUTING)); + indexRequest.parent(ingestDocument.getMetadata(IngestDocument.MetaData.PARENT)); + indexRequest.timestamp(ingestDocument.getMetadata(IngestDocument.MetaData.TIMESTAMP)); + String ttlStr = ingestDocument.getMetadata(IngestDocument.MetaData.TTL); + if (ttlStr != null) { + TimeValue timeValue = TimeValue.parseTimeValue(ttlStr, null, "ttl"); + indexRequest.ttl(timeValue.millis()); + } + listener.executed(ingestDocument); + } catch (Throwable e) { + listener.failed(e); } }); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index a80d10a18e4..910f26341e7 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -84,15 +84,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio chain.proceed(action, indexRequest, listener); return; } - - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); - executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() { + executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() { @Override public void executed(IngestDocument ingestDocument) { - if (ingestDocument.isModified()) { - indexRequest.source(ingestDocument.getSource()); - } indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true); chain.proceed(action, indexRequest, listener); } @@ -127,14 +121,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } IndexRequest indexRequest = (IndexRequest) actionRequest; - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); - executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() { + executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() { @Override public void executed(IngestDocument ingestDocument) { - if (ingestDocument.isModified()) { - indexRequest.source(ingestDocument.getSource()); - } processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/WriteableIngestDocument.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/WriteableIngestDocument.java index 4b65a16596d..3a75218857f 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/WriteableIngestDocument.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/WriteableIngestDocument.java @@ -31,9 +31,7 @@ import java.io.IOException; import java.util.Map; import java.util.Objects; -import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; -import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX; -import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE; +import static org.elasticsearch.ingest.IngestDocument.MetaData.*; public class WriteableIngestDocument implements Writeable, ToXContent { @@ -58,8 +56,12 @@ public class WriteableIngestDocument 
implements Writeable doc = in.readMap(); - return new WriteableIngestDocument(new IngestDocument(index, type, id, doc)); + return new WriteableIngestDocument(new IngestDocument(index, type, id, routing, parent, timestamp, ttl, doc)); } @Override @@ -67,16 +69,24 @@ public class WriteableIngestDocument implements Writeable ingestDocumentList = new ArrayList<>(); for (Map dataMap : docs) { Map document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE); - IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX), - ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE), - ConfigurationUtils.readStringProperty(dataMap, Fields.ID), + IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, MetaData.INDEX.getFieldName()), + ConfigurationUtils.readStringProperty(dataMap, MetaData.TYPE.getFieldName()), + ConfigurationUtils.readStringProperty(dataMap, MetaData.ID.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.ROUTING.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.PARENT.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TIMESTAMP.getFieldName()), + ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TTL.getFieldName()), document); ingestDocumentList.add(ingestDocument); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java index 375e86f71c6..ad28fd663dc 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java @@ -116,14 +116,14 @@ public class IngestDocumentTests extends ESTestCase { public void testSimpleSetFieldValue() { ingestDocument.setFieldValue("new_field", "foo"); assertThat(ingestDocument.getSource().get("new_field"), equalTo("foo")); - assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.isSourceModified(), equalTo(true)); } public void testSetFieldValueNullValue() { ingestDocument.setFieldValue("new_field", null); assertThat(ingestDocument.getSource().containsKey("new_field"), equalTo(true)); assertThat(ingestDocument.getSource().get("new_field"), nullValue()); - assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.isSourceModified(), equalTo(true)); } @SuppressWarnings("unchecked") @@ -138,7 +138,7 @@ public class IngestDocumentTests extends ESTestCase { assertThat(c.get("d"), instanceOf(String.class)); String d = (String) c.get("d"); assertThat(d, equalTo("foo")); - assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.isSourceModified(), equalTo(true)); } public void testSetFieldValueOnExistingField() { @@ -154,7 +154,7 @@ public class IngestDocumentTests extends ESTestCase { assertThat(innerMap.get("new"), instanceOf(String.class)); String value = (String) innerMap.get("new"); assertThat(value, equalTo("bar")); - assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.isSourceModified(), equalTo(true)); } public void testSetFieldValueOnExistingParentTypeMismatch() { @@ -163,7 +163,7 @@ public class IngestDocumentTests extends ESTestCase { fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add field to parent [buzz] of type [java.lang.String], 
[java.util.Map] expected instead.")); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); } } @@ -173,7 +173,7 @@ public class IngestDocumentTests extends ESTestCase { fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add field to null parent, [java.util.Map] expected instead.")); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); } } @@ -183,7 +183,7 @@ public class IngestDocumentTests extends ESTestCase { fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add null or empty field")); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); } } @@ -193,13 +193,13 @@ public class IngestDocumentTests extends ESTestCase { fail("add field should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("cannot add null or empty field")); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); } } public void testRemoveField() { ingestDocument.removeField("foo"); - assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.isSourceModified(), equalTo(true)); assertThat(ingestDocument.getSource().size(), equalTo(2)); assertThat(ingestDocument.getSource().containsKey("foo"), equalTo(false)); } @@ -217,30 +217,30 @@ public class IngestDocumentTests extends ESTestCase { assertThat(map.size(), equalTo(0)); assertThat(ingestDocument.getSource().size(), equalTo(3)); assertThat(ingestDocument.getSource().containsKey("fizz"), equalTo(true)); - assertThat(ingestDocument.isModified(), equalTo(true)); + assertThat(ingestDocument.isSourceModified(), equalTo(true)); } public void testRemoveNonExistingField() { ingestDocument.removeField("does_not_exist"); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testRemoveExistingParentTypeMismatch() { ingestDocument.removeField("foo.test"); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testRemoveNullField() { ingestDocument.removeField(null); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); assertThat(ingestDocument.getSource().size(), equalTo(3)); } public void testRemoveEmptyField() { ingestDocument.removeField(""); - assertThat(ingestDocument.isModified(), equalTo(false)); + assertThat(ingestDocument.isSourceModified(), equalTo(false)); assertThat(ingestDocument.getSource().size(), equalTo(3)); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorFactoryTests.java new file mode 100644 index 00000000000..ee4cb0228a8 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorFactoryTests.java @@ -0,0 +1,65 @@ +package org.elasticsearch.ingest.processor.meta; + +import com.github.mustachejava.DefaultMustacheFactory; +import 
com.github.mustachejava.Mustache; +import com.github.mustachejava.MustacheException; +import org.elasticsearch.common.io.FastStringReader; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.ingest.IngestDocument.MetaData; + +public class MetaDataProcessorFactoryTests extends ESTestCase { + + public void testCreate() throws Exception { + MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory(); + Map config = new HashMap<>(); + for (MetaData metaData : MetaData.values()) { + config.put(metaData.getFieldName(), randomBoolean() ? "static text" : "{{expression}}"); + } + MetaDataProcessor processor = factory.create(config); + assertThat(processor.getTemplates().size(), Matchers.equalTo(7)); + assertThat(processor.getTemplates().get(MetaData.INDEX), Matchers.notNullValue()); + assertThat(processor.getTemplates().get(MetaData.TIMESTAMP), Matchers.notNullValue()); + assertThat(processor.getTemplates().get(MetaData.ID), Matchers.notNullValue()); + assertThat(processor.getTemplates().get(MetaData.ROUTING), Matchers.notNullValue()); + assertThat(processor.getTemplates().get(MetaData.PARENT), Matchers.notNullValue()); + assertThat(processor.getTemplates().get(MetaData.TIMESTAMP), Matchers.notNullValue()); + assertThat(processor.getTemplates().get(MetaData.TTL), Matchers.notNullValue()); + } + + public void testCreateIllegalMetaData() throws Exception { + MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory(); + try { + factory.create(Collections.singletonMap("_field", "text {{expression}}")); + fail("exception should have been thrown"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), Matchers.equalTo("no valid metadata field name [_field]")); + } + } + + public void testCreateIllegalEmpty() throws Exception { + MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory(); + try { + factory.create(Collections.emptyMap()); + fail("exception should have been thrown"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), Matchers.equalTo("no meta fields specified")); + } + } + + public void testIlegalMustacheExpression() throws Exception { + try { + new MetaDataProcessor.Factory().create(Collections.singletonMap("_index", "text {{var")); + fail("exception expected"); + } catch (MustacheException e) { + assertThat(e.getMessage(), Matchers.equalTo("Improperly closed variable in :1")); + } + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorTests.java new file mode 100644 index 00000000000..13d56017d60 --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/meta/MetaDataProcessorTests.java @@ -0,0 +1,33 @@ +package org.elasticsearch.ingest.processor.meta; + +import com.github.mustachejava.DefaultMustacheFactory; +import com.github.mustachejava.Mustache; +import org.elasticsearch.common.io.FastStringReader; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.ingest.IngestDocument.*; + +public class MetaDataProcessorTests extends ESTestCase { + + public void testExecute() throws Exception 
{ + Map templates = new HashMap<>(); + for (MetaData metaData : MetaData.values()) { + templates.put(metaData, new DefaultMustacheFactory().compile(new FastStringReader("some {{field}}"), "noname")); + } + + MetaDataProcessor processor = new MetaDataProcessor(templates); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("field", "value")); + processor.execute(ingestDocument); + + for (MetaData metaData : MetaData.values()) { + assertThat(ingestDocument.getMetadata(metaData), Matchers.equalTo("some value")); + } + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index 3f9ec3517e6..917d4b1815c 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -19,85 +19,168 @@ package org.elasticsearch.plugin.ingest; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.processor.meta.MetaDataProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -import java.util.Arrays; -import java.util.Collections; +import java.util.*; +import java.util.concurrent.Executor; +import static org.hamcrest.Matchers.*; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; public class PipelineExecutionServiceTests extends ESTestCase { private PipelineStore store; - private ThreadPool threadPool; private PipelineExecutionService executionService; @Before public void setup() { store = mock(PipelineStore.class); - threadPool = new ThreadPool( - Settings.builder() - .put("name", "_name") - .put(PipelineExecutionService.additionalSettings(Settings.EMPTY)) - .build() - ); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor(anyString())).thenReturn(Runnable::run); executionService = new PipelineExecutionService(store, threadPool); } - @After - public void destroy() { - threadPool.shutdown(); - } - public void testExecute_pipelineDoesNotExist() { when(store.get("_id")).thenReturn(null); - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap()); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); - executionService.execute(ingestDocument, "_id", listener); + executionService.execute(indexRequest, "_id", listener); verify(listener).failed(any(IllegalArgumentException.class)); - verify(listener, times(0)).executed(ingestDocument); + verify(listener, times(0)).executed(any()); } - public void testExecute_success() throws Exception { + public void testExecuteSuccess() throws Exception { Processor processor = mock(Processor.class); 
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap()); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); - executionService.execute(ingestDocument, "_id", listener); - assertBusy(new Runnable() { - @Override - public void run() { - verify(processor).execute(ingestDocument); - verify(listener).executed(ingestDocument); - verify(listener, times(0)).failed(any(Exception.class)); + executionService.execute(indexRequest, "_id", listener); + verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(listener).executed(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(listener, times(0)).failed(any(Exception.class)); + } + + public void testExecutePropagateAllMetaDataUpdates() throws Exception { + Processor processor = mock(Processor.class); + doAnswer((InvocationOnMock invocationOnMock) -> { + IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; + for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { + if (metaData == IngestDocument.MetaData.TTL) { + ingestDocument.setMetaData(IngestDocument.MetaData.TTL, "5w"); + } else { + ingestDocument.setMetaData(metaData, "update" + metaData.getFieldName()); + } + } - }); + return null; + }).when(processor).execute(any()); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); + + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(indexRequest, "_id", listener); + verify(processor).execute(any()); + verify(listener).executed(any()); + verify(listener, times(0)).failed(any(Exception.class)); + + assertThat(indexRequest.index(), equalTo("update_index")); + assertThat(indexRequest.type(), equalTo("update_type")); + assertThat(indexRequest.id(), equalTo("update_id")); + assertThat(indexRequest.routing(), equalTo("update_routing")); + assertThat(indexRequest.parent(), equalTo("update_parent")); + assertThat(indexRequest.timestamp(), equalTo("update_timestamp")); + assertThat(indexRequest.ttl(), equalTo(3024000000l)); } public void testExecute_failure() throws Exception { Processor processor = mock(Processor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap()); - doThrow(new RuntimeException()).when(processor).execute(ingestDocument); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); - executionService.execute(ingestDocument, "_id", listener); - assertBusy(new Runnable() { - @Override - public void run() { - verify(processor).execute(ingestDocument); - verify(listener, times(0)).executed(ingestDocument); - verify(listener).failed(any(RuntimeException.class)); - } - }); + executionService.execute(indexRequest, "_id", listener); + 
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(listener, times(0)).executed(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(listener).failed(any(RuntimeException.class)); + } + + public void testExecuteTTL() throws Exception { + // test with valid ttl + MetaDataProcessor.Factory metaProcessorFactory = new MetaDataProcessor.Factory(); + Map config = new HashMap<>(); + config.put("_ttl", "5d"); + MetaDataProcessor processor = metaProcessorFactory.create(config); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(indexRequest, "_id", listener); + + assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl").millis())); + verify(listener, times(1)).executed(any()); + verify(listener, never()).failed(any()); + + // test with invalid ttl + metaProcessorFactory = new MetaDataProcessor.Factory(); + config = new HashMap<>(); + config.put("_ttl", "abc"); + processor = metaProcessorFactory.create(config); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); + + indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); + listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(indexRequest, "_id", listener); + + verify(listener, never()).executed(any()); + verify(listener, times(1)).failed(any(ElasticsearchParseException.class)); + + // test with provided ttl + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList())); + + indexRequest = new IndexRequest("_index", "_type", "_id") + .source(Collections.emptyMap()) + .ttl(1000l); + listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(indexRequest, "_id", listener); + + assertThat(indexRequest.ttl(), equalTo(1000l)); + verify(listener, times(1)).executed(any()); + verify(listener, never()).failed(any(Throwable.class)); + } + + private IngestDocument eqID(String index, String type, String id, Map source) { + return Matchers.argThat(new IngestDocumentMatcher(index, type, id, source)); + } + + private class IngestDocumentMatcher extends ArgumentMatcher { + + private final IngestDocument ingestDocument; + + public IngestDocumentMatcher(String index, String type, String id, Map source) { + this.ingestDocument = new IngestDocument(index, type, id, source); + } + + @Override + public boolean matches(Object o) { + return Objects.equals(ingestDocument, o); + } } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 974367aec17..67ea1b32d3e 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -83,7 +83,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + 
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verifyZeroInteractions(actionFilterChain); } @@ -96,7 +96,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verifyZeroInteractions(actionFilterChain); } @@ -121,19 +121,15 @@ public class IngestActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - Answer answer = new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; - PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; - listener.executed(ingestDocument); - return null; - } + Answer answer = invocationOnMock -> { + PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; + listener.executed(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap())); + return null; }; - doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verify(actionFilterChain).proceed("_action", indexRequest, actionListener); verifyZeroInteractions(actionListener); } @@ -154,10 +150,10 @@ public class IngestActionFilterTests extends ESTestCase { return null; } }; - doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); verify(actionListener).onFailure(exception); verifyZeroInteractions(actionFilterChain); } @@ -250,7 +246,7 @@ public class IngestActionFilterTests extends ESTestCase { listener.failed(exception); return null; }; - doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); CaptureActionListener actionListener = new CaptureActionListener(); RecordRequestAFC actionFilterChain = new RecordRequestAFC(); @@ -295,7 +291,7 @@ public class IngestActionFilterTests extends 
ESTestCase { listener.failed(exception); return null; }; - doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); ActionListener actionListener = mock(ActionListener.class); RecordRequestAFC actionFilterChain = new RecordRequestAFC(); diff --git a/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_meta_processor.yaml b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_meta_processor.yaml new file mode 100644 index 00000000000..be13146fb63 --- /dev/null +++ b/plugins/ingest/src/test/resources/rest-api-spec/test/ingest/70_meta_processor.yaml @@ -0,0 +1,45 @@ +--- +"Test meta processor": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "meta" : { + "_index" : "surprise" + } + } + ] + } + - match: { _id: "my_pipeline" } + + # Simulate a Thread.sleep(), because pipeline are updated in the background + - do: + catch: request_timeout + cluster.health: + wait_for_nodes: 99 + timeout: 2s + - match: { "timed_out": true } + + - do: + ingest.index: + index: test + type: test + id: 1 + pipeline_id: "my_pipeline" + body: {field: "value"} + + - do: + get: + index: surprise + type: test + id: 1 + - length: { _source: 1 } + - match: { _source.field: "value" }
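
Note: the `_index` templating documented above relies on mustache-java resolving dotted names against the document source, the same way `MetaDataProcessor` executes its compiled templates against `IngestDocument#getSource()`. The following is a minimal standalone sketch of that lookup; it uses plain `java.io.StringReader` instead of the plugin's `FastStringReader`, and the class and variable names are illustrative only, not part of the plugin.

```java
import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import com.github.mustachejava.MustacheFactory;

import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

// Sketch only: shows how a template such as "city-{{geoip.city_name}}" resolves
// against a source map, mirroring how the meta processor derives metadata values.
public final class MetaTemplateSketch {

    public static void main(String[] args) {
        MustacheFactory mustacheFactory = new DefaultMustacheFactory();
        Mustache template = mustacheFactory.compile(new StringReader("city-{{geoip.city_name}}"), "");

        // The model plays the role of the document source: the geoip processor would
        // have added the nested "geoip" map before the meta processor runs.
        Map<String, Object> geoip = new HashMap<>();
        geoip.put("city_name", "amsterdam");
        Map<String, Object> source = new HashMap<>();
        source.put("geoip", geoip);

        StringWriter writer = new StringWriter();
        template.execute(writer, source);
        System.out.println(writer.toString()); // prints: city-amsterdam
    }
}
```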
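
Note: `testExecutePropagateAllMetaDataUpdates` sets `_ttl` to `"5w"` and expects the `IndexRequest` to end up with a ttl of `3024000000` milliseconds after the string is parsed back. A quick standalone check of that constant, independent of Elasticsearch's `TimeValue` class (names below are illustrative):

```java
// Sketch only: verifies the 3024000000L expectation for a "5w" ttl by plain arithmetic.
public final class TtlMillisSketch {

    public static void main(String[] args) {
        long millisPerWeek = 7L * 24L * 60L * 60L * 1000L; // 604800000
        long fiveWeeksInMillis = 5L * millisPerWeek;
        System.out.println(fiveWeeksInMillis); // prints: 3024000000
    }
}
```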
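
Note: the `Processor.Factory` javadoc added above states that implementations must remove the configuration keys they consume so that leftover settings can be detected after pipeline creation; `MetaDataProcessor.Factory` follows this by removing entries via its iterator as it compiles them into templates. The sketch below only illustrates that contract; none of its names exist in the plugin.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a factory consumes the keys it understands, leaving unknown
// settings behind so the pipeline builder can report them as unused.
public final class FactoryContractSketch {

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("field", "message");
        config.put("unknown_setting", true);

        // Consume the setting this (hypothetical) factory understands.
        Object field = config.remove("field");
        System.out.println("consumed field setting: " + field);

        // Whatever remains would be flagged as an unused configuration setting.
        if (config.isEmpty() == false) {
            System.out.println("unused configuration settings: " + config.keySet());
        }
    }
}
```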