Change exact match processor to match processor. (#46041)

Besides a rename, this change allows the processor to attach multiple
enrich docs to the document being ingested.

Also in order to control the maximum number of enrich docs to be
included in the document being ingested, the `max_matches` setting
is added to the enrich processor.

Relates #32789
This commit is contained in:
Martijn van Groningen 2019-09-04 15:05:27 +02:00
parent 6bec63fdfa
commit ded98e50b7
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
19 changed files with 186 additions and 132 deletions

View File

@ -38,7 +38,7 @@ public class EnrichIT extends ESRestHighLevelClientTestCase {
public void testCRUD() throws Exception {
final EnrichClient enrichClient = highLevelClient().enrich();
PutPolicyRequest putPolicyRequest = new PutPolicyRequest("my-policy", "exact_match",
PutPolicyRequest putPolicyRequest = new PutPolicyRequest("my-policy", "match",
Collections.singletonList("my-index"), "enrich_key", Collections.singletonList("enrich_value"));
AcknowledgedResponse putPolicyResponse = execute(putPolicyRequest, enrichClient::putPolicy, enrichClient::putPolicyAsync);
assertThat(putPolicyResponse.isAcknowledged(), is(true));
@ -51,9 +51,9 @@ public class EnrichIT extends ESRestHighLevelClientTestCase {
List<?> responsePolicies = (List<?>) responseBody.get("policies");
assertThat(responsePolicies.size(), equalTo(1));
Map<?, ?> responsePolicy = (Map<?, ?>) responsePolicies.get(0);
assertThat(XContentMapValues.extractValue("exact_match.indices", responsePolicy), equalTo(putPolicyRequest.getIndices()));
assertThat(XContentMapValues.extractValue("exact_match.match_field", responsePolicy), equalTo(putPolicyRequest.getMatchField()));
assertThat(XContentMapValues.extractValue("exact_match.enrich_fields", responsePolicy),
assertThat(XContentMapValues.extractValue("match.indices", responsePolicy), equalTo(putPolicyRequest.getIndices()));
assertThat(XContentMapValues.extractValue("match.match_field", responsePolicy), equalTo(putPolicyRequest.getMatchField()));
assertThat(XContentMapValues.extractValue("match.enrich_fields", responsePolicy),
equalTo(putPolicyRequest.getEnrichFields()));
DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");

View File

@ -49,7 +49,7 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
// tag::enrich-put-policy-request
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "exact_match", Arrays.asList("users"),
"users-policy", "match", Arrays.asList("users"),
"email", Arrays.asList("address", "zip", "city", "state"));
// end::enrich-put-policy-request
@ -96,7 +96,7 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
{
// Add a policy, so that it can be deleted:
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "exact_match", Arrays.asList("users"),
"users-policy", "match", Arrays.asList("users"),
"email", Arrays.asList("address", "zip", "city", "state"));
client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);
}

View File

@ -810,7 +810,7 @@ The policy type of the policy determines what kind of enrichment an `enrich` pro
The following policy types are currently supported:
* `exact_match` - Can lookup exactly one document and use its content to enrich the document being ingested.
* `match` - Can look up documents by running a term query and use the retrieved content to enrich the document being ingested.
[[enrich-processor-getting-started]]
=== Getting started
@ -842,7 +842,7 @@ Create an enrich policy:
--------------------------------------------------
PUT /_enrich/policy/users-policy
{
"exact_match": {
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
@ -922,15 +922,17 @@ Which returns:
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"user": {
"email": "mardy.brown@email.me",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA"
},
"user": [
{
"email": "mardy.brown@email.me",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA"
}
],
"email": "mardy.brown@email.me"
}
}
@ -975,7 +977,7 @@ Request:
--------------------------------------------------
PUT /_enrich/policy/my-policy
{
"exact_match": {
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
@ -1015,7 +1017,7 @@ Response:
{
"policies": [
{
"exact_match": {
"match": {
"name" : "my-policy",
"indices" : ["users"],
"match_field" : "email",
@ -1052,7 +1054,7 @@ Response:
{
"policies": [
{
"exact_match": {
"match": {
"name" : "my-policy",
"indices" : ["users"],
"match_field" : "email",

View File

@ -17,5 +17,6 @@ check out the <<enrich-processor-getting-started,getting started>> to get famili
| `target_field` | yes | - | The field that will be used for the enrichment data.
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `override` | no | true | If `true`, the processor will update fields that already have a non-null value. When set to `false`, such fields will not be touched.
| `max_matches` | no | 1 | The maximum number of matched documents to include under the configured target field. In order to avoid documents getting too large, the maximum allowed value is 128.
include::common-options.asciidoc[]
|======

View File

@ -33,8 +33,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
public static final String EXACT_MATCH_TYPE = "exact_match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};
public static final String MATCH_TYPE = "match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};
private static final ParseField QUERY = new ParseField("query");
private static final ParseField INDICES = new ParseField("indices");

View File

@ -28,13 +28,13 @@ import static org.hamcrest.Matchers.equalTo;
public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
@After
private void deletePolicies() throws Exception {
public void deletePolicies() throws Exception {
Map<String, Object> responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy")));
@SuppressWarnings("unchecked")
List<Map<?,?>> policies = (List<Map<?,?>>) responseMap.get("policies");
for (Map<?, ?> entry: policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("exact_match.name", entry)));
client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("match.name", entry)));
}
}
@ -71,7 +71,8 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> _source = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
List<?> entries = (List<?>) ((Map<?, ?>) response.get("_source")).get("entry");
Map<?, ?> _source = (Map<?, ?>) entries.get(0);
assertThat(_source.size(), equalTo(4));
assertThat(_source.get("host"), equalTo("elastic.co"));
assertThat(_source.get("tld"), equalTo("co"));
@ -132,7 +133,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
}
public static String generatePolicySource(String index) throws IOException {
XContentBuilder source = jsonBuilder().startObject().startObject("exact_match");
XContentBuilder source = jsonBuilder().startObject().startObject("match");
{
source.field("indices", index);
if (randomBoolean()) {

View File

@ -5,7 +5,7 @@
enrich.put_policy:
name: policy-crud
body:
exact_match:
match:
indices: ["bar*"]
match_field: baz
enrich_fields: ["a", "b"]
@ -20,18 +20,18 @@
enrich.get_policy:
name: policy-crud
- length: { policies: 1 }
- match: { policies.0.exact_match.name: policy-crud }
- match: { policies.0.exact_match.indices: ["bar*"] }
- match: { policies.0.exact_match.match_field: baz }
- match: { policies.0.exact_match.enrich_fields: ["a", "b"] }
- match: { policies.0.match.name: policy-crud }
- match: { policies.0.match.indices: ["bar*"] }
- match: { policies.0.match.match_field: baz }
- match: { policies.0.match.enrich_fields: ["a", "b"] }
- do:
enrich.get_policy: {}
- length: { policies: 1 }
- match: { policies.0.exact_match.name: policy-crud }
- match: { policies.0.exact_match.indices: ["bar*"] }
- match: { policies.0.exact_match.match_field: baz }
- match: { policies.0.exact_match.enrich_fields: ["a", "b"] }
- match: { policies.0.match.name: policy-crud }
- match: { policies.0.match.indices: ["bar*"] }
- match: { policies.0.match.match_field: baz }
- match: { policies.0.match.enrich_fields: ["a", "b"] }
- do:
enrich.delete_policy:

View File

@ -192,9 +192,9 @@ public class EnrichPolicyRunner implements Runnable {
}
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.EXACT_MATCH_TYPE, which is a keyword type
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
String keyType;
if (EnrichPolicy.EXACT_MATCH_TYPE.equals(policy.getType())) {
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword";
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());

View File

@ -49,12 +49,16 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
int maxMatches = ConfigurationUtils.readIntProperty(TYPE, tag, config, "max_matches", 1);
// The upper bound of 128 caps how many enrich docs can be attached to a single
// ingested document; the error message must report the same range the check enforces.
if (maxMatches < 1 || maxMatches > 128) {
    throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128");
}
switch (policyType) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled);
case EnrichPolicy.MATCH_TYPE:
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled, maxMatches);
default:
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
}

View File

@ -18,12 +18,12 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
public final class ExactMatchProcessor extends AbstractEnrichProcessor {
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
public final class MatchProcessor extends AbstractEnrichProcessor {
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
@ -31,34 +31,39 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
private final String matchField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
private final int maxMatches;
ExactMatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled) {
MatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
int maxMatches) {
this(
tag,
createSearchRunner(client),
policyName,
field,
targetField,
matchField, ignoreMissing,
overrideEnabled
matchField,
ignoreMissing,
overrideEnabled,
maxMatches
);
}
ExactMatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled) {
MatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
int maxMatches) {
super(tag, policyName);
this.searchRunner = searchRunner;
this.field = field;
@ -66,6 +71,7 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
this.matchField = matchField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.maxMatches = maxMatches;
}
@Override
@ -82,7 +88,7 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(1);
searchBuilder.size(maxMatches);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
@ -105,16 +111,15 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
if (searchHits.length < 1) {
handler.accept(ingestDocument, null);
return;
} else if (searchHits.length > 1) {
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]"));
return;
}
// If a document is returned, add its fields to the document
Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length";
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
ingestDocument.setFieldValue(targetField, enrichDocument);
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
handler.accept(ingestDocument, null);
});
@ -153,6 +158,10 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
return overrideEnabled;
}
/** Returns the configured maximum number of matches, which this processor uses as the size of its enrich search. */
int getMaxMatches() {
return maxMatches;
}
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(

View File

@ -34,7 +34,7 @@ import java.util.Set;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME;
import static org.elasticsearch.xpack.enrich.ExactMatchProcessorTests.mapOf;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -48,10 +48,11 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
public void testIngestDataWithEnrichProcessor() {
int numDocs = 32;
List<String> keys = createSourceIndex(numDocs);
int maxMatches = randomIntBetween(2, 8);
List<String> keys = createSourceIndex(numDocs, maxMatches);
String policyName = "my-policy";
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
@ -59,7 +60,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}";
"\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"users\", \"max_matches\": " + maxMatches + "}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
@ -78,17 +79,21 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
assertThat(itemResponse.getId(), equalTo(Integer.toString(expectedId++)));
}
for (int i = 0; i < numDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
for (int doc = 0; doc < numDocs; doc++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(doc))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
assertThat(userEntry, notNullValue());
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
for (int j = 0; j < 3; j++) {
String field = DECORATE_FIELDS[j];
assertThat(userEntry.get(field), equalTo(keys.get(i) + j));
List<?> userEntries = (List<?>) source.get("users");
assertThat(userEntries, notNullValue());
assertThat(userEntries.size(), equalTo(maxMatches));
for (int i = 0; i < maxMatches; i++) {
Map<?, ?> userEntry = (Map<?, ?>) userEntries.get(i);
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
for (int j = 0; j < 3; j++) {
String field = DECORATE_FIELDS[j];
assertThat(userEntry.get(field), equalTo(keys.get(doc) + j));
}
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}
}
@ -102,7 +107,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet();
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList("source-" + i), "key", Collections.singletonList("value"));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
@ -130,24 +135,24 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("target"), equalTo(mapOf("key", "key", "value", "val" + i)));
assertThat(source.get("target"), equalTo(Arrays.asList(mapOf("key", "key", "value", "val" + i))));
}
}
private List<String> createSourceIndex(int numDocs) {
private List<String> createSourceIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
for (int id = 0; id < numKeys; id++) {
String key;
do {
key = randomAlphaOfLength(16);
} while (keys.add(key) == false);
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.create(true);
indexRequest.id(key);
indexRequest.source(mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0",
DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
client().index(indexRequest).actionGet();
for (int doc = 0; doc < numDocsPerKey; doc++) {
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
indexRequest.source(mapOf(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0",
DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
client().index(indexRequest).actionGet();
}
}
client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
return new ArrayList<>(keys);

View File

@ -39,7 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.xpack.enrich.ExactMatchProcessorTests.mapOf;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -76,7 +76,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
@ -149,7 +149,8 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
for (int i = 0; i < numDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
List<?> entries = (List<?>) source.get("user");
Map<?, ?> userEntry = (Map<?, ?>) entries.get(0);
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
for (String field : DECORATE_FIELDS) {
@ -178,7 +179,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
}
private static void createAndExecutePolicy() {
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Arrays.asList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();

View File

@ -101,7 +101,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
public void testNonConcurrentPolicyExecution() throws InterruptedException {
String testPolicyName = "test_policy";
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield",
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool,
new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
@ -141,7 +141,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
public void testMaximumPolicyExecutionLimit() throws InterruptedException {
String testPolicyBaseName = "test_policy_";
Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build();
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool,
new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);

View File

@ -33,7 +33,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.EXACT_MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -114,7 +114,7 @@ public class EnrichPolicyMaintenanceServiceTests extends ESSingleNodeTestCase {
for (int i = 0; i < randomIntBetween(1, 3); i++) {
enrichKeys.add(randomAlphaOfLength(10));
}
return new EnrichPolicy(EXACT_MATCH_TYPE, null, Collections.singletonList(randomAlphaOfLength(10)), randomAlphaOfLength(10),
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(randomAlphaOfLength(10)), randomAlphaOfLength(10),
enrichKeys);
}

View File

@ -94,7 +94,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("field2");
enrichFields.add("field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"field1", enrichFields);
String policyName = "test1";
@ -194,7 +194,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
enrichFields.add("idx");
enrichFields.add("field2");
enrichFields.add("field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndexPattern),
"field1", enrichFields);
String policyName = "test1";
@ -257,7 +257,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("field2");
enrichFields.add("field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex), "field1",
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex), "field1",
enrichFields);
String policyName = "test1";
@ -287,7 +287,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("field2");
enrichFields.add("field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex), "field1",
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex), "field1",
enrichFields);
String policyName = "test1";
@ -336,7 +336,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
String policyName = "test1";
List<String> enrichFields = Collections.singletonList("field2");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"nesting.key", enrichFields);
final long createTime = randomNonNegativeLong();
@ -388,7 +388,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("nesting.field2");
enrichFields.add("missingField");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"key", enrichFields);
final long createTime = randomNonNegativeLong();
@ -471,7 +471,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("data.field2");
enrichFields.add("missingField");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"data.field1", enrichFields);
final long createTime = randomNonNegativeLong();
@ -596,7 +596,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("data.field2");
enrichFields.add("missingField");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"data.field1", enrichFields);
final long createTime = randomNonNegativeLong();
@ -728,7 +728,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("data.fields.field2");
enrichFields.add("missingField");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"data.fields.field1", enrichFields);
final long createTime = randomNonNegativeLong();
@ -852,7 +852,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
List<String> enrichFields = new ArrayList<>();
enrichFields.add("data.field2");
enrichFields.add("missingField");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList(sourceIndex),
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex),
"data.field1", enrichFields);
final long createTime = randomNonNegativeLong();

View File

@ -37,7 +37,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
IngestService ingestService = getInstanceFromNode(IngestService.class);
createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"),
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index"),
"key1", Collections.singletonList("field1"));
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
@ -48,9 +48,9 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON);
assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet());
Pipeline pipelineInstance1 = ingestService.getPipeline("1");
assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(ExactMatchProcessor.class));
assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(MatchProcessor.class));
EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("index"),
EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index"),
"key2", Collections.singletonList("field2"));
ResourceAlreadyExistsException exc = expectThrows(ResourceAlreadyExistsException.class, () ->
client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2)).actionGet());

