* Make for each processor resistant to field modification (#62791) This change provides consistent view of field that foreach processor is iterating over. That prevents it to go into infinite loop and put great pressure on the cluster. Closes #62790 * fix compilation
This commit is contained in:
parent
bc34ecc581
commit
005e0bffaf
modules/ingest-common/src
main/java/org/elasticsearch/ingest/common
test/java/org/elasticsearch/ingest/common
|
@ -75,7 +75,7 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
|
|||
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
|
||||
}
|
||||
} else {
|
||||
innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler);
|
||||
innerExecute(0, new ArrayList<>(values), new ArrayList<>(values.size()), ingestDocument, handler);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -292,6 +292,30 @@ public class ForEachProcessorTests extends ESTestCase {
|
|||
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testAppendingToTheSameField() {
|
||||
Map<String, Object> source = Collections.singletonMap("field", Arrays.asList("a", "b"));
|
||||
IngestDocument originalIngestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, source);
|
||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||
TestProcessor testProcessor = new TestProcessor(id->id.appendFieldValue("field", "a"));
|
||||
ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true);
|
||||
processor.execute(ingestDocument, (result, e) -> {});
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
|
||||
ingestDocument.removeField("_ingest._value");
|
||||
assertThat(ingestDocument, equalTo(originalIngestDocument));
|
||||
}
|
||||
|
||||
public void testRemovingFromTheSameField() {
|
||||
Map<String, Object> source = Collections.singletonMap("field", Arrays.asList("a", "b"));
|
||||
IngestDocument originalIngestDocument = new IngestDocument("_index", "_id", "_type", null, null, null, source);
|
||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||
TestProcessor testProcessor = new TestProcessor(id -> id.removeField("field.0"));
|
||||
ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true);
|
||||
processor.execute(ingestDocument, (result, e) -> {});
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
|
||||
ingestDocument.removeField("_ingest._value");
|
||||
assertThat(ingestDocument, equalTo(originalIngestDocument));
|
||||
}
|
||||
|
||||
private class AsyncUpperCaseProcessor implements Processor {
|
||||
|
||||
private final String field;
|
||||
|
|
Loading…
Reference in New Issue