ingest: Change the foreach processor to use the _ingest._value ingest metadata attribute to store the current array element being processed.

Closes #19592
This commit is contained in:
Martijn van Groningen 2016-07-26 20:47:14 +02:00
parent 21ff90fed3
commit 24d7fa6d54
14 changed files with 143 additions and 54 deletions

View File

@ -41,15 +41,14 @@ final class WriteableIngestDocument implements Writeable, ToXContent {
WriteableIngestDocument(StreamInput in) throws IOException {
Map<String, Object> sourceAndMetadata = in.readMap();
@SuppressWarnings("unchecked")
Map<String, String> ingestMetadata = (Map<String, String>) in.readGenericValue();
Map<String, Object> ingestMetadata = in.readMap();
this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(ingestDocument.getSourceAndMetadata());
out.writeGenericValue(ingestDocument.getIngestMetadata());
out.writeMap(ingestDocument.getIngestMetadata());
}
IngestDocument getIngestDocument() {
@ -66,11 +65,7 @@ final class WriteableIngestDocument implements Writeable, ToXContent {
}
}
builder.field("_source", ingestDocument.getSourceAndMetadata());
builder.startObject("_ingest");
for (Map.Entry<String, String> ingestMetadata : ingestDocument.getIngestMetadata().entrySet()) {
builder.field(ingestMetadata.getKey(), ingestMetadata.getValue());
}
builder.endObject();
builder.field("_ingest", ingestDocument.getIngestMetadata());
builder.endObject();
return builder;
}

View File

@ -135,14 +135,14 @@ public class CompoundProcessor implements Processor {
List<String> processorTagHeader = cause.getHeader("processor_tag");
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
}
private void removeFailureMetadata(IngestDocument ingestDocument) {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);

View File

