This commit is contained in:
parent
26a927a120
commit
13e1cf6191
|
@ -1078,6 +1078,7 @@ then it aborts the execution and leaves the array unmodified.
|
||||||
| Name | Required | Default | Description
|
| Name | Required | Default | Description
|
||||||
| `field` | yes | - | The array field
|
| `field` | yes | - | The array field
|
||||||
| `processor` | yes | - | The processor to execute against each field
|
| `processor` | yes | - | The processor to execute against each field
|
||||||
|
| `ignore_missing` | no | false | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|
||||||
|======
|
|======
|
||||||
|
|
||||||
Assume the following document:
|
Assume the following document:
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
|
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
|
||||||
|
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
|
||||||
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
|
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
|
||||||
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
|
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
|
||||||
|
|
||||||
|
@ -47,16 +48,28 @@ public final class ForEachProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
private final String field;
|
private final String field;
|
||||||
private final Processor processor;
|
private final Processor processor;
|
||||||
|
private final boolean ignoreMissing;
|
||||||
|
|
||||||
ForEachProcessor(String tag, String field, Processor processor) {
|
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
|
||||||
super(tag);
|
super(tag);
|
||||||
this.field = field;
|
this.field = field;
|
||||||
this.processor = processor;
|
this.processor = processor;
|
||||||
|
this.ignoreMissing = ignoreMissing;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isIgnoreMissing() {
|
||||||
|
return ignoreMissing;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(IngestDocument ingestDocument) throws Exception {
|
public void execute(IngestDocument ingestDocument) throws Exception {
|
||||||
List values = ingestDocument.getFieldValue(field, List.class);
|
List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
|
||||||
|
if (values == null) {
|
||||||
|
if (ignoreMissing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
|
||||||
|
}
|
||||||
List<Object> newValues = new ArrayList<>(values.size());
|
List<Object> newValues = new ArrayList<>(values.size());
|
||||||
for (Object value : values) {
|
for (Object value : values) {
|
||||||
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
|
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
|
||||||
|
@ -87,6 +100,7 @@ public final class ForEachProcessor extends AbstractProcessor {
|
||||||
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
|
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
|
||||||
Map<String, Object> config) throws Exception {
|
Map<String, Object> config) throws Exception {
|
||||||
String field = readStringProperty(TYPE, tag, config, "field");
|
String field = readStringProperty(TYPE, tag, config, "field");
|
||||||
|
boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
|
||||||
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
|
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
|
||||||
Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
|
Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
|
||||||
if (entries.size() != 1) {
|
if (entries.size() != 1) {
|
||||||
|
@ -94,7 +108,7 @@ public final class ForEachProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
|
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
|
||||||
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
|
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
|
||||||
return new ForEachProcessor(tag, field, processor);
|
return new ForEachProcessor(tag, field, processor, ignoreMissing);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,24 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
|
||||||
assertThat(forEachProcessor, Matchers.notNullValue());
|
assertThat(forEachProcessor, Matchers.notNullValue());
|
||||||
assertThat(forEachProcessor.getField(), equalTo("_field"));
|
assertThat(forEachProcessor.getField(), equalTo("_field"));
|
||||||
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
|
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
|
||||||
|
assertFalse(forEachProcessor.isIgnoreMissing());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSetIgnoreMissing() throws Exception {
|
||||||
|
Processor processor = new TestProcessor(ingestDocument -> { });
|
||||||
|
Map<String, Processor.Factory> registry = new HashMap<>();
|
||||||
|
registry.put("_name", (r, t, c) -> processor);
|
||||||
|
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
|
||||||
|
|
||||||
|
Map<String, Object> config = new HashMap<>();
|
||||||
|
config.put("field", "_field");
|
||||||
|
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
|
||||||
|
config.put("ignore_missing", true);
|
||||||
|
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
|
||||||
|
assertThat(forEachProcessor, Matchers.notNullValue());
|
||||||
|
assertThat(forEachProcessor.getField(), equalTo("_field"));
|
||||||
|
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
|
||||||
|
assertTrue(forEachProcessor.isIgnoreMissing());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCreateWithTooManyProcessorTypes() throws Exception {
|
public void testCreateWithTooManyProcessorTypes() throws Exception {
|
||||||
|
|
|
@ -19,6 +19,13 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest.common;
|
package org.elasticsearch.ingest.common;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
import org.elasticsearch.ingest.CompoundProcessor;
|
import org.elasticsearch.ingest.CompoundProcessor;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
@ -27,14 +34,7 @@ import org.elasticsearch.ingest.TestTemplateService;
|
||||||
import org.elasticsearch.script.TemplateScript;
|
import org.elasticsearch.script.TemplateScript;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
public class ForEachProcessorTests extends ESTestCase {
|
public class ForEachProcessorTests extends ESTestCase {
|
||||||
|
@ -49,7 +49,8 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
|
|
||||||
ForEachProcessor processor = new ForEachProcessor(
|
ForEachProcessor processor = new ForEachProcessor(
|
||||||
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value")
|
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
|
||||||
|
false
|
||||||
);
|
);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
|
|
||||||
|
@ -69,7 +70,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
throw new RuntimeException("failure");
|
throw new RuntimeException("failure");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor);
|
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
|
||||||
try {
|
try {
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
fail("exception expected");
|
fail("exception expected");
|
||||||
|
@ -89,7 +90,8 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
|
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
|
||||||
processor = new ForEachProcessor(
|
processor = new ForEachProcessor(
|
||||||
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))
|
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
|
||||||
|
false
|
||||||
);
|
);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
|
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
|
||||||
|
@ -109,7 +111,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type"));
|
id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type"));
|
||||||
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
|
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
|
||||||
});
|
});
|
||||||
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
|
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
|
|
||||||
assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
|
assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
|
||||||
|
@ -137,7 +139,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
ForEachProcessor processor = new ForEachProcessor(
|
ForEachProcessor processor = new ForEachProcessor(
|
||||||
"_tag", "values", new SetProcessor("_tag",
|
"_tag", "values", new SetProcessor("_tag",
|
||||||
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
|
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
|
||||||
(model) -> model.get("other")));
|
(model) -> model.get("other")), false);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
|
|
||||||
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
|
assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
|
||||||
|
@ -174,7 +176,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
"_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values)
|
"_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values)
|
||||||
);
|
);
|
||||||
|
|
||||||
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
|
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
List<String> result = ingestDocument.getFieldValue("values", List.class);
|
List<String> result = ingestDocument.getFieldValue("values", List.class);
|
||||||
|
@ -199,7 +201,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
"_tag", "values", new CompoundProcessor(false,
|
"_tag", "values", new CompoundProcessor(false,
|
||||||
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
|
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
|
||||||
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
|
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
|
||||||
));
|
), false);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
|
|
||||||
List result = ingestDocument.getFieldValue("values", List.class);
|
List result = ingestDocument.getFieldValue("values", List.class);
|
||||||
|
@ -225,7 +227,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
|
|
||||||
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
|
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
|
||||||
doc.getFieldValue("_source._value", String.class)));
|
doc.getFieldValue("_source._value", String.class)));
|
||||||
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor);
|
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
|
||||||
forEachProcessor.execute(ingestDocument);
|
forEachProcessor.execute(ingestDocument);
|
||||||
|
|
||||||
List result = ingestDocument.getFieldValue("values", List.class);
|
List result = ingestDocument.getFieldValue("values", List.class);
|
||||||
|
@ -258,7 +260,7 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
|
doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
|
||||||
);
|
);
|
||||||
ForEachProcessor processor = new ForEachProcessor(
|
ForEachProcessor processor = new ForEachProcessor(
|
||||||
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor));
|
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
|
|
||||||
List result = ingestDocument.getFieldValue("values1.0.values2", List.class);
|
List result = ingestDocument.getFieldValue("values1.0.values2", List.class);
|
||||||
|
@ -270,4 +272,16 @@ public class ForEachProcessorTests extends ESTestCase {
|
||||||
assertThat(result.get(1), equalTo("JKL"));
|
assertThat(result.get(1), equalTo("JKL"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIgnoreMissing() throws Exception {
|
||||||
|
IngestDocument originalIngestDocument = new IngestDocument(
|
||||||
|
"_index", "_type", "_id", null, null, null, Collections.emptyMap()
|
||||||
|
);
|
||||||
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
|
TestProcessor testProcessor = new TestProcessor(doc -> {});
|
||||||
|
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
|
||||||
|
processor.execute(ingestDocument);
|
||||||
|
assertIngestDocument(originalIngestDocument, ingestDocument);
|
||||||
|
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue