ingest: Give the `foreach` processor access to the rest of the document.

Closes #17147
This commit is contained in:
Martijn van Groningen 2016-03-17 16:26:24 +01:00
parent 1264ee79b6
commit 8f22a01bbd
3 changed files with 96 additions and 5 deletions

View File

@ -59,11 +59,8 @@ public final class ForEachProcessor extends AbstractProcessor {
List<Object> values = ingestDocument.getFieldValue(field, List.class); List<Object> values = ingestDocument.getFieldValue(field, List.class);
List<Object> newValues = new ArrayList<>(values.size()); List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) { for (Object value : values) {
Map<String, Object> innerSource = new HashMap<>(); Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata());
innerSource.put("_value", value); innerSource.put("_value", value); // scalar value to access the list item being evaluated
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
innerSource.put(metaData.getFieldName(), ingestDocument.getSourceAndMetadata().get(metaData.getFieldName()));
}
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata()); IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
for (Processor processor : processors) { for (Processor processor : processors) {
processor.execute(innerIngestDocument); processor.execute(innerIngestDocument);

View File

@ -20,9 +20,12 @@
package org.elasticsearch.ingest.processor; package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.core.CompoundProcessor; import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.ingest.core.ValueSource;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList; import java.util.ArrayList;
@ -120,6 +123,42 @@ public class ForEachProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue("values.1.id", String.class), equalTo("_id")); assertThat(ingestDocument.getFieldValue("values.1.id", String.class), equalTo("_id"));
} }
public void testRestOfTheDocumentIsAvailable() throws Exception {
List<Map<String, Object>> values = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> object = new HashMap<>();
object.put("field", "value");
values.add(object);
}
Map<String, Object> document = new HashMap<>();
document.put("values", values);
document.put("flat_values", new ArrayList<>());
document.put("other", "value");
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, document);
TemplateService ts = TestTemplateService.instance();
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Arrays.asList(
new AppendProcessor("_tag", ts.compile("flat_values"), ValueSource.wrap("value", ts)),
new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other")))
);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.2.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.3.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value"));
List<String> flatValues = ingestDocument.getFieldValue("flat_values", List.class);
assertThat(flatValues.size(), equalTo(5));
assertThat(flatValues.get(0), equalTo("value"));
assertThat(flatValues.get(1), equalTo("value"));
assertThat(flatValues.get(2), equalTo("value"));
assertThat(flatValues.get(3), equalTo("value"));
assertThat(flatValues.get(4), equalTo("value"));
}
public void testRandom() throws Exception { public void testRandom() throws Exception {
int numProcessors = randomInt(8); int numProcessors = randomInt(8);
List<Processor> processors = new ArrayList<>(numProcessors); List<Processor> processors = new ArrayList<>(numProcessors);

View File

@ -219,3 +219,58 @@
- length: { _source: 2 } - length: { _source: 2 }
- match: { _source.do_nothing: "foo" } - match: { _source.do_nothing: "foo" }
- match: { _source.error: "processor first_processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" } - match: { _source.error: "processor first_processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" }
---
"Test rolling up json object arrays":
- do:
ingest.put_pipeline:
id: "_id"
body: >
{
"processors": [
{
"foreach": {
"field": "values",
"processors": [
{
"append": {
"field": "values_flat",
"value": "{{_value.key}}_{{_value.value}}"
}
}
]
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "_id"
body: {
values_flat : [],
values: [
{
level: 1,
key: "foo",
value: "bar"
},
{
level: 2,
key: "foo",
value: "baz"
}
]
}
- do:
get:
index: test
type: test
id: 1
- length: { _source: 2 }
- match: { _source.values_flat: ["foo_bar", "foo_baz"] }