@ -54,7 +54,7 @@ public final class IngestDocument {
static final String TIMESTAMP = "timestamp";
private final Map<String, Object> sourceAndMetadata;
private final Map<String, String> ingestMetadata;
private final Map<String, Object> ingestMetadata;
public IngestDocument(String index, String type, String id, String routing, String parent, String timestamp,
String ttl, Map<String, Object> source) {
@ -94,7 +94,7 @@ public final class IngestDocument {
* source and ingest metadata. This is needed because the ingest metadata will be initialized with the current timestamp at
* init time, which makes equality comparisons impossible in tests.
*/
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, String> ingestMetadata) {
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
this.sourceAndMetadata = sourceAndMetadata;
this.ingestMetadata = ingestMetadata;
}
@ -517,7 +517,7 @@ public final class IngestDocument {
* Returns the available ingest metadata fields, by default only timestamp, but it is possible to set additional ones.
* Use only for reading values, modify them instead using {@link #setFieldValue(String, Object)} and {@link #removeField(String)}
*/
public Map<String, String> getIngestMetadata() {
public Map<String, Object> getIngestMetadata() {
return this.ingestMetadata;
}

View File

@ -145,7 +145,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument);
Map<String, String> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
Map<String, Object> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");

View File

@ -111,7 +111,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
Map<String, String> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));

View File

@ -47,7 +47,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
for (int i = 0; i < numFields; i++) {
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
}
Map<String, String> ingestMetadata = new HashMap<>();
Map<String, Object> ingestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@ -70,7 +70,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
changed = true;
}
Map<String, String> otherIngestMetadata;
Map<String, Object> otherIngestMetadata;
if (randomBoolean()) {
otherIngestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
@ -103,7 +103,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
for (int i = 0; i < numFields; i++) {
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
}
Map<String, String> ingestMetadata = new HashMap<>();
Map<String, Object> ingestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@ -131,7 +131,7 @@ public class WriteableIngestDocumentTests extends ESTestCase {
Map<String, Object> toXContentDoc = (Map<String, Object>) toXContentMap.get("doc");
Map<String, Object> toXContentSource = (Map<String, Object>) toXContentDoc.get("_source");
Map<String, String> toXContentIngestMetadata = (Map<String, String>) toXContentDoc.get("_ingest");
Map<String, Object> toXContentIngestMetadata = (Map<String, Object>) toXContentDoc.get("_ingest");
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {

View File

@ -86,7 +86,7 @@ public class CompoundProcessorTests extends ESTestCase {
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
@ -104,7 +104,7 @@ public class CompoundProcessorTests extends ESTestCase {
public void testSingleProcessorWithNestedFailures() throws Exception {
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
@ -112,7 +112,7 @@ public class CompoundProcessorTests extends ESTestCase {
throw new RuntimeException("error");
});
TestProcessor lastProcessor = new TestProcessor(ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
@ -131,7 +131,7 @@ public class CompoundProcessorTests extends ESTestCase {
public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
@ -153,7 +153,7 @@ public class CompoundProcessorTests extends ESTestCase {
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
@ -176,7 +176,7 @@ public class CompoundProcessorTests extends ESTestCase {
TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));

View File

@ -907,7 +907,7 @@ public class IngestDocumentTests extends ESTestCase {
for (int i = 0; i < numFields; i++) {
sourceAndMetadata.put(randomFrom(IngestDocument.MetaData.values()).getFieldName(), randomAsciiOfLengthBetween(5, 10));
}
Map<String, String> ingestMetadata = new HashMap<>();
Map<String, Object> ingestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
ingestMetadata.put(randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(5, 10));
@ -930,7 +930,7 @@ public class IngestDocumentTests extends ESTestCase {
changed = true;
}
Map<String, String> otherIngestMetadata;
Map<String, Object> otherIngestMetadata;
if (randomBoolean()) {
otherIngestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
@ -962,7 +962,7 @@ public class IngestDocumentTests extends ESTestCase {
long before = System.currentTimeMillis();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
long after = System.currentTimeMillis();
String timestampString = ingestDocument.getIngestMetadata().get("timestamp");
String timestampString = (String) ingestDocument.getIngestMetadata().get("timestamp");
assertThat(timestampString, notNullValue());
assertThat(timestampString, endsWith("+0000"));
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT);

View File

@ -859,8 +859,16 @@ because it is likely that the number of elements in an array is unknown. For thi
processor exists. By specifying the field holding array elements and a processor that
defines what should happen to each element, array fields can easily be preprocessed.
A processor inside the foreach processor works in a different context, and the only valid top-level
field is `_value`, which holds the array element value. Under this field other fields may exist.
A processor inside the foreach processor works in the array element context and puts that in the ingest metadata
under the `_ingest._value` key. If the array element is a json object it holds all immediate fields of that json object.
and if the nested object is a value is `_ingest._value` just holds that value. Note that if a processor prior to the
`foreach` processor used `_ingest._value` key then the specified value will not be available to the processor inside
the `foreach` processor. The `foreach` processor does restore the original value, so that value is available to processors
after the `foreach` processor.
Note that any other field from the document are accessible and modifiable like with all other processors. This processor
just puts the current array element being read into `_ingest._value` ingest metadata attribute, so that it may be
pre-processed.
If the `foreach` processor fails to process an element inside the array, and no `on_failure` processor has been specified,
then it aborts the execution and leaves the array unmodified.
@ -892,7 +900,7 @@ When this `foreach` processor operates on this sample document:
"field" : "values",
"processor" : {
"uppercase" : {
"field" : "_value"
"field" : "_ingest._value"
}
}
}
@ -936,7 +944,7 @@ so the following `foreach` processor is used:
"field" : "persons",
"processor" : {
"remove" : {
"field" : "_value.id"
"field" : "_ingest._value.id"
}
}
}
@ -959,9 +967,7 @@ After preprocessing the result is:
}
--------------------------------------------------
As for any processor, you can define `on_failure` processors
in processors that are wrapped inside the `foreach` processor.
The wrapped processor can have a `on_failure` definition.
For example, the `id` field may not exist on all person objects.
Instead of failing the index request, you can use an `on_failure`
block to send the document to the 'failure_index' index for later inspection:

View File

@ -62,11 +62,12 @@ public final class ForEachProcessor extends AbstractProcessor {
List<Object> values = ingestDocument.getFieldValue(field, List.class);
List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) {
Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata());
innerSource.put("_value", value); // scalar value to access the list item being evaluated
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
processor.execute(innerIngestDocument);
newValues.add(innerSource.get("_value"));
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
try {
processor.execute(ingestDocument);
} finally {
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
}
}
ingestDocument.setFieldValue(field, newValues);
}

View File

@ -49,7 +49,7 @@ public class ForEachProcessorTests extends ESTestCase {
);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new UppercaseProcessor("_tag", "_value")
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value")
);
processor.execute(ingestDocument);
@ -65,7 +65,7 @@ public class ForEachProcessorTests extends ESTestCase {
);
TestProcessor testProcessor = new TestProcessor(id -> {
if ("c".equals(id.getFieldValue("_value", String.class))) {
if ("c".equals(id.getFieldValue("_ingest._value", String.class))) {
throw new RuntimeException("failure");
}
});
@ -80,11 +80,11 @@ public class ForEachProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c")));
testProcessor = new TestProcessor(id -> {
String value = id.getFieldValue("_value", String.class);
String value = id.getFieldValue("_ingest._value", String.class);
if ("c".equals(value)) {
throw new RuntimeException("failure");
} else {
id.setFieldValue("_value", value.toUpperCase(Locale.ROOT));
id.setFieldValue("_ingest._value", value.toUpperCase(Locale.ROOT));
}
});
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
@ -105,9 +105,9 @@ public class ForEachProcessorTests extends ESTestCase {
);
TestProcessor innerProcessor = new TestProcessor(id -> {
id.setFieldValue("_value.index", id.getSourceAndMetadata().get("_index"));
id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id"));
id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index"));
id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
processor.execute(ingestDocument);
@ -136,7 +136,7 @@ public class ForEachProcessorTests extends ESTestCase {
TemplateService ts = TestTemplateService.instance();
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other"))
"_tag", "values", new SetProcessor("_tag", ts.compile("_ingest._value.new_field"), (model) -> model.get("other"))
);
processor.execute(ingestDocument);
@ -151,8 +151,8 @@ public class ForEachProcessorTests extends ESTestCase {
Processor innerProcessor = new Processor() {
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
String existingValue = ingestDocument.getFieldValue("_value", String.class);
ingestDocument.setFieldValue("_value", existingValue + ".");
String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class);
ingestDocument.setFieldValue("_ingest._value", existingValue + ".");
}
@Override
@ -184,4 +184,91 @@ public class ForEachProcessorTests extends ESTestCase {
}
}
public void testModifyFieldsOutsideArray() throws Exception {
List<Object> values = new ArrayList<>();
values.add("string");
values.add(1);
values.add(null);
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
);
TemplateService ts = TestTemplateService.instance();
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new CompoundProcessor(false,
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag",
ts.compile("errors"), (model) -> (Collections.singletonList("added"))))
));
processor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("STRING"));
assertThat(result.get(1), equalTo(1));
assertThat(result.get(2), equalTo(null));
List<String> errors = ingestDocument.getFieldValue("errors", List.class);
assertThat(errors.size(), equalTo(2));
}
public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws Exception {
List<Object> values = new ArrayList<>();
values.add("please");
values.add("change");
values.add("me");
Map<String, Object> source = new HashMap<>();
source.put("_value", "new_value");
source.put("values", values);
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, source
);
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
doc.getFieldValue("_source._value", String.class)));
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor);
forEachProcessor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.get(0), equalTo("new_value"));
assertThat(result.get(1), equalTo("new_value"));
assertThat(result.get(2), equalTo("new_value"));
}
public void testNestedForEach() throws Exception {
List<Map<String, Object>> values = new ArrayList<>();
List<Object> innerValues = new ArrayList<>();
innerValues.add("abc");
innerValues.add("def");
Map<String, Object> value = new HashMap<>();
value.put("values2", innerValues);
values.add(value);
innerValues = new ArrayList<>();
innerValues.add("ghi");
innerValues.add("jkl");
value = new HashMap<>();
value.put("values2", innerValues);
values.add(value);
IngestDocument ingestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values1", values)
);
TestProcessor testProcessor = new TestProcessor(
doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor));
processor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
assertThat(result.get(0), equalTo("ABC"));
assertThat(result.get(1), equalTo("DEF"));
result = ingestDocument.getFieldValue("values1.1.values2", List.class);
assertThat(result.get(0), equalTo("GHI"));
assertThat(result.get(1), equalTo("JKL"));
}
}

View File

@ -19,7 +19,7 @@ teardown:
"field" : "values",
"processor" : {
"uppercase" : {
"field" : "_value"
"field" : "_ingest._value"
}
}
}

View File

@ -234,7 +234,7 @@
"processor": {
"append": {
"field": "values_flat",
"value": "{{_value.key}}_{{_value.value}}"
"value": "{{_ingest._value.key}}_{{_ingest._value.value}}"
}
}
}

View File

@ -84,7 +84,7 @@
"field" : "friends",
"processor" : {
"remove" : {
"field" : "_value.id"
"field" : "_ingest._value.id"
}
}
}
@ -106,7 +106,7 @@
"field" : "address",
"processor" : {
"trim" : {
"field" : "_value"
"field" : "_ingest._value"
}
}
}