ingest: Add `meta` processor that allows to modify the metadata attributes of document being processed

This commit is contained in:
Martijn van Groningen 2015-11-24 23:59:44 +01:00
parent afc9069c99
commit 9d1fa0d6da
15 changed files with 538 additions and 108 deletions

View File

@ -396,6 +396,55 @@ An example that adds the parsed date to the `timestamp` field based on the `init
}
--------------------------------------------------
==== Meta processor
The `meta` processor allows you to modify the metadata properties of a document being processed.
The following example changes the index of a document to `alternative_index` instead of indexing it into an index
that was specified in the index or bulk request:
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors" : [
{
"meta" : {
"_index" : "alternative_index"
}
}
]
}
--------------------------------------------------
The following metadata attributes can be modified in this processor: `_index`, `_type`, `_id`, `_routing`, `_parent`,
`_timestamp` and `_ttl`. All these metadata attributes can be specified in the body of the `meta` processor.
The metadata settings in this processor are also templatable, which allows metadata field values to be replaced with
field values from the source of the document being indexed. The mustache template language is used: anything between
`{{` and `}}` is treated as a template and may point to any field in the source of the document.
In the following example, documents being processed end up being indexed into an index whose name is based on the
city name resolved by the `geoip` processor (for example, `city-amsterdam`):
[source,js]
--------------------------------------------------
{
"description" : "...",
"processors" : [
{
"geoip" : {
"source" : "ip"
}
},
{
"meta" : {
"_index" : "city-{{geoip.city_name}}"
}
}
]
}
--------------------------------------------------
=== Put pipeline API

View File

@ -31,13 +31,29 @@ public final class IngestDocument {
private final Map<String, String> metaData;
private final Map<String, Object> source;
private boolean modified = false;
private boolean sourceModified = false;
/**
 * Convenience constructor for documents that carry no routing, parent, timestamp or ttl
 * metadata; delegates to the full constructor with null for those optional attributes.
 */
public IngestDocument(String index, String type, String id, Map<String, Object> source) {
this(index, type, id, null, null, null, null, source);
}
/**
 * Creates a new ingest document. Index, type and id are always recorded in the metadata
 * map; routing, parent, timestamp and ttl are optional and only recorded when non-null.
 * NOTE(review): the source map is held by reference, not copied — processors mutate it
 * in place.
 */
public IngestDocument(String index, String type, String id, String routing, String parent, String timestamp, String ttl, Map<String, Object> source) {
this.metaData = new HashMap<>();
this.metaData.put(MetaData.INDEX.getFieldName(), index);
this.metaData.put(MetaData.TYPE.getFieldName(), type);
this.metaData.put(MetaData.ID.getFieldName(), id);
if (routing != null) {
this.metaData.put(MetaData.ROUTING.getFieldName(), routing);
}
if (parent != null) {
this.metaData.put(MetaData.PARENT.getFieldName(), parent);
}
if (timestamp != null) {
this.metaData.put(MetaData.TIMESTAMP.getFieldName(), timestamp);
}
if (ttl != null) {
this.metaData.put(MetaData.TTL.getFieldName(), ttl);
}
this.source = source;
}
@ -109,7 +125,7 @@ public final class IngestDocument {
if (parent != null) {
String leafKey = pathElements[pathElements.length - 1];
if (parent.containsKey(leafKey)) {
modified = true;
sourceModified = true;
parent.remove(leafKey);
}
}
@ -166,13 +182,17 @@ public final class IngestDocument {
String leafKey = pathElements[pathElements.length - 1];
inner.put(leafKey, value);
modified = true;
sourceModified = true;
}
/**
 * Returns the value of the provided metadata attribute, or null if it was never set.
 */
public String getMetadata(MetaData metaData) {
return this.metaData.get(metaData.getFieldName());
}
/**
 * Sets the value of the provided metadata attribute. Updating metadata does not set the
 * source-modified flag — only changes to the source map do.
 */
public void setMetaData(MetaData metaData, String value) {
this.metaData.put(metaData.getFieldName(), value);
}
/**
* Returns the document. Should be used only for reading. Any change made to the returned map will
* not be reflected to the modified flag. Modify the document instead using {@link #setFieldValue(String, Object)}
@ -182,8 +202,8 @@ public final class IngestDocument {
return source;
}
public boolean isModified() {
return modified;
public boolean isSourceModified() {
return sourceModified;
}
@Override
@ -203,6 +223,14 @@ public final class IngestDocument {
return Objects.hash(metaData, source);
}
@Override
public String toString() {
    // Render both the metadata map and the source map for debugging purposes.
    StringBuilder description = new StringBuilder("IngestDocument{");
    description.append("metaData=").append(metaData);
    description.append(", source=").append(source);
    description.append('}');
    return description.toString();
}
public enum MetaData {
INDEX("_index"),
@ -222,6 +250,28 @@ public final class IngestDocument {
public String getFieldName() {
return fieldName;
}
/**
 * Resolves a {@link MetaData} constant from its metadata field name (e.g. {@code "_index"}).
 *
 * @throws IllegalArgumentException if the given value is not a known metadata field name
 */
public static MetaData fromString(String value) {
    // Match against each constant's own field name rather than a hand-written switch,
    // so this method cannot silently fall out of sync when new attributes are added.
    for (MetaData metaData : values()) {
        if (metaData.getFieldName().equals(value)) {
            return metaData;
        }
    }
    throw new IllegalArgumentException("no valid metadata field name [" + value + "]");
}
}
}

View File

@ -49,7 +49,10 @@ public interface Processor {
interface Factory<P extends Processor> extends Closeable {
/**
* Creates a processor based on the specified map of maps config
* Creates a processor based on the specified map of maps config.
*
* Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can
* verify that all configuration settings have been used.
*/
P create(Map<String, Object> config) throws IOException;

View File

@ -0,0 +1,71 @@
package org.elasticsearch.ingest.processor.meta;
import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import com.github.mustachejava.MustacheFactory;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.MetaData;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * Processor that overwrites metadata attributes (_index, _type, _id, _routing, _parent,
 * _timestamp, _ttl) of the document being processed. Each attribute's value is a mustache
 * template that is rendered against the document's source.
 */
public final class MetaDataProcessor implements Processor {

    public final static String TYPE = "meta";

    private final Map<MetaData, Mustache> templates;

    public MetaDataProcessor(Map<MetaData, Mustache> templates) {
        this.templates = templates;
    }

    @Override
    public void execute(IngestDocument ingestDocument) {
        // Render every configured template against the document source and store the
        // resulting string as the new metadata value.
        Map<String, Object> model = ingestDocument.getSource();
        templates.forEach((metaData, template) -> {
            StringWriter rendered = new StringWriter();
            template.execute(rendered, model);
            ingestDocument.setMetaData(metaData, rendered.toString());
        });
    }

    @Override
    public String getType() {
        return TYPE;
    }

    // Package-private, for tests.
    Map<MetaData, Mustache> getTemplates() {
        return templates;
    }

    public final static class Factory implements Processor.Factory<MetaDataProcessor> {

        private final MustacheFactory mustacheFactory = new DefaultMustacheFactory();

        /**
         * Compiles one mustache template per configured metadata attribute. Used keys are
         * removed from the config map so pipeline creation can detect unused settings.
         *
         * @throws IllegalArgumentException if a key is not a metadata field name or the
         *                                  config holds no metadata fields at all
         */
        @Override
        public MetaDataProcessor create(Map<String, Object> config) throws IOException {
            Map<MetaData, Mustache> templates = new HashMap<>();
            for (Iterator<Map.Entry<String, Object>> iterator = config.entrySet().iterator(); iterator.hasNext(); ) {
                Map.Entry<String, Object> entry = iterator.next();
                MetaData metaData = MetaData.fromString(entry.getKey());
                Mustache template = mustacheFactory.compile(new FastStringReader(entry.getValue().toString()), "");
                templates.put(metaData, template);
                iterator.remove();
            }
            if (templates.isEmpty()) {
                throw new IllegalArgumentException("no meta fields specified");
            }
            return new MetaDataProcessor(Collections.unmodifiableMap(templates));
        }
    }
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.ingest.processor.rename.RenameProcessor;
import org.elasticsearch.ingest.processor.split.SplitProcessor;
import org.elasticsearch.ingest.processor.trim.TrimProcessor;
import org.elasticsearch.ingest.processor.uppercase.UppercaseProcessor;
import org.elasticsearch.ingest.processor.meta.MetaDataProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionService;
@ -65,6 +66,7 @@ public class IngestModule extends AbstractModule {
addProcessor(TrimProcessor.TYPE, new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, new GsubProcessor.Factory());
addProcessor(MetaDataProcessor.TYPE, new MetaDataProcessor.Factory());
MapBinder<String, Processor.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
for (Map.Entry<String, Processor.Factory> entry : processors.entrySet()) {

View File

@ -19,14 +19,17 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
public class PipelineExecutionService {
static final String THREAD_POOL_NAME = IngestPlugin.NAME;
@ -40,22 +43,47 @@ public class PipelineExecutionService {
this.threadPool = threadPool;
}
public void execute(IngestDocument ingestDocument, String pipelineId, Listener listener) {
public void execute(IndexRequest indexRequest, String pipelineId, Listener listener) {
Pipeline pipeline = store.get(pipelineId);
if (pipeline == null) {
listener.failed(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"));
return;
}
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
@Override
public void run() {
try {
pipeline.execute(ingestDocument);
listener.executed(ingestDocument);
} catch (Throwable e) {
listener.failed(e);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
String routing = indexRequest.routing();
String parent = indexRequest.parent();
String timestamp = indexRequest.timestamp();
String ttl = null;
if (indexRequest.ttl() != -1) {
// At this point we don't know the original string ttl that was specified,
// so we convert the ttl, which is a long, to a string using 'ms' as the unit:
ttl = TimeValue.timeValueMillis(indexRequest.ttl()).toString();
}
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, timestamp, ttl, sourceAsMap);
try {
pipeline.execute(ingestDocument);
if (ingestDocument.isSourceModified()) {
indexRequest.source(ingestDocument.getSource());
}
indexRequest.index(ingestDocument.getMetadata(IngestDocument.MetaData.INDEX));
indexRequest.type(ingestDocument.getMetadata(IngestDocument.MetaData.TYPE));
indexRequest.id(ingestDocument.getMetadata(IngestDocument.MetaData.ID));
indexRequest.routing(ingestDocument.getMetadata(IngestDocument.MetaData.ROUTING));
indexRequest.parent(ingestDocument.getMetadata(IngestDocument.MetaData.PARENT));
indexRequest.timestamp(ingestDocument.getMetadata(IngestDocument.MetaData.TIMESTAMP));
String ttlStr = ingestDocument.getMetadata(IngestDocument.MetaData.TTL);
if (ttlStr != null) {
TimeValue timeValue = TimeValue.parseTimeValue(ttlStr, null, "ttl");
indexRequest.ttl(timeValue.millis());
}
listener.executed(ingestDocument);
} catch (Throwable e) {
listener.failed(e);
}
});
}

View File

@ -84,15 +84,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
chain.proceed(action, indexRequest, listener);
return;
}
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() {
executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() {
@Override
public void executed(IngestDocument ingestDocument) {
if (ingestDocument.isModified()) {
indexRequest.source(ingestDocument.getSource());
}
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(action, indexRequest, listener);
}
@ -127,14 +121,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
IndexRequest indexRequest = (IndexRequest) actionRequest;
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
executionService.execute(ingestDocument, pipelineId, new PipelineExecutionService.Listener() {
executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() {
@Override
public void executed(IngestDocument ingestDocument) {
if (ingestDocument.isModified()) {
indexRequest.source(ingestDocument.getSource());
}
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
}

View File

@ -31,9 +31,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.ingest.IngestDocument.MetaData.ID;
import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE;
import static org.elasticsearch.ingest.IngestDocument.MetaData.*;
public class WriteableIngestDocument implements Writeable<WriteableIngestDocument>, ToXContent {
@ -58,8 +56,12 @@ public class WriteableIngestDocument implements Writeable<WriteableIngestDocumen
String index = in.readString();
String type = in.readString();
String id = in.readString();
String routing = in.readOptionalString();
String parent = in.readOptionalString();
String timestamp = in.readOptionalString();
String ttl = in.readOptionalString();
Map<String, Object> doc = in.readMap();
return new WriteableIngestDocument(new IngestDocument(index, type, id, doc));
return new WriteableIngestDocument(new IngestDocument(index, type, id, routing, parent, timestamp, ttl, doc));
}
@Override
@ -67,16 +69,24 @@ public class WriteableIngestDocument implements Writeable<WriteableIngestDocumen
out.writeString(ingestDocument.getMetadata(INDEX));
out.writeString(ingestDocument.getMetadata(TYPE));
out.writeString(ingestDocument.getMetadata(ID));
out.writeOptionalString(ingestDocument.getMetadata(ROUTING));
out.writeOptionalString(ingestDocument.getMetadata(PARENT));
out.writeOptionalString(ingestDocument.getMetadata(TIMESTAMP));
out.writeOptionalString(ingestDocument.getMetadata(TTL));
out.writeMap(ingestDocument.getSource());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DOCUMENT);
builder.field(Fields.MODIFIED, ingestDocument.isModified());
builder.field(Fields.MODIFIED, ingestDocument.isSourceModified());
builder.field(Fields.INDEX, ingestDocument.getMetadata(INDEX));
builder.field(Fields.TYPE, ingestDocument.getMetadata(TYPE));
builder.field(Fields.ID, ingestDocument.getMetadata(ID));
builder.field(Fields.ROUTING, ingestDocument.getMetadata(ROUTING));
builder.field(Fields.PARENT, ingestDocument.getMetadata(PARENT));
builder.field(Fields.TIMESTAMP, ingestDocument.getMetadata(TIMESTAMP));
builder.field(Fields.TTL, ingestDocument.getMetadata(TTL));
builder.field(Fields.SOURCE, ingestDocument.getSource());
builder.endObject();
return builder;
@ -105,6 +115,10 @@ public class WriteableIngestDocument implements Writeable<WriteableIngestDocumen
static final XContentBuilderString INDEX = new XContentBuilderString(IngestDocument.MetaData.INDEX.getFieldName());
static final XContentBuilderString TYPE = new XContentBuilderString(IngestDocument.MetaData.TYPE.getFieldName());
static final XContentBuilderString ID = new XContentBuilderString(IngestDocument.MetaData.ID.getFieldName());
static final XContentBuilderString ROUTING = new XContentBuilderString(IngestDocument.MetaData.ROUTING.getFieldName());
static final XContentBuilderString PARENT = new XContentBuilderString(IngestDocument.MetaData.PARENT.getFieldName());
static final XContentBuilderString TIMESTAMP = new XContentBuilderString(IngestDocument.MetaData.TIMESTAMP.getFieldName());
static final XContentBuilderString TTL = new XContentBuilderString(IngestDocument.MetaData.TTL.getFieldName());
static final XContentBuilderString SOURCE = new XContentBuilderString("_source");
}
}

View File

@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest {
@ -96,9 +97,6 @@ public class SimulatePipelineRequest extends ActionRequest {
static final String PIPELINE = "pipeline";
static final String DOCS = "docs";
static final String SOURCE = "_source";
static final String INDEX = "_index";
static final String TYPE = "_type";
static final String ID = "_id";
}
static class Parsed {
@ -149,9 +147,13 @@ public class SimulatePipelineRequest extends ActionRequest {
List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE),
ConfigurationUtils.readStringProperty(dataMap, Fields.ID),
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, MetaData.INDEX.getFieldName()),
ConfigurationUtils.readStringProperty(dataMap, MetaData.TYPE.getFieldName()),
ConfigurationUtils.readStringProperty(dataMap, MetaData.ID.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.ROUTING.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.PARENT.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TIMESTAMP.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(dataMap, MetaData.TTL.getFieldName()),
document);
ingestDocumentList.add(ingestDocument);
}

View File

@ -116,14 +116,14 @@ public class IngestDocumentTests extends ESTestCase {
public void testSimpleSetFieldValue() {
ingestDocument.setFieldValue("new_field", "foo");
assertThat(ingestDocument.getSource().get("new_field"), equalTo("foo"));
assertThat(ingestDocument.isModified(), equalTo(true));
assertThat(ingestDocument.isSourceModified(), equalTo(true));
}
public void testSetFieldValueNullValue() {
ingestDocument.setFieldValue("new_field", null);
assertThat(ingestDocument.getSource().containsKey("new_field"), equalTo(true));
assertThat(ingestDocument.getSource().get("new_field"), nullValue());
assertThat(ingestDocument.isModified(), equalTo(true));
assertThat(ingestDocument.isSourceModified(), equalTo(true));
}
@SuppressWarnings("unchecked")
@ -138,7 +138,7 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(c.get("d"), instanceOf(String.class));
String d = (String) c.get("d");
assertThat(d, equalTo("foo"));
assertThat(ingestDocument.isModified(), equalTo(true));
assertThat(ingestDocument.isSourceModified(), equalTo(true));
}
public void testSetFieldValueOnExistingField() {
@ -154,7 +154,7 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(innerMap.get("new"), instanceOf(String.class));
String value = (String) innerMap.get("new");
assertThat(value, equalTo("bar"));
assertThat(ingestDocument.isModified(), equalTo(true));
assertThat(ingestDocument.isSourceModified(), equalTo(true));
}
public void testSetFieldValueOnExistingParentTypeMismatch() {
@ -163,7 +163,7 @@ public class IngestDocumentTests extends ESTestCase {
fail("add field should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("cannot add field to parent [buzz] of type [java.lang.String], [java.util.Map] expected instead."));
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
}
}
@ -173,7 +173,7 @@ public class IngestDocumentTests extends ESTestCase {
fail("add field should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("cannot add field to null parent, [java.util.Map] expected instead."));
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
}
}
@ -183,7 +183,7 @@ public class IngestDocumentTests extends ESTestCase {
fail("add field should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("cannot add null or empty field"));
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
}
}
@ -193,13 +193,13 @@ public class IngestDocumentTests extends ESTestCase {
fail("add field should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("cannot add null or empty field"));
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
}
}
public void testRemoveField() {
ingestDocument.removeField("foo");
assertThat(ingestDocument.isModified(), equalTo(true));
assertThat(ingestDocument.isSourceModified(), equalTo(true));
assertThat(ingestDocument.getSource().size(), equalTo(2));
assertThat(ingestDocument.getSource().containsKey("foo"), equalTo(false));
}
@ -217,30 +217,30 @@ public class IngestDocumentTests extends ESTestCase {
assertThat(map.size(), equalTo(0));
assertThat(ingestDocument.getSource().size(), equalTo(3));
assertThat(ingestDocument.getSource().containsKey("fizz"), equalTo(true));
assertThat(ingestDocument.isModified(), equalTo(true));
assertThat(ingestDocument.isSourceModified(), equalTo(true));
}
public void testRemoveNonExistingField() {
ingestDocument.removeField("does_not_exist");
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
assertThat(ingestDocument.getSource().size(), equalTo(3));
}
public void testRemoveExistingParentTypeMismatch() {
ingestDocument.removeField("foo.test");
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
assertThat(ingestDocument.getSource().size(), equalTo(3));
}
public void testRemoveNullField() {
ingestDocument.removeField(null);
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
assertThat(ingestDocument.getSource().size(), equalTo(3));
}
public void testRemoveEmptyField() {
ingestDocument.removeField("");
assertThat(ingestDocument.isModified(), equalTo(false));
assertThat(ingestDocument.isSourceModified(), equalTo(false));
assertThat(ingestDocument.getSource().size(), equalTo(3));
}

View File

@ -0,0 +1,65 @@
package org.elasticsearch.ingest.processor.meta;
import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import com.github.mustachejava.MustacheException;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
public class MetaDataProcessorFactoryTests extends ESTestCase {

    public void testCreate() throws Exception {
        MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
        Map<String, Object> config = new HashMap<>();
        for (MetaData metaData : MetaData.values()) {
            config.put(metaData.getFieldName(), randomBoolean() ? "static text" : "{{expression}}");
        }
        MetaDataProcessor processor = factory.create(config);
        // Expect one compiled template per metadata attribute. The original assertions
        // checked TIMESTAMP twice and never checked TYPE; iterating over values() covers
        // each attribute exactly once and keeps working when attributes are added.
        assertThat(processor.getTemplates().size(), Matchers.equalTo(MetaData.values().length));
        for (MetaData metaData : MetaData.values()) {
            assertThat(processor.getTemplates().get(metaData), Matchers.notNullValue());
        }
    }

    public void testCreateIllegalMetaData() throws Exception {
        // An unknown metadata field name must be rejected.
        MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
        try {
            factory.create(Collections.singletonMap("_field", "text {{expression}}"));
            fail("exception should have been thrown");
        } catch (IllegalArgumentException e) {
            assertThat(e.getMessage(), Matchers.equalTo("no valid metadata field name [_field]"));
        }
    }

    public void testCreateIllegalEmpty() throws Exception {
        // A meta processor without any fields is meaningless and must be rejected.
        MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
        try {
            factory.create(Collections.emptyMap());
            fail("exception should have been thrown");
        } catch (IllegalArgumentException e) {
            assertThat(e.getMessage(), Matchers.equalTo("no meta fields specified"));
        }
    }

    public void testIlegalMustacheExpression() throws Exception {
        // Malformed mustache templates fail at pipeline-creation time, not at execute time.
        try {
            new MetaDataProcessor.Factory().create(Collections.singletonMap("_index", "text {{var"));
            fail("exception expected");
        } catch (MustacheException e) {
            assertThat(e.getMessage(), Matchers.equalTo("Improperly closed variable in :1"));
        }
    }
}

View File

@ -0,0 +1,33 @@
package org.elasticsearch.ingest.processor.meta;
import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.ingest.IngestDocument.*;
public class MetaDataProcessorTests extends ESTestCase {

    public void testExecute() throws Exception {
        // Configure the same template for every metadata attribute.
        Map<IngestDocument.MetaData, Mustache> templates = new HashMap<>();
        for (MetaData attribute : MetaData.values()) {
            Mustache template = new DefaultMustacheFactory().compile(new FastStringReader("some {{field}}"), "noname");
            templates.put(attribute, template);
        }

        MetaDataProcessor processor = new MetaDataProcessor(templates);
        IngestDocument document = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("field", "value"));
        processor.execute(document);

        // Every attribute must have been rendered against the document source.
        for (MetaData attribute : MetaData.values()) {
            assertThat(document.getMetadata(attribute), Matchers.equalTo("some value"));
        }
    }
}

View File

@ -19,85 +19,168 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.meta.MetaDataProcessor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.Collections;
import java.util.*;
import java.util.concurrent.Executor;
import static org.hamcrest.Matchers.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
public class PipelineExecutionServiceTests extends ESTestCase {
private PipelineStore store;
private ThreadPool threadPool;
private PipelineExecutionService executionService;
@Before
public void setup() {
store = mock(PipelineStore.class);
threadPool = new ThreadPool(
Settings.builder()
.put("name", "_name")
.put(PipelineExecutionService.additionalSettings(Settings.EMPTY))
.build()
);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(Runnable::run);
executionService = new PipelineExecutionService(store, threadPool);
}
@After
public void destroy() {
threadPool.shutdown();
}
public void testExecute_pipelineDoesNotExist() {
when(store.get("_id")).thenReturn(null);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap());
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
executionService.execute(ingestDocument, "_id", listener);
executionService.execute(indexRequest, "_id", listener);
verify(listener).failed(any(IllegalArgumentException.class));
verify(listener, times(0)).executed(ingestDocument);
verify(listener, times(0)).executed(any());
}
public void testExecute_success() throws Exception {
public void testExecuteSuccess() throws Exception {
Processor processor = mock(Processor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.emptyMap());
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
executionService.execute(ingestDocument, "_id", listener);
assertBusy(new Runnable() {
@Override
public void run() {
verify(processor).execute(ingestDocument);
verify(listener).executed(ingestDocument);
verify(listener, times(0)).failed(any(Exception.class));
executionService.execute(indexRequest, "_id", listener);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener).executed(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener, times(0)).failed(any(Exception.class));
}
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
Processor processor = mock(Processor.class);
doAnswer((InvocationOnMock invocationOnMock) -> {
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
if (metaData == IngestDocument.MetaData.TTL) {
ingestDocument.setMetaData(IngestDocument.MetaData.TTL, "5w");
} else {
ingestDocument.setMetaData(metaData, "update" + metaData.getFieldName());
}
}
});
return null;
}).when(processor).execute(any());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
executionService.execute(indexRequest, "_id", listener);
verify(processor).execute(any());
verify(listener).executed(any());
verify(listener, times(0)).failed(any(Exception.class));
assertThat(indexRequest.index(), equalTo("update_index"));
assertThat(indexRequest.type(), equalTo("update_type"));
assertThat(indexRequest.id(), equalTo("update_id"));
assertThat(indexRequest.routing(), equalTo("update_routing"));
assertThat(indexRequest.parent(), equalTo("update_parent"));
assertThat(indexRequest.timestamp(), equalTo("update_timestamp"));
assertThat(indexRequest.ttl(), equalTo(3024000000l));
}
/**
 * Verifies that when the single processor of a pipeline throws, the listener's
 * failed() callback is invoked and executed() is never called.
 *
 * NOTE(review): the original block interleaved the pre-change (IngestDocument-based)
 * and post-change (IndexRequest-based) variants of this test, leaving duplicate
 * stubbing/verification and a stale assertBusy referencing a removed variable.
 * This is the reconstructed post-change version.
 */
public void testExecute_failure() throws Exception {
    Processor processor = mock(Processor.class);
    when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
    IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
    // Make the processor fail for the document derived from this index request.
    doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
    PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
    executionService.execute(indexRequest, "_id", listener);
    verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
    verify(listener, times(0)).executed(eqID("_index", "_type", "_id", Collections.emptyMap()));
    verify(listener).failed(any(RuntimeException.class));
}
/**
 * Exercises the _ttl handling of the meta processor in three scenarios:
 * a valid ttl string, an unparsable ttl string, and a ttl supplied directly
 * on the index request with no meta processor in the pipeline.
 */
public void testExecuteTTL() throws Exception {
    // Scenario 1: a parsable ttl ("5d") set by the meta processor ends up on the request.
    MetaDataProcessor.Factory factory = new MetaDataProcessor.Factory();
    Map<String, Object> processorConfig = new HashMap<>();
    processorConfig.put("_ttl", "5d");
    MetaDataProcessor metaProcessor = factory.create(processorConfig);
    when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(metaProcessor)));
    IndexRequest request = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
    PipelineExecutionService.Listener resultListener = mock(PipelineExecutionService.Listener.class);
    executionService.execute(request, "_id", resultListener);
    assertThat(request.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl").millis()));
    verify(resultListener, times(1)).executed(any());
    verify(resultListener, never()).failed(any());

    // Scenario 2: an unparsable ttl ("abc") must surface as a parse failure.
    factory = new MetaDataProcessor.Factory();
    processorConfig = new HashMap<>();
    processorConfig.put("_ttl", "abc");
    metaProcessor = factory.create(processorConfig);
    when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(metaProcessor)));
    request = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
    resultListener = mock(PipelineExecutionService.Listener.class);
    executionService.execute(request, "_id", resultListener);
    verify(resultListener, never()).executed(any());
    verify(resultListener, times(1)).failed(any(ElasticsearchParseException.class));

    // Scenario 3: a ttl already on the request survives an empty pipeline untouched.
    when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList()));
    request = new IndexRequest("_index", "_type", "_id")
            .source(Collections.emptyMap())
            .ttl(1000l);
    resultListener = mock(PipelineExecutionService.Listener.class);
    executionService.execute(request, "_id", resultListener);
    assertThat(request.ttl(), equalTo(1000l));
    verify(resultListener, times(1)).executed(any());
    verify(resultListener, never()).failed(any(Throwable.class));
}
/**
 * Builds a Mockito argument matcher that matches an {@link IngestDocument}
 * having the given index, type, id and source.
 */
private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
    IngestDocumentMatcher matcher = new IngestDocumentMatcher(index, type, id, source);
    return Matchers.argThat(matcher);
}
/**
 * Mockito matcher that compares an argument against an expected
 * {@link IngestDocument} via {@link Objects#equals(Object, Object)}.
 */
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {

    private final IngestDocument expected;

    public IngestDocumentMatcher(String index, String type, String id, Map<String, Object> source) {
        this.expected = new IngestDocument(index, type, id, source);
    }

    @Override
    public boolean matches(Object o) {
        return Objects.equals(expected, o);
    }
}
}

View File

@ -83,7 +83,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verifyZeroInteractions(actionFilterChain);
}
@ -96,7 +96,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verifyZeroInteractions(actionFilterChain);
}
@ -121,19 +121,15 @@ public class IngestActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
Answer answer = new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
listener.executed(ingestDocument);
return null;
}
Answer answer = invocationOnMock -> {
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
listener.executed(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap()));
return null;
};
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
verifyZeroInteractions(actionListener);
}
@ -154,10 +150,10 @@ public class IngestActionFilterTests extends ESTestCase {
return null;
}
};
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(actionListener).onFailure(exception);
verifyZeroInteractions(actionFilterChain);
}
@ -250,7 +246,7 @@ public class IngestActionFilterTests extends ESTestCase {
listener.failed(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
CaptureActionListener actionListener = new CaptureActionListener();
RecordRequestAFC actionFilterChain = new RecordRequestAFC();
@ -295,7 +291,7 @@ public class IngestActionFilterTests extends ESTestCase {
listener.failed(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
ActionListener actionListener = mock(ActionListener.class);
RecordRequestAFC actionFilterChain = new RecordRequestAFC();

View File

@ -0,0 +1,45 @@
---
"Test meta processor":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"meta" : {
"_index" : "surprise"
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipelines are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {field: "value"}
- do:
get:
index: surprise
type: test
id: 1
- length: { _source: 1 }
- match: { _source.field: "value" }