Enrich processor configuration changes (#45466)

Enrich processor configuration changes:
* Renamed `enrich_key` option to `field` option.
* Replaced `set_from` and `targets` options with `target_field`.

The `target_field` option behaves different to how `set_from` and
`targets` worked. The `target_field` is the field that will contain
the looked up document.

Relates to #32789
This commit is contained in:
Martijn van Groningen 2019-08-22 09:22:40 +02:00
parent 7f2ba91360
commit 33972423e9
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
12 changed files with 144 additions and 327 deletions

View File

@ -906,8 +906,8 @@ PUT _ingest/pipeline/user_lookup
{
"enrich" : {
"policy_name": "users-policy",
"enrich_key" : "email",
"targets": ["address", "city", "zip", "state"]
"field" : "email",
"target_field": "user"
}
}
]
@ -936,10 +936,15 @@ Which returns:
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"user": {
"email": "mardy.brown@email.me",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA",
"state": "LA"
},
"email": "mardy.brown@email.me"
}
}

View File

@ -6,96 +6,16 @@
The `enrich` processor can enrich documents with data from another index.
See <<ingest-enriching-data,enrich data>> section for more information how to set this up and
check out the <<enrich-processor-getting-started,getting started>> to get familiar with enrich policies and related APIs.
a
[[enrich-options]]
.Enrich Options
[options="header"]
|======
| Name | Required | Default | Description
| `policy_name` | yes | - | The name of the enrich policy to use.
| `enrich_key` | no | Policy enrich_key | The field to get the value from for the enrich lookup.
| `ignore_missing` | no | `false` | If `true` and `enrich_key` does not exist, the processor quietly exits without modifying the document
| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data.
| `target_field` | yes | - | The field that will be used for the enrichment data.
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `targets` | no 1) | - | Describes what fields should be added to the document being indexed from the lookup document
| `set_from` | no 1) | - | Same as `targets`, but allows fields from the lookup document to added under a different name to the document being indexed
include::common-options.asciidoc[]
|======
1) Either `targets` or `set_from` must be specified.
[[enrich-processor-set-from]]
==== Enrich `set_from` option
This option should be used in the case that the field in the looked up document should be placed under
a different field in the document being ingested.
The `set_from` accepts an array with two fields:
* `source` - The name of the field in the lookup document
* `target` - The name of the field in the document being ingested that should hold the source field's value.
For example:
//////////////////////////
[source,js]
--------------------------------------------------
PUT /_enrich/policy/users-policy
{
"type": "exact_match",
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
}
--------------------------------------------------
// CONSOLE
// TEST
//////////////////////////
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/user_lookup
{
"description" : "Enriching user details to messages",
"processors" : [
{
"enrich" : {
"policy_name": "users-policy",
"enrich_key" : "email",
"set_from": [
{
"source": "address",
"target": "address-line-1"
},
{
"source": "city",
"target": "residence"
},
{
"source": "zip",
"target": "zipcode"
},
{
"source": "state",
"target": "us_state"
}
]
}
}
]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]
//////////////////////////
[source,js]
--------------------------------------------------
DELETE /_ingest/pipeline/user_lookup
DELETE /_enrich/policy/users-policy
--------------------------------------------------
// CONSOLE
// TEST[continued]
//////////////////////////

View File

@ -54,10 +54,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
// Create pipeline
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
putPipelineRequest.setJsonEntity("{\"processors\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
"]}}" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" +
"]}");
assertOK(client().performRequest(putPipelineRequest));
@ -70,11 +67,12 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> _source = (Map<?, ?>) response.get("_source");
assertThat(_source.size(), equalTo(3));
Map<?, ?> _source = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
assertThat(_source.size(), equalTo(4));
assertThat(_source.get("host"), equalTo("elastic.co"));
assertThat(_source.get("global_rank"), equalTo(25));
assertThat(_source.get("tld_rank"), equalTo(7));
assertThat(_source.get("tld"), equalTo("co"));
assertThat(_source.get("globalRank"), equalTo(25));
assertThat(_source.get("tldRank"), equalTo(7));
if (deletePipeilne) {
// delete the pipeline so the policies can be deleted
@ -113,10 +111,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
putPipelineRequest.setJsonEntity("{\"processors\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
"]}}" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" +
"]}");
assertOK(client().performRequest(putPipelineRequest));

View File

@ -12,10 +12,8 @@ import org.elasticsearch.ingest.Processor;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
final class EnrichProcessorFactory implements Processor.Factory, Consumer<ClusterState> {
@ -35,42 +33,15 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
}
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getMatchField());
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
final List<EnrichSpecification> specifications;
final List<Map<?, ?>> setFromConfig = ConfigurationUtils.readOptionalList(TYPE, tag, config, "set_from");
if (setFromConfig != null) {
if (setFromConfig.isEmpty()) {
throw new IllegalArgumentException("provided set_from is empty");
}
// TODO: Add templating support in enrich_values source and target options
specifications = setFromConfig.stream()
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
.collect(Collectors.toList());
} else {
final List<String> targetsConfig = ConfigurationUtils.readList(TYPE, tag, config, "targets");
if (targetsConfig.isEmpty()) {
throw new IllegalArgumentException("provided targets is empty");
}
specifications = targetsConfig.stream()
.map(value -> new EnrichSpecification(value, value))
.collect(Collectors.toList());
}
for (EnrichSpecification specification : specifications) {
if (policy.getEnrichFields().contains(specification.sourceField) == false) {
throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" +
policyName + "]");
}
}
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
switch (policy.getType()) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
return new ExactMatchProcessor(tag, client, policyName, field, targetField, policy.getMatchField(),
ignoreMissing, overrideEnabled);
default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
}
@ -89,15 +60,4 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
policies = enrichMetadata.getPolicies();
}
static final class EnrichSpecification {
final String sourceField;
final String targetField;
EnrichSpecification(String sourceField, String targetField) {
this.sourceField = sourceField;
this.targetField = targetField;
}
}
}

View File

@ -16,10 +16,8 @@ import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
@ -28,61 +26,65 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String enrichKey;
private final String field;
private final String targetField;
private final String matchField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
private final List<EnrichSpecification> specifications;
ExactMatchProcessor(String tag,
Client client,
String policyName,
String enrichKey,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
List<EnrichSpecification> specifications) {
boolean overrideEnabled) {
this(
tag,
createSearchRunner(client),
policyName,
enrichKey,
ignoreMissing,
overrideEnabled,
specifications
field,
targetField,
matchField, ignoreMissing,
overrideEnabled
);
}
ExactMatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String enrichKey,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
List<EnrichSpecification> specifications) {
boolean overrideEnabled) {
super(tag, policyName);
this.searchRunner = searchRunner;
this.enrichKey = enrichKey;
this.field = field;
this.targetField = targetField;
this.matchField = matchField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.specifications = specifications;
}
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
// If a document does not have the enrich key, return the unchanged document
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (value == null) {
handler.accept(ingestDocument, null);
return;
}
TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value);
TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(1);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
SearchRequest req = new SearchRequest();
@ -104,18 +106,15 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
handler.accept(ingestDocument, null);
return;
} else if (searchHits.length > 1) {
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"));
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]"));
return;
}
// If a document is returned, add its fields to the document
Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
for (EnrichSpecification specification : specifications) {
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
}
assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length";
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
ingestDocument.setFieldValue(targetField, enrichDocument);
}
handler.accept(ingestDocument, null);
});
@ -134,8 +133,16 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
return EnrichProcessorFactory.TYPE;
}
String getEnrichKey() {
return enrichKey;
String getField() {
return field;
}
public String getTargetField() {
return targetField;
}
public String getMatchField() {
return matchField;
}
boolean isIgnoreMissing() {
@ -146,10 +153,6 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
return overrideEnabled;
}
List<EnrichSpecification> getSpecifications() {
return specifications;
}
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(

View File

@ -246,6 +246,10 @@ public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse
}
private static BytesReference filterSource(FetchSourceContext fetchSourceContext, BytesReference source) throws IOException {
if (fetchSourceContext.includes().length == 0 && fetchSourceContext.excludes().length == 0) {
return source;
}
Set<String> includes = new HashSet<>(Arrays.asList(fetchSourceContext.includes()));
Set<String> excludes = new HashSet<>(Arrays.asList(fetchSourceContext.excludes()));

View File

@ -32,11 +32,12 @@ import java.util.Map;
import java.util.Set;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.KEY_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.mapOf;
import static org.elasticsearch.xpack.enrich.ExactMatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class BasicEnrichTests extends ESSingleNodeTestCase {
@ -51,17 +52,14 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
String policyName = "my-policy";
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS));
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"set_from\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" +
"]}}]}";
"\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
@ -70,7 +68,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
IndexRequest indexRequest = new IndexRequest();
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline(pipelineName);
indexRequest.source(Collections.singletonMap(KEY_FIELD, keys.get(i)));
indexRequest.source(Collections.singletonMap(MATCH_FIELD, keys.get(i)));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
@ -83,11 +81,14 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
for (int i = 0; i < numDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length));
Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
assertThat(userEntry, notNullValue());
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
for (int j = 0; j < 3; j++) {
String field = DECORATE_FIELDS[j];
assertThat(source.get(field), equalTo(keys.get(i) + j));
assertThat(userEntry.get(field), equalTo(keys.get(i) + j));
}
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}
}
@ -109,8 +110,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
String pipelineName = "pipeline" + i;
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"set_from\": [{\"source\": \"value\", \"target\": \"value\"}" +
"]}}]}";
"\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
}
@ -130,7 +130,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("value"), equalTo("val" + i));
assertThat(source.get("target"), equalTo(mapOf("key", "key", "value", "val" + i)));
}
}
@ -145,7 +145,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.create(true);
indexRequest.id(key);
indexRequest.source(mapOf(KEY_FIELD, key, DECORATE_FIELDS[0], key + "0",
indexRequest.source(mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0",
DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
client().index(indexRequest).actionGet();
}

View File

@ -34,12 +34,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.xpack.enrich.ExactMatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -50,7 +50,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
static final String POLICY_NAME = "my-policy";
private static final String PIPELINE_NAME = "my-pipeline";
static final String SOURCE_INDEX_NAME = "users";
static final String KEY_FIELD = "email";
static final String MATCH_FIELD = "email";
static final String[] DECORATE_FIELDS = new String[]{"address", "city", "country"};
@Override
@ -77,7 +77,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS));
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
@ -136,7 +136,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
IndexRequest indexRequest = new IndexRequest();
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline(PIPELINE_NAME);
indexRequest.source(Collections.singletonMap(KEY_FIELD, randomFrom(keys)));
indexRequest.source(Collections.singletonMap(MATCH_FIELD, randomFrom(keys)));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client(coordinatingNode).bulk(bulkRequest).actionGet();
@ -149,9 +149,11 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
for (int i = 0; i < numDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length));
Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
for (String field : DECORATE_FIELDS) {
assertThat(source.get(field), notNullValue());
assertThat(userEntry.get(field), notNullValue());
}
}
}
@ -167,7 +169,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.create(true);
indexRequest.id(key);
indexRequest.source(mapOf(KEY_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4),
indexRequest.source(mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], randomAlphaOfLength(4),
DECORATE_FIELDS[1], randomAlphaOfLength(4), DECORATE_FIELDS[2], randomAlphaOfLength(4)));
client().index(indexRequest).actionGet();
}
@ -177,7 +179,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
private static void createAndExecutePolicy() {
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS));
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();
@ -185,20 +187,9 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
private static void createPipeline() {
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + POLICY_NAME +
"\", \"set_from\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," +
"{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" +
"]}}]}";
"\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}";
PutPipelineRequest request = new PutPipelineRequest(PIPELINE_NAME, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(request).actionGet();
}
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
map.put(key4, value4);
return map;
}
}

View File

