From 005e0bffafbedfd998067a26ec0b02838775ec44 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Wed, 23 Sep 2020 10:46:00 +0200 Subject: [PATCH] [7.x] Make for each processor resistant to field modification (#62791) (#62807) * Make for each processor resistant to field modification (#62791) This change provides consistent view of field that foreach processor is iterating over. That prevents it to go into infinite loop and put great pressure on the cluster. Closes #62790 * fix compilation --- .../ingest/common/ForEachProcessor.java | 2 +- .../ingest/common/ForEachProcessorTests.java | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 5583424b5d3..4f99a75122d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -75,7 +75,7 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.")); } } else { - innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler); + innerExecute(0, new ArrayList<>(values), new ArrayList<>(values.size()), ingestDocument, handler); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 73cd07d9824..ffd84303855 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -292,6 +292,30 @@ public class ForEachProcessorTests extends ESTestCase { assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } + public void testAppendingToTheSameField() { + Map source = Collections.singletonMap("field", Arrays.asList("a", "b")); + IngestDocument originalIngestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, source); + IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); + TestProcessor testProcessor = new TestProcessor(id->id.appendFieldValue("field", "a")); + ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true); + processor.execute(ingestDocument, (result, e) -> {}); + assertThat(testProcessor.getInvokedCounter(), equalTo(2)); + ingestDocument.removeField("_ingest._value"); + assertThat(ingestDocument, equalTo(originalIngestDocument)); + } + + public void testRemovingFromTheSameField() { + Map source = Collections.singletonMap("field", Arrays.asList("a", "b")); + IngestDocument originalIngestDocument = new IngestDocument("_index", "_id", "_type", null, null, null, source); + IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); + TestProcessor testProcessor = new TestProcessor(id -> id.removeField("field.0")); + ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true); + processor.execute(ingestDocument, (result, e) -> {}); + assertThat(testProcessor.getInvokedCounter(), equalTo(2)); + ingestDocument.removeField("_ingest._value"); + assertThat(ingestDocument, equalTo(originalIngestDocument)); + } + private class AsyncUpperCaseProcessor implements Processor { private final String field;