Add support for a more compact enrich values format (#45033)

When the source and target field names are identical for every entry in `enrich_values`,
a plain string array can be specified under the new `targets` key instead.

For example instead of this:

```
PUT /_ingest/pipeline/my-pipeline
{
    "processors": [
        {
            "enrich" : {
                "policy_name": "my-policy",
                "enrich_values": [
                    {
                        "source": "first_name",
                        "target": "first_name"
                    },
                    {
                        "source": "last_name",
                        "target": "last_name"
                    },
                    {
                        "source": "address",
                        "target": "address"
                    },
                    {
                        "source": "city",
                        "target": "city"
                    },
                    {
                        "source": "state",
                        "target": "state"
                    },
                    {
                        "source": "zip",
                        "target": "zip"
                    }
                ]
            }
        }
    ]
}
```
This more compact format can be specified:

```
PUT /_ingest/pipeline/my-pipeline
{
    "processors": [
        {
            "enrich" : {
                "policy_name": "my-policy",
                "targets": [
                   "first_name",
                   "last_name",
                   "address",
                   "city",
                   "state",
                   "zip"
                ]
            }
        }
    ]
}
```

In addition, the `enrich_values` key has been renamed to `set_from`.

Relates to #32789
This commit is contained in:
Martijn van Groningen 2019-08-09 12:34:48 +02:00
parent 5e1c0d598c
commit 4ac25b23f6
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
6 changed files with 74 additions and 14 deletions

View File

@ -54,7 +54,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
// Create pipeline
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
putPipelineRequest.setJsonEntity("{\"processors\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"enrich_values\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
"]}}" +

View File

@ -40,11 +40,26 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
final List<EnrichSpecification> specifications;
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
specifications = specificationConfig.stream()
final List<Map<?, ?>> setFromConfig = ConfigurationUtils.readOptionalList(TYPE, tag, config, "set_from");
if (setFromConfig != null) {
if (setFromConfig.isEmpty()) {
throw new IllegalArgumentException("provided set_from is empty");
}
// TODO: Add templating support in enrich_values source and target options
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
.collect(Collectors.toList());
specifications = setFromConfig.stream()
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
.collect(Collectors.toList());
} else {
final List<String> targetsConfig = ConfigurationUtils.readList(TYPE, tag, config, "targets");
if (targetsConfig.isEmpty()) {
throw new IllegalArgumentException("provided targets is empty");
}
specifications = targetsConfig.stream()
.map(value -> new EnrichSpecification(value, value))
.collect(Collectors.toList());
}
for (EnrichSpecification specification : specifications) {
if (policy.getEnrichValues().contains(specification.sourceField) == false) {

View File

@ -57,7 +57,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"enrich_values\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"\", \"set_from\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" +
"]}}]}";
@ -104,7 +104,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
String pipelineName = "pipeline" + i;
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"enrich_values\": [{\"source\": \"value\", \"target\": \"value\"}" +
"\", \"set_from\": [{\"source\": \"value\", \"target\": \"value\"}" +
"]}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();

View File

@ -180,7 +180,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
private static void createPipeline() {
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + POLICY_NAME +
"\", \"enrich_values\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"\", \"set_from\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" +
"]}}]}";

View File

@ -41,7 +41,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"enrich_values\": []}}]}";
String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"targets\": [\"field1\"]}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON);
assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet());
Pipeline pipelineInstance1 = ingestService.getPipeline("1");

View File

@ -57,7 +57,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
config.put("set_from", valuesConfig);
ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
assertThat(result, notNullValue());
@ -103,7 +103,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
config.put("set_from", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("policy [majestic] does not exists"));
@ -136,7 +136,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
config.put("set_from", valuesConfig);
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("[policy_name] required property is missing"));
@ -170,7 +170,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
config.put("set_from", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]"));
@ -191,10 +191,55 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
entry.put("source", "rank");
entry.put("target", "rank");
List<Map<String, Object>> valuesConfig = Collections.singletonList(entry);
config.put("enrich_values", valuesConfig);
config.put("set_from", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]"));
}
/**
 * Checks the compact {@code targets} string-array configuration: the created processor must
 * expose one specification per listed name, with source and target field both equal to that name.
 */
public void testCompactEnrichValuesFormat() throws Exception {
    final List<String> fieldNames = Arrays.asList("globalRank", "tldRank", "tld");
    final EnrichPolicy majesticPolicy = new EnrichPolicy(
        EnrichPolicy.EXACT_MATCH_TYPE,
        null,
        Collections.singletonList("source_index"),
        "host",
        fieldNames
    );
    final EnrichProcessorFactory processorFactory = new EnrichProcessorFactory(null);
    processorFactory.policies = Collections.singletonMap("majestic", majesticPolicy);

    // Only the policy name and the compact "targets" list are configured; no "set_from" key.
    final Map<String, Object> processorConfig = new HashMap<>();
    processorConfig.put("policy_name", "majestic");
    processorConfig.put("targets", fieldNames);

    final ExactMatchProcessor processor =
        (ExactMatchProcessor) processorFactory.create(Collections.emptyMap(), "_tag", processorConfig);
    assertThat(processor, notNullValue());
    assertThat(processor.getPolicyName(), equalTo("majestic"));
    assertThat(processor.getEnrichKey(), equalTo("host"));
    assertThat(processor.getSpecifications().size(), equalTo(fieldNames.size()));

    // Walk the expected names in order and compare against the specification at the same position.
    int position = 0;
    for (String expectedName : fieldNames) {
        EnrichSpecification specification = processor.getSpecifications().get(position);
        assertThat(specification.sourceField, equalTo(expectedName));
        assertThat(specification.targetField, equalTo(expectedName));
        position++;
    }
}
/**
 * Checks that processor creation fails with an {@link IllegalArgumentException} when either
 * {@code set_from} or {@code targets} is supplied as an empty list, and that the message names
 * the offending key.
 */
public void testNoEnrichValues() throws Exception {
    final List<String> policyFields = Arrays.asList("globalRank", "tldRank", "tld");
    final EnrichPolicy majesticPolicy = new EnrichPolicy(
        EnrichPolicy.EXACT_MATCH_TYPE,
        null,
        Collections.singletonList("source_index"),
        "host",
        policyFields
    );
    final EnrichProcessorFactory processorFactory = new EnrichProcessorFactory(null);
    processorFactory.policies = Collections.singletonMap("majestic", majesticPolicy);

    // Case 1: "set_from" is present but empty.
    final Map<String, Object> emptySetFromConfig = new HashMap<>();
    emptySetFromConfig.put("policy_name", "majestic");
    emptySetFromConfig.put("set_from", Collections.emptyList());
    Exception failure = expectThrows(
        IllegalArgumentException.class,
        () -> processorFactory.create(Collections.emptyMap(), "_tag", emptySetFromConfig)
    );
    assertThat(failure.getMessage(), equalTo("provided set_from is empty"));

    // Case 2: "set_from" is absent and "targets" is present but empty.
    final Map<String, Object> emptyTargetsConfig = new HashMap<>();
    emptyTargetsConfig.put("policy_name", "majestic");
    emptyTargetsConfig.put("targets", Collections.emptyList());
    failure = expectThrows(
        IllegalArgumentException.class,
        () -> processorFactory.create(Collections.emptyMap(), "_tag", emptyTargetsConfig)
    );
    assertThat(failure.getMessage(), equalTo("provided targets is empty"));
}
}