update foreach processor to only support one applied processor. (#19402)

Closes #19345.
This commit is contained in:
Tal Levy 2016-07-13 13:13:00 -07:00 committed by GitHub
parent 3f2e1066d3
commit 8fd01554bc
8 changed files with 108 additions and 111 deletions

View File

@ -251,7 +251,7 @@ public final class ConfigurationUtils {
return processors; return processors;
} }
private static Processor readProcessor(Map<String, Processor.Factory> processorFactories, public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
String type, Map<String, Object> config) throws Exception { String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorFactories.get(type); Processor.Factory factory = processorFactories.get(type);
if (factory != null) { if (factory != null) {

View File

@ -856,10 +856,10 @@ Processes elements in an array of unknown length.
All processors can operate on elements inside an array, but if all elements of an array need to All processors can operate on elements inside an array, but if all elements of an array need to
be processed in the same way, defining a processor for each element becomes cumbersome and tricky be processed in the same way, defining a processor for each element becomes cumbersome and tricky
because it is likely that the number of elements in an array is unknown. For this reason the `foreach` because it is likely that the number of elements in an array is unknown. For this reason the `foreach`
processor exists. By specifying the field holding array elements and a list of processors that processor exists. By specifying the field holding array elements and a processor that
define what should happen to each element, array fields can easily be preprocessed. defines what should happen to each element, array fields can easily be preprocessed.
Processors inside the foreach processor work in a different context, and the only valid top-level A processor inside the foreach processor works in a different context, and the only valid top-level
field is `_value`, which holds the array element value. Under this field other fields may exist. field is `_value`, which holds the array element value. Under this field other fields may exist.
If the `foreach` processor fails to process an element inside the array, and no `on_failure` processor has been specified, If the `foreach` processor fails to process an element inside the array, and no `on_failure` processor has been specified,
@ -871,7 +871,7 @@ then it aborts the execution and leaves the array unmodified.
|====== |======
| Name | Required | Default | Description | Name | Required | Default | Description
| `field` | yes | - | The array field | `field` | yes | - | The array field
| `processors` | yes | - | The processors | `processor` | yes | - | The processor to execute against each field
|====== |======
Assume the following document: Assume the following document:
@ -890,13 +890,11 @@ When this `foreach` processor operates on this sample document:
{ {
"foreach" : { "foreach" : {
"field" : "values", "field" : "values",
"processors" : [ "processor" : {
{
"uppercase" : { "uppercase" : {
"field" : "_value" "field" : "_value"
} }
} }
]
} }
} }
-------------------------------------------------- --------------------------------------------------
@ -936,13 +934,11 @@ so the following `foreach` processor is used:
{ {
"foreach" : { "foreach" : {
"field" : "persons", "field" : "persons",
"processors" : [ "processor" : {
{
"remove" : { "remove" : {
"field" : "_value.id" "field" : "_value.id"
} }
} }
]
} }
} }
-------------------------------------------------- --------------------------------------------------
@ -975,8 +971,7 @@ block to send the document to the 'failure_index' index for later inspection:
{ {
"foreach" : { "foreach" : {
"field" : "persons", "field" : "persons",
"processors" : [ "processor" : {
{
"remove" : { "remove" : {
"field" : "_value.id", "field" : "_value.id",
"on_failure" : [ "on_failure" : [
@ -989,7 +984,6 @@ block to send the document to the 'failure_index' index for later inspection:
] ]
} }
} }
]
} }
} }
-------------------------------------------------- --------------------------------------------------

View File

@ -28,10 +28,14 @@ import org.elasticsearch.ingest.Processor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readList; import static org.elasticsearch.ingest.ConfigurationUtils.readList;
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
/** /**
@ -45,12 +49,12 @@ public final class ForEachProcessor extends AbstractProcessor {
public static final String TYPE = "foreach"; public static final String TYPE = "foreach";
private final String field; private final String field;
private final List<Processor> processors; private final Processor processor;
ForEachProcessor(String tag, String field, List<Processor> processors) { ForEachProcessor(String tag, String field, Processor processor) {
super(tag); super(tag);
this.field = field; this.field = field;
this.processors = processors; this.processor = processor;
} }
@Override @Override
@ -61,9 +65,7 @@ public final class ForEachProcessor extends AbstractProcessor {
Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata()); Map<String, Object> innerSource = new HashMap<>(ingestDocument.getSourceAndMetadata());
innerSource.put("_value", value); // scalar value to access the list item being evaluated innerSource.put("_value", value); // scalar value to access the list item being evaluated
IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata()); IngestDocument innerIngestDocument = new IngestDocument(innerSource, ingestDocument.getIngestMetadata());
for (Processor processor : processors) {
processor.execute(innerIngestDocument); processor.execute(innerIngestDocument);
}
newValues.add(innerSource.get("_value")); newValues.add(innerSource.get("_value"));
} }
ingestDocument.setFieldValue(field, newValues); ingestDocument.setFieldValue(field, newValues);
@ -78,8 +80,8 @@ public final class ForEachProcessor extends AbstractProcessor {
return field; return field;
} }
List<Processor> getProcessors() { Processor getProcessor() {
return processors; return processor;
} }
public static final class Factory implements Processor.Factory { public static final class Factory implements Processor.Factory {
@ -87,9 +89,14 @@ public final class ForEachProcessor extends AbstractProcessor {
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag, public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception { Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field"); String field = readStringProperty(TYPE, tag, config, "field");
List<Map<String, Map<String, Object>>> processorConfigs = readList(TYPE, tag, config, "processors"); Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, factories); Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors)); if (entries.size() != 1) {
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor);
} }
} }
} }

View File

@ -19,11 +19,9 @@
package org.elasticsearch.ingest.common; package org.elasticsearch.ingest.common;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
@ -31,7 +29,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.mockito.Mockito.mock; import static org.hamcrest.Matchers.equalTo;
public class ForEachProcessorFactoryTests extends ESTestCase { public class ForEachProcessorFactoryTests extends ESTestCase {
@ -43,30 +41,57 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("field", "_field"); config.put("field", "_field");
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config); ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), Matchers.equalTo("_field")); assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessors().size(), Matchers.equalTo(1)); assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
assertThat(forEachProcessor.getProcessors().get(0), Matchers.sameInstance(processor));
config = new HashMap<>();
config.put("processors", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
try {
forEachFactory.create(registry, null, config);
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing"));
} }
config = new HashMap<>(); public void testCreateWithTooManyProcessorTypes() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field"); config.put("field", "_field");
try { Map<String, Object> processorTypes = new HashMap<>();
forEachFactory.create(registry, null, config); processorTypes.put("_first", Collections.emptyMap());
fail("exception expected"); processorTypes.put("_second", Collections.emptyMap());
} catch (Exception e) { config.put("processor", processorTypes);
assertThat(e.getMessage(), Matchers.equalTo("[processors] required property is missing")); Exception exception = expectThrows(ElasticsearchParseException.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[processor] Must specify exactly one processor type"));
} }
public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Exception expectedException = expectThrows(ElasticsearchParseException.class,
() -> forEachFactory.create(Collections.emptyMap(), null, config));
assertThat(expectedException.getMessage(), equalTo("No processor type exists with name [_name]"));
}
public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}
public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
assertThat(exception.getMessage(), equalTo("[processor] required property is missing"));
} }
} }

View File

@ -25,7 +25,6 @@ import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList; import java.util.ArrayList;
@ -50,7 +49,7 @@ public class ForEachProcessorTests extends ESTestCase {
); );
ForEachProcessor processor = new ForEachProcessor( ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Collections.singletonList(new UppercaseProcessor("_tag", "_value")) "_tag", "values", new UppercaseProcessor("_tag", "_value")
); );
processor.execute(ingestDocument); processor.execute(ingestDocument);
@ -70,7 +69,7 @@ public class ForEachProcessorTests extends ESTestCase {
throw new RuntimeException("failure"); throw new RuntimeException("failure");
} }
}); });
ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(testProcessor)); ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor);
try { try {
processor.execute(ingestDocument); processor.execute(ingestDocument);
fail("exception expected"); fail("exception expected");
@ -90,8 +89,7 @@ public class ForEachProcessorTests extends ESTestCase {
}); });
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
processor = new ForEachProcessor( processor = new ForEachProcessor(
"_tag", "values", "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))
Collections.singletonList(new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)))
); );
processor.execute(ingestDocument); processor.execute(ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(testProcessor.getInvokedCounter(), equalTo(3));
@ -111,7 +109,7 @@ public class ForEachProcessorTests extends ESTestCase {
id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type")); id.setFieldValue("_value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id")); id.setFieldValue("_value.id", id.getSourceAndMetadata().get("_id"));
}); });
ForEachProcessor processor = new ForEachProcessor("_tag", "values", Collections.singletonList(innerProcessor)); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
processor.execute(ingestDocument); processor.execute(ingestDocument);
assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
@ -138,9 +136,7 @@ public class ForEachProcessorTests extends ESTestCase {
TemplateService ts = TestTemplateService.instance(); TemplateService ts = TestTemplateService.instance();
ForEachProcessor processor = new ForEachProcessor( ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", Arrays.asList( "_tag", "values", new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other"))
new AppendProcessor("_tag", ts.compile("flat_values"), ValueSource.wrap("value", ts)),
new SetProcessor("_tag", ts.compile("_value.new_field"), (model) -> model.get("other")))
); );
processor.execute(ingestDocument); processor.execute(ingestDocument);
@ -149,21 +145,10 @@ public class ForEachProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue("values.2.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.2.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.3.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.3.new_field", String.class), equalTo("value"));
assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value"));
List<String> flatValues = ingestDocument.getFieldValue("flat_values", List.class);
assertThat(flatValues.size(), equalTo(5));
assertThat(flatValues.get(0), equalTo("value"));
assertThat(flatValues.get(1), equalTo("value"));
assertThat(flatValues.get(2), equalTo("value"));
assertThat(flatValues.get(3), equalTo("value"));
assertThat(flatValues.get(4), equalTo("value"));
} }
public void testRandom() throws Exception { public void testRandom() throws Exception {
int numProcessors = randomInt(8); Processor innerProcessor = new Processor() {
List<Processor> processors = new ArrayList<>(numProcessors);
for (int i = 0; i < numProcessors; i++) {
processors.add(new Processor() {
@Override @Override
public void execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument) throws Exception {
String existingValue = ingestDocument.getFieldValue("_value", String.class); String existingValue = ingestDocument.getFieldValue("_value", String.class);
@ -179,8 +164,7 @@ public class ForEachProcessorTests extends ESTestCase {
public String getTag() { public String getTag() {
return null; return null;
} }
}); };
}
int numValues = randomIntBetween(1, 32); int numValues = randomIntBetween(1, 32);
List<String> values = new ArrayList<>(numValues); List<String> values = new ArrayList<>(numValues);
for (int i = 0; i < numValues; i++) { for (int i = 0; i < numValues; i++) {
@ -190,18 +174,13 @@ public class ForEachProcessorTests extends ESTestCase {
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
); );
ForEachProcessor processor = new ForEachProcessor("_tag", "values", processors); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
processor.execute(ingestDocument); processor.execute(ingestDocument);
List<String> result = ingestDocument.getFieldValue("values", List.class); List<String> result = ingestDocument.getFieldValue("values", List.class);
assertThat(result.size(), equalTo(numValues)); assertThat(result.size(), equalTo(numValues));
String expectedString = "";
for (int i = 0; i < numProcessors; i++) {
expectedString = expectedString + ".";
}
for (String r : result) { for (String r : result) {
assertThat(r, equalTo(expectedString)); assertThat(r, equalTo("."));
} }
} }

View File

@ -10,13 +10,11 @@
{ {
"foreach" : { "foreach" : {
"field" : "values", "field" : "values",
"processors" : [ "processor" : {
{
"uppercase" : { "uppercase" : {
"field" : "_value" "field" : "_value"
} }
} }
]
} }
} }
] ]

View File

@ -231,14 +231,12 @@
{ {
"foreach": { "foreach": {
"field": "values", "field": "values",
"processors": [ "processor": {
{
"append": { "append": {
"field": "values_flat", "field": "values_flat",
"value": "{{_value.key}}_{{_value.value}}" "value": "{{_value.key}}_{{_value.value}}"
} }
} }
]
} }
} }
] ]

View File

@ -82,13 +82,11 @@
{ {
"foreach" : { "foreach" : {
"field" : "friends", "field" : "friends",
"processors" : [ "processor" : {
{
"remove" : { "remove" : {
"field" : "_value.id" "field" : "_value.id"
} }
} }
]
} }
}, },
{ {
@ -106,13 +104,11 @@
{ {
"foreach" : { "foreach" : {
"field" : "address", "field" : "address",
"processors" : [ "processor" : {
{
"trim" : { "trim" : {
"field" : "_value" "field" : "_value"
} }
} }
]
} }
}, },
{ {