View File

@ -31,7 +31,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testCreateProcessorInstance() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key",
enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -50,13 +50,19 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
config.put("override", overrideEnabled);
}
Integer maxMatches = null;
if (randomBoolean()) {
maxMatches = randomIntBetween(1, 128);
config.put("max_matches", maxMatches);
}
int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
MatchProcessor result = (MatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
assertThat(result, notNullValue());
assertThat(result.getPolicyName(), equalTo("majestic"));
assertThat(result.getField(), equalTo("host"));
@ -68,6 +74,11 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
} else {
assertThat(result.isOverrideEnabled(), is(true));
}
if (maxMatches != null) {
assertThat(result.getMaxMatches(), equalTo(maxMatches));
} else {
assertThat(result.getMaxMatches(), equalTo(1));
}
}
public void testPolicyDoesNotExist() {
@ -154,7 +165,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testCompactEnrichValuesFormat() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList("source_index"), "host", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -164,7 +175,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
config.put("field", "host");
config.put("target_field", "entry");
ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
MatchProcessor result = (MatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
assertThat(result, notNullValue());
assertThat(result.getPolicyName(), equalTo("majestic"));
assertThat(result.getField(), equalTo("host"));
@ -173,7 +184,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
public void testNoTargetField() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList("source_index"), "host", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
factory.metaData = createMetaData("majestic", policy);
@ -186,6 +197,23 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
}
/**
 * Verifies that the factory refuses to build a processor when {@code max_matches}
 * is given a value outside the accepted range, and that the failure surfaces as an
 * {@link ElasticsearchParseException} with the expected message.
 *
 * NOTE(review): the illegal values generated here start at 129 and
 * testCreateProcessorInstance treats 1..128 as valid, yet the asserted message
 * mentions an upper bound of 1024 - confirm which bound the factory really
 * enforces and whether the message text is stale.
 */
public void testIllegalMaxMatches() throws Exception {
    // Register a match policy under the name the processor config refers to.
    List<String> enrichFields = Arrays.asList("globalRank", "tldRank", "tld");
    EnrichPolicy matchPolicy =
        new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("source_index"), "my_key", enrichFields);
    EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
    factory.metaData = createMetaData("majestic", matchPolicy);

    // Pick a value either at/below zero or above the largest value used by the valid-case test.
    int illegalMaxMatches = randomBoolean() ? between(-2048, 0) : between(129, 2048);

    Map<String, Object> processorConfig = new HashMap<>();
    processorConfig.put("policy_name", "majestic");
    processorConfig.put("field", "host");
    processorConfig.put("target_field", "entry");
    processorConfig.put("max_matches", illegalMaxMatches);

    Exception failure = expectThrows(
        ElasticsearchParseException.class,
        () -> factory.create(Collections.emptyMap(), "_tag", processorConfig)
    );
    assertThat(failure.getMessage(), equalTo("[max_matches] should be between 1 and 1024"));
}
static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException {
Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)

View File

@ -48,7 +48,7 @@ public class EnrichRestartIT extends ESIntegTestCase {
final int numPolicies = randomIntBetween(2, 4);
internalCluster().startNode();
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;

View File

@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
@ -42,11 +43,12 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ExactMatchProcessorTests extends ESTestCase {
public class MatchProcessorTests extends ESTestCase {
public void testBasics() throws Exception {
int maxMatches = randomIntBetween(1, 8);
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, maxMatches);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
// Run
@ -58,7 +60,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(request.indices().length, equalTo(1));
assertThat(request.indices()[0], equalTo(".enrich-_name"));
assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
assertThat(request.source().size(), equalTo(1));
assertThat(request.source().size(), equalTo(maxMatches));
assertThat(request.source().trackScores(), equalTo(false));
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
assertThat(request.source().fetchSource().excludes(), emptyArray());
@ -69,7 +71,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
assertThat(termQueryBuilder.value(), equalTo("elastic.co"));
// Check result
Map<?, ?> entry = ingestDocument.getFieldValue("entry", Map.class);
List<?> entries = ingestDocument.getFieldValue("entry", List.class);
Map<?, ?> entry = (Map<?, ?>) entries.get(0);
assertThat(entry.size(), equalTo(3));
assertThat(entry.get("globalRank"), equalTo(451));
assertThat(entry.get("tldRank"), equalTo(23));
@ -78,7 +81,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testNoMatch() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction();
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
int numProperties = ingestDocument.getSourceAndMetadata().size();
@ -108,7 +111,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testSearchFailure() throws Exception {
String indexName = ".enrich-_name";
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
// Run
@ -142,8 +145,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testIgnoreKeyMissing() throws Exception {
{
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true);
MatchProcessor processor =
new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true, 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
@ -153,8 +156,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
}
{
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true);
MatchProcessor processor =
new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true, 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
@ -170,7 +173,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testExistingFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false);
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1);
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf());
IngestDocument[] resultHolder = new IngestDocument[1];
@ -186,7 +189,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
public void testExistingNullFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false);
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1);
Map<String, Object> source = new HashMap<>();
source.put("domain", "elastic.co");