Change how `max_matches` affects the `target_field` option. (#47982)

Prior to this change, the `target_field` would always be a JSON array
field in the document being ingested. This was to account for the fact
that multiple enrich documents could be inserted into the `target_field`.

However, the default `max_matches` is `1`, meaning that by default
only a single enrich document would be added to the `target_field` JSON
array field.

This commit changes this: if `max_matches` is set to `1`, the single
enrich document is added to the `target_field` as a JSON object; if it
is configured to a higher value, the enrich documents are added as a
JSON array (even if only a single enrich document happens to match).
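
The rule described above can be sketched in isolation. This is a minimal
illustration, not the processor's actual code: `TargetFieldShape` and
`shape` are hypothetical names, plain maps stand in for enrich documents,
and returning `null` for "no match" is an assumption made only for this
sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of the enrich plugin.
final class TargetFieldShape {

    // Returns the value that would be stored under target_field:
    // a single map (JSON object) when maxMatches == 1,
    // otherwise a list of maps (JSON array), even if only one document matched.
    static Object shape(List<Map<String, Object>> matches, int maxMatches) {
        if (matches.isEmpty()) {
            return null; // assumption: signals "append no data"
        }
        if (maxMatches == 1) {
            return matches.get(0);
        }
        // Cap the array at maxMatches entries.
        int limit = Math.min(matches.size(), maxMatches);
        return new ArrayList<>(matches.subList(0, limit));
    }
}
```

The shape depends only on the configured `max_matches`, not on how many
documents actually matched, so consumers see a stable type for a given
pipeline configuration.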
Martijn van Groningen 2019-10-14 21:04:47 +02:00
parent 7cc73f6193
commit 7fc9198d46
GPG Key ID: AB236F4FCF2AF12A
9 changed files with 73 additions and 75 deletions

View File

@ -323,15 +323,13 @@ The API returns the following response:
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"geo_data": [
{
"location": {
"type": "envelope",
"coordinates": [[13.0, 53.0], [14.0, 52.0]]
},
"postal_code": "96598"
}
],
"geo_data": {
"location": {
"type": "envelope",
"coordinates": [[13.0, 53.0], [14.0, 52.0]]
},
"postal_code": "96598"
},
"first_name": "Mardy",
"last_name": "Brown",
"geo_location": "POINT (13.5 52.5)"

View File

@ -167,7 +167,8 @@ PUT /_ingest/pipeline/user_lookup
"enrich" : {
"policy_name": "users-policy",
"field" : "email",
"target_field": "user"
"target_field": "user",
"max_matches": "1"
}
}
]
@ -175,34 +176,38 @@ PUT /_ingest/pipeline/user_lookup
----
// TEST[continued]
Because the enrich policy type is `match`,
the enrich processor matches incoming documents
to documents in the enrich index
based on match field values.
The enrich processor then appends the enrich field data
from matching documents in the enrich index
to the target field of incoming documents.
Because the `max_matches` option for the enrich processor is `1`,
the enrich processor appends the data from only the best matching document
to each incoming document's target field as an object.
If the `max_matches` option were greater than `1`,
the processor could append data from up to the `max_matches` number of documents
to the target field as an array.
If the incoming document matches no documents in the enrich index,
the processor appends no data.
You also can add other <<ingest-processors,processors>>
to your ingest pipeline.
You can use these processors to change or drop incoming documents
based on your criteria.
See <<ingest-processors>> for a list of built-in processors.
[float]
[[ingest-enrich-docs]]
==== Ingest and enrich documents
Index incoming documents using your ingest pipeline.
Because the enrich policy type is `match`,
the enrich processor matches incoming documents
to documents in the enrich index
based on match field values.
The processor then appends the enrich field data
from any matching document in the enrich index
to target field of the incoming document.
The enrich processor appends all data to the target field as an array.
If the incoming document matches more than one document in the enrich index,
the processor appends data from those documents to the array.
If the incoming document matches no documents in the enrich index,
the processor appends no data.
The following <<docs-index_,index API>> request uses the ingest pipeline
to index a document
containing the `email` field
@ -240,16 +245,14 @@ The API returns the following response:
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"user": [
{
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"city": "New Orleans",
"state": "LA"
}
],
"user": {
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"city": "New Orleans",
"state": "LA"
},
"email": "mardy.brown@asciidocsmith.com"
}
}

View File

@ -16,18 +16,9 @@ check out the <<ingest-enriching-data,tutorial>> to get familiar with enrich pol
| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data.
| `target_field` | yes | - | The field that will be used for the enrichment data.
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `max_matches` | no | 1 | The maximum number of matched documents to include under the configured target field. In order to avoid documents getting too large, the maximum allowed value is 128.
| `shape_relation` | no | `INTERSECTS` a| Spatial relation operator
used to match the <<geo-shape,geo_shape>> of incoming documents
to documents in the enrich index.
+
This option is only used for `geo_match` enrich policy types.
+
The <<spatial-strategy, geo_shape strategy>> mapping parameter determines
which spatial relation operators are availlble.
See <<_spatial_relations>>
for operators and more information.
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `max_matches` | no | 1 | The maximum number of matched documents to include under the configured target field. The `target_field` will be turned into a json array if `max_matches` is higher than 1, otherwise `target_field` will become a json object. In order to avoid documents getting too large, the maximum allowed value is 128.
| `shape_relation` | no | `INTERSECTS` | A spatial relation operator used to match the <<geo-shape,geo_shape>> of incoming documents to documents in the enrich index. This option is only used for `geo_match` enrich policy types. The <<spatial-strategy, geo_shape strategy>> mapping parameter determines which spatial relation operators are available. See <<_spatial_relations>> for operators and more information.
include::common-options.asciidoc[]
|======

View File

@ -80,13 +80,12 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
List<?> entries = (List<?>) ((Map<?, ?>) response.get("_source")).get("entry");
Map<?, ?> _source = (Map<?, ?>) entries.get(0);
assertThat(_source.size(), equalTo(4));
assertThat(_source.get("host"), equalTo("elastic.co"));
assertThat(_source.get("tld"), equalTo("co"));
assertThat(_source.get("globalRank"), equalTo(25));
assertThat(_source.get("tldRank"), equalTo(7));
Map<?, ?> entry = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
assertThat(entry.size(), equalTo(4));
assertThat(entry.get("host"), equalTo("elastic.co"));
assertThat(entry.get("tld"), equalTo("co"));
assertThat(entry.get("globalRank"), equalTo(25));
assertThat(entry.get("tldRank"), equalTo(7));
if (deletePipeilne) {
// delete the pipeline so the policies can be deleted

View File

@ -96,12 +96,17 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
}
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
if (maxMatches == 1) {
Map<String, Object> firstDocument = searchHits[0].getSourceAsMap();
ingestDocument.setFieldValue(targetField, firstDocument);
} else {
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
handler.accept(ingestDocument, null);
});

View File

@ -157,9 +157,11 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
GetResponse getResponse = client().get(new GetRequest("my-index", "_id")).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
List<?> entries = (List<?>) source.get("enriched");
Map<?, ?> entries = (Map) source.get("enriched");
assertThat(entries, notNullValue());
assertThat(entries.size(), equalTo(1));
assertThat(entries.size(), equalTo(2));
assertThat(entries.containsKey(matchField), is(true));
assertThat(entries.get(enrichField), equalTo("94040"));
EnrichStatsAction.Response statsResponse =
client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
@ -208,7 +210,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("target"), equalTo(Arrays.asList(mapOf("key", "key", "value", "val" + i))));
assertThat(source.get("target"), equalTo(mapOf("key", "key", "value", "val" + i)));
}
}

View File

@ -154,8 +154,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
for (int i = 0; i < numDocs; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
List<?> entries = (List<?>) source.get("user");
Map<?, ?> userEntry = (Map<?, ?>) entries.get(0);
Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
for (String field : DECORATE_FIELDS) {

View File

@ -95,8 +95,13 @@ public class GeoMatchProcessorTests extends ESTestCase {
assertThat(shapeQueryBuilder.shape(), equalTo(expectedGeometry));
// Check result
List<?> entries = ingestDocument.getFieldValue("entry", List.class);
Map<?, ?> entry = (Map<?, ?>) entries.get(0);
Map<?, ?> entry;
if (maxMatches == 1) {
entry = ingestDocument.getFieldValue("entry", Map.class);
} else {
List<?> entries = ingestDocument.getFieldValue("entry", List.class);
entry = (Map<?, ?>) entries.get(0);
}
assertThat(entry.size(), equalTo(2));
assertThat(entry.get("zipcode"), equalTo(94040));
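
Code that reads the target field without knowing how `max_matches` was
configured has to accept both shapes, much as the test above branches on
`maxMatches`. A minimal sketch, assuming plain Java collections;
`EnrichFieldReader` and `firstEntry` are hypothetical names and not part
of the enrich plugin:

```java
import java.util.List;
import java.util.Map;

// Hypothetical reader-side helper, for illustration only.
final class EnrichFieldReader {

    // Returns the first enrich entry whether the field holds a single
    // object (max_matches == 1) or an array (max_matches > 1).
    // Returns null when the value is absent, empty, or of another type.
    static Map<?, ?> firstEntry(Object value) {
        if (value instanceof Map) {
            return (Map<?, ?>) value;
        }
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            if (!list.isEmpty() && list.get(0) instanceof Map) {
                return (Map<?, ?>) list.get(0);
            }
        }
        return null;
    }
}
```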

View File

@ -209,10 +209,9 @@ public class MatchProcessorTests extends ESTestCase {
}
public void testNumericValue() {
int maxMatches = randomIntBetween(1, 8);
MockSearchFunction mockSearch = mockedSearchFunction(mapOf(2, mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor =
new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", maxMatches);
new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", 1);
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf("domain", 2));
@ -230,8 +229,7 @@ public class MatchProcessorTests extends ESTestCase {
assertThat(termQueryBuilder.value(), equalTo(2));
// Check result
List<?> entries = ingestDocument.getFieldValue("entry", List.class);
Map<?, ?> entry = (Map<?, ?>) entries.get(0);
Map<?, ?> entry = ingestDocument.getFieldValue("entry", Map.class);
assertThat(entry.size(), equalTo(3));
assertThat(entry.get("globalRank"), equalTo(451));
assertThat(entry.get("tldRank"), equalTo(23));
@ -239,11 +237,10 @@ public class MatchProcessorTests extends ESTestCase {
}
public void testArray() {
int maxMatches = randomIntBetween(1, 8);
MockSearchFunction mockSearch =
mockedSearchFunction(mapOf(Arrays.asList("1", "2"), mapOf("globalRank", 451, "tldRank", 23, "tld", "co")));
MatchProcessor processor =
new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", maxMatches);
new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, true, "domain", 1);
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf("domain", Arrays.asList("1", "2")));
@ -263,8 +260,7 @@ public class MatchProcessorTests extends ESTestCase {
assertThat(termQueryBuilder.values().get(1), equalTo("2"));
// Check result
List<?> entries = ingestDocument.getFieldValue("entry", List.class);
Map<?, ?> entry = (Map<?, ?>) entries.get(0);
Map<?, ?> entry = ingestDocument.getFieldValue("entry", Map.class);
assertThat(entry.size(), equalTo(3));
assertThat(entry.get("globalRank"), equalTo(451));
assertThat(entry.get("tldRank"), equalTo(23));