@ -41,7 +41,8 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat(enrichProcessorFactory.policies.get("my_policy"), equalTo(instance1));
String pipelineConfig = "{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"targets\": [\"field1\"]}}]}";
String pipelineConfig =
"{\"processors\":[{\"enrich\": {\"policy_name\": \"my_policy\", \"field\": \"key\", \"target_field\": \"target\"}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON);
assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet());
Pipeline pipelineInstance1 = ingestService.getPipeline("1");

View File

@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.util.ArrayList;
import java.util.Arrays;
@ -33,7 +32,8 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
config.put("field", "host");
config.put("target_field", "entry");
boolean keyIgnoreMissing = randomBoolean();
if (keyIgnoreMissing || randomBoolean()) {
config.put("ignore_missing", keyIgnoreMissing);
@ -50,32 +50,18 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
List<Map<String, Object>> valuesConfig = new ArrayList<>(numRandomValues);
for (Tuple<String, String> tuple : randomValues) {
Map<String, Object> entry = new HashMap<>();
entry.put("source", tuple.v1());
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("set_from", valuesConfig);
ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
assertThat(result, notNullValue());
assertThat(result.getPolicyName(), equalTo("majestic"));
assertThat(result.getEnrichKey(), equalTo("host"));
assertThat(result.getField(), equalTo("host"));
assertThat(result.getTargetField(), equalTo("entry"));
assertThat(result.getMatchField(), equalTo("my_key"));
assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing));
if (overrideEnabled != null) {
assertThat(result.isOverrideEnabled(), is(overrideEnabled));
} else {
assertThat(result.isOverrideEnabled(), is(true));
}
assertThat(result.getSpecifications().size(), equalTo(numRandomValues));
for (int i = 0; i < numRandomValues; i++) {
EnrichSpecification actual = result.getSpecifications().get(i);
Tuple<String, String> expected = randomValues.get(i);
assertThat(actual.sourceField, equalTo(expected.v1()));
assertThat(actual.targetField, equalTo(expected.v2()));
}
}
public void testPolicyDoesNotExist() {
@ -151,52 +137,17 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
config.put("field", "host");
config.put("target_field", "entry");
boolean keyIgnoreMissing = randomBoolean();
if (keyIgnoreMissing || randomBoolean()) {
config.put("ignore_missing", keyIgnoreMissing);
}
int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
List<Map<String, Object>> valuesConfig = new ArrayList<>(numRandomValues);
for (Tuple<String, String> tuple : randomValues) {
Map<String, Object> entry = new HashMap<>();
entry.put("source", tuple.v1());
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("set_from", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]"));
}
public void testNonExistingDecorateField() {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.policies = Collections.singletonMap("majestic", policy);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
Map<String, Object> entry = new HashMap<>();
entry.put("source", "rank");
entry.put("target", "rank");
List<Map<String, Object>> valuesConfig = Collections.singletonList(entry);
config.put("set_from", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]"));
}
public void testCompactEnrichValuesFormat() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
@ -206,22 +157,17 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("targets", enrichValues);
config.put("field", "host");
config.put("target_field", "entry");
ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
assertThat(result, notNullValue());
assertThat(result.getPolicyName(), equalTo("majestic"));
assertThat(result.getEnrichKey(), equalTo("host"));
assertThat(result.getSpecifications().size(), equalTo(enrichValues.size()));
for (int i = 0; i < enrichValues.size(); i++) {
EnrichSpecification actual = result.getSpecifications().get(i);
String expected = enrichValues.get(i);
assertThat(actual.sourceField, equalTo(expected));
assertThat(actual.targetField, equalTo(expected));
}
assertThat(result.getField(), equalTo("host"));
assertThat(result.getTargetField(), equalTo("entry"));
}
public void testNoEnrichValues() throws Exception {
public void testNoTargetField() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Collections.singletonList("source_index"), "host", enrichValues);
@ -230,16 +176,10 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
Map<String, Object> config1 = new HashMap<>();
config1.put("policy_name", "majestic");
config1.put("set_from", Collections.emptyList());
config1.put("field", "host");
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config1));
assertThat(e.getMessage(), equalTo("provided set_from is empty"));
Map<String, Object> config2 = new HashMap<>();
config2.put("policy_name", "majestic");
config2.put("targets", Collections.emptyList());
e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config2));
assertThat(e.getMessage(), equalTo("provided targets is empty"));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(Collections.emptyMap(), "_tag", config1));
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
}
}

View File

@ -20,7 +20,7 @@ import java.util.Collections;
import java.util.Optional;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.KEY_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.POLICY_NAME;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME;
import static org.hamcrest.Matchers.equalTo;
@ -49,7 +49,7 @@ public class EnrichRestartIT extends ESIntegTestCase {
internalCluster().startNode();
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Collections.singletonList(SOURCE_INDEX_NAME), KEY_FIELD, Arrays.asList(DECORATE_FIELDS));
Collections.singletonList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);

View File

@ -27,18 +27,16 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
@ -47,17 +45,8 @@ import static org.hamcrest.Matchers.nullValue;
public class ExactMatchProcessorTests extends ESTestCase {
public void testBasics() throws Exception {
Map<String, Map<String, ?>> documents = new HashMap<>();
{
Map<String, Object> document1 = new HashMap<>();
document1.put("globalRank", 451);
document1.put("tldRank",23);
document1.put("tld", "co");
documents.put("elastic.co", document1);
}
MockSearchFunction mockSearch = mockedSearchFunction(documents);
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
// Run
@ -72,21 +61,24 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(request.source().size(), equalTo(1));
assertThat(request.source().trackScores(), equalTo(false));
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld"));
assertThat(request.source().fetchSource().excludes(), emptyArray());
assertThat(request.source().fetchSource().includes(), emptyArray());
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class));
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
assertThat(termQueryBuilder.value(), equalTo("elastic.co"));
// Check result
assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23));
assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co"));
Map<?, ?> entry = ingestDocument.getFieldValue("entry", Map.class);
assertThat(entry.size(), equalTo(3));
assertThat(entry.get("globalRank"), equalTo(451));
assertThat(entry.get("tldRank"), equalTo(23));
assertThat(entry.get("tld"), equalTo("co"));
}
public void testNoMatch() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction();
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
int numProperties = ingestDocument.getSourceAndMetadata().size();
@ -102,7 +94,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(request.source().size(), equalTo(1));
assertThat(request.source().trackScores(), equalTo(false));
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld"));
assertThat(request.source().fetchSource().includes(), emptyArray());
assertThat(request.source().fetchSource().excludes(), emptyArray());
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class));
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
@ -115,8 +108,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testSearchFailure() throws Exception {
String indexName = ".enrich-_name";
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
// Run
@ -137,7 +129,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(request.source().size(), equalTo(1));
assertThat(request.source().trackScores(), equalTo(false));
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld"));
assertThat(request.source().fetchSource().includes(), emptyArray());
assertThat(request.source().fetchSource().excludes(), emptyArray());
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class));
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
@ -149,10 +142,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testIgnoreKeyMissing() throws Exception {
{
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
true, true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.emptyMap());
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
IngestDocument[] holder = new IngestDocument[1];
@ -161,10 +153,9 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
}
{
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
false, true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.emptyMap());
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
processor.execute(ingestDocument, (result, e) -> {
@ -178,9 +169,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
}
public void testExistingFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
Collections.singletonList(new EnrichSpecification("tld", "tld")));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false);
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf());
IngestDocument[] resultHolder = new IngestDocument[1];
@ -195,9 +185,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
}
public void testExistingNullFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
Collections.singletonList(new EnrichSpecification("tld", "tld")));
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false);
Map<String, Object> source = new HashMap<>();
source.put("domain", "elastic.co");
@ -302,4 +291,13 @@ public class ExactMatchProcessorTests extends ESTestCase {
map.put(key3, value3);
return map;
}
static <K, V> Map<K, V> mapOf(K key1, V value1, K key2, V value2, K key3, V value3, K key4, V value4) {
Map<K, V> map = new HashMap<>();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
map.put(key4, value4);
return map;
}
}