Geo-Match Enrich Processor (#47243) (#47701)

This commit introduces a geo-match enrich processor that takes a specific
`geo_point` field of the incoming document and looks up all entries in the enrich index
whose `geo_shape` match field satisfies a given spatial relation with that input field.

For example, the enrich index may contain documents with zipcodes and their respective
geo_shapes. Ingested documents that have a geo_point field can then be enriched with the
zipcode of the shape that contains the point.

This commit also refactors MatchProcessor by moving most of the shared code into
AbstractEnrichProcessor.

Closes #42639.
This commit is contained in:
Tal Levy 2019-10-07 15:03:46 -07:00 committed by GitHub
parent f2f2304c75
commit a17f394e27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 584 additions and 163 deletions

View File

@ -60,6 +60,7 @@ public class GeoShapeFieldMapperTests extends ESSingleNodeTestCase {
GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper; GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper;
assertThat(geoShapeFieldMapper.fieldType().orientation(), assertThat(geoShapeFieldMapper.fieldType().orientation(),
equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value())); equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value()));
assertThat(geoShapeFieldMapper.fieldType.hasDocValues(), equalTo(false));
} }
/** /**

View File

@ -35,7 +35,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-"; public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
public static final String MATCH_TYPE = "match"; public static final String MATCH_TYPE = "match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE}; public static final String GEO_MATCH_TYPE = "geo_match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
MATCH_TYPE,
GEO_MATCH_TYPE
};
private static final ParseField QUERY = new ParseField("query"); private static final ParseField QUERY = new ParseField("query");
private static final ParseField INDICES = new ParseField("indices"); private static final ParseField INDICES = new ParseField("indices");

View File

@ -5,18 +5,158 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
public abstract class AbstractEnrichProcessor extends AbstractProcessor { public abstract class AbstractEnrichProcessor extends AbstractProcessor {
private final String policyName; private final String policyName;
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
private final String targetField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
protected final String matchField;
protected final int maxMatches;
protected AbstractEnrichProcessor(String tag, String policyName) { protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}
protected AbstractEnrichProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
String matchField, int maxMatches) {
super(tag); super(tag);
this.policyName = policyName; this.policyName = policyName;
this.searchRunner = searchRunner;
this.field = field;
this.targetField = targetField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.matchField = matchField;
this.maxMatches = maxMatches;
}
public abstract QueryBuilder getQueryBuilder(Object fieldValue);
/**
 * Enriches the given document asynchronously.
 *
 * The value of the configured field is turned into a query by {@link #getQueryBuilder(Object)},
 * wrapped in a constant-score query and run against the enrich index of this processor's policy,
 * fetching at most {@code maxMatches} hits. The sources of all hits are stored as a list under the
 * target field, unless that field already exists and overriding is disabled.
 *
 * @param ingestDocument the document to enrich
 * @param handler        receives the document on success (enriched or untouched), or
 *                       {@code null} together with the exception on failure
 */
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
    try {
        // No lookup value available: hand the document back unchanged.
        final Object lookupValue = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
        if (lookupValue == null) {
            handler.accept(ingestDocument, null);
            return;
        }

        // Scores are irrelevant for a lookup, hence the constant-score wrapper and trackScores(false).
        final QueryBuilder lookupQuery = new ConstantScoreQueryBuilder(getQueryBuilder(lookupValue));
        final SearchSourceBuilder source = new SearchSourceBuilder()
            .from(0)
            .size(maxMatches)
            .trackScores(false)
            .fetchSource(true)
            .query(lookupQuery);

        final SearchRequest searchRequest = new SearchRequest()
            .indices(EnrichPolicy.getBaseName(getPolicyName()))
            .preference(Preference.LOCAL.type())
            .source(source);

        searchRunner.accept(searchRequest, (response, failure) -> {
            if (failure != null) {
                handler.accept(null, failure);
                return;
            }

            // Zero hits (for instance an empty index or no matching entry): document stays unchanged.
            final SearchHit[] hits = response.getHits().getHits();
            if (hits.length == 0) {
                handler.accept(ingestDocument, null);
                return;
            }

            // An existing target field is only replaced when overriding is enabled.
            final boolean keepExistingTarget = overrideEnabled == false && ingestDocument.hasField(targetField);
            if (keepExistingTarget == false) {
                final List<Map<String, Object>> matches = new ArrayList<>(hits.length);
                for (int i = 0; i < hits.length; i++) {
                    matches.add(hits[i].getSourceAsMap());
                }
                ingestDocument.setFieldValue(targetField, matches);
            }
            handler.accept(ingestDocument, null);
        });
    } catch (Exception e) {
        // Anything thrown while reading the field or building the request is reported via the handler.
        handler.accept(null, e);
    }
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
} }
public String getPolicyName() { public String getPolicyName() {
return policyName; return policyName;
} }
/** Returns {@code EnrichProcessorFactory.TYPE} as the type of this processor. */
@Override
public String getType() {
return EnrichProcessorFactory.TYPE;
}
/** Returns the name of the ingest-document field whose value is used for the lookup. */
String getField() {
return field;
}
/** Returns the name of the ingest-document field that receives the matching enrich documents. */
public String getTargetField() {
return targetField;
}
/** Returns the {@code ignoreMissing} flag that is passed along when the lookup field is read from the document. */
boolean isIgnoreMissing() {
return ignoreMissing;
}
/** Returns whether an already existing target field is overwritten with the lookup result. */
boolean isOverrideEnabled() {
return overrideEnabled;
}
/** Returns the match field this processor was configured with. */
public String getMatchField() {
return matchField;
}
/** Returns the maximum number of hits requested from the enrich index (used as the search size). */
int getMaxMatches() {
return maxMatches;
}
/**
 * Creates the search runner used outside of tests: every request is executed through
 * {@code EnrichCoordinatorProxyAction} on the given client, and the outcome is forwarded to the
 * callback as either {@code (response, null)} or {@code (null, exception)}.
 */
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
    return (searchRequest, callback) -> {
        final ActionListener<SearchResponse> listener = ActionListener.wrap(
            response -> callback.accept(response, null),
            failure -> callback.accept(null, failure));
        client.execute(EnrichCoordinatorProxyAction.INSTANCE, searchRequest, listener);
    };
}
} }

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -196,10 +197,13 @@ public class EnrichPolicyRunner implements Runnable {
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
String keyType; final String keyType;
final CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping;
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword"; matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false);
// No need to also configure index_options, because keyword type defaults to 'docs'. // No need to also configure index_options, because keyword type defaults to 'docs'.
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
matchFieldMapping = (builder) -> builder.field("type", "geo_shape");
} else { } else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
} }
@ -207,18 +211,15 @@ public class EnrichPolicyRunner implements Runnable {
// Enable _source on enrich index. Explicitly mark key mapping type. // Enable _source on enrich index. Explicitly mark key mapping type.
try { try {
XContentBuilder builder = JsonXContent.contentBuilder(); XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject() builder = builder.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME) .startObject(MapperService.SINGLE_MAPPING_NAME)
.field("dynamic", false) .field("dynamic", false)
.startObject("_source") .startObject("_source")
.field("enabled", true) .field("enabled", true)
.endObject() .endObject()
.startObject("properties") .startObject("properties")
.startObject(policy.getMatchField()) .startObject(policy.getMatchField());
.field("type", keyType) builder = matchFieldMapping.apply(builder).endObject().endObject()
.field("doc_values", false)
.endObject()
.endObject()
.startObject("_meta") .startObject("_meta")
.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT) .field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT)
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName) .field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.Processor;
@ -57,8 +58,13 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
switch (policyType) { switch (policyType) {
case EnrichPolicy.MATCH_TYPE: case EnrichPolicy.MATCH_TYPE:
return new MatchProcessor(tag, client, policyName, field, targetField, matchField, return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
ignoreMissing, overrideEnabled, maxMatches); maxMatches);
case EnrichPolicy.GEO_MATCH_TYPE:
String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects");
ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr);
return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches, shapeRelation);
default: default:
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]"); throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
} }

View File

@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.GeoUtils;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.MultiPoint;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
/**
 * Enrich processor that matches one or more points taken from the incoming document against the
 * match field of the enrich index using a geo-shape query with a configurable {@link ShapeRelation}.
 */
public final class GeoMatchProcessor extends AbstractEnrichProcessor {

    /** Spatial relation the enrich shape must have with the query geometry; fixed at construction. */
    private final ShapeRelation shapeRelation;

    GeoMatchProcessor(String tag,
                      Client client,
                      String policyName,
                      String field,
                      String targetField,
                      boolean overrideEnabled,
                      boolean ignoreMissing,
                      String matchField,
                      int maxMatches,
                      ShapeRelation shapeRelation) {
        // Note: the superclass takes ignoreMissing before overrideEnabled.
        super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
        this.shapeRelation = shapeRelation;
    }

    /** used in tests **/
    GeoMatchProcessor(String tag,
                      BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
                      String policyName,
                      String field,
                      String targetField,
                      boolean overrideEnabled,
                      boolean ignoreMissing,
                      String matchField,
                      int maxMatches,
                      ShapeRelation shapeRelation) {
        // Note: the superclass takes ignoreMissing before overrideEnabled.
        super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
        this.shapeRelation = shapeRelation;
    }

    /**
     * Builds the geo-shape query for the given field value.
     *
     * A two-element list starting with a number is parsed as one single point; any other list is
     * parsed element by element; any other value is parsed as one point. A single point is queried
     * as a {@link Point}, several points as a {@link MultiPoint}.
     *
     * @param fieldValue the raw value of the lookup field, in any format {@code GeoUtils.parseGeoPoint} accepts
     * @return a {@link GeoShapeQueryBuilder} on the match field using the configured relation
     * @throws IllegalArgumentException if the value yields no point at all (an empty list)
     */
    @Override
    public QueryBuilder getQueryBuilder(Object fieldValue) {
        List<Point> points = new ArrayList<>();
        if (fieldValue instanceof List) {
            List<?> values = (List<?>) fieldValue;
            if (values.size() == 2 && values.get(0) instanceof Number) {
                // A pair of numbers is one coordinate pair, not a list of two points.
                points.add(toPoint(values));
            } else {
                for (Object value : values) {
                    points.add(toPoint(value));
                }
            }
        } else {
            points.add(toPoint(fieldValue));
        }

        final Geometry queryGeometry;
        if (points.isEmpty()) {
            throw new IllegalArgumentException("no geopoints found");
        } else if (points.size() == 1) {
            queryGeometry = points.get(0);
        } else {
            queryGeometry = new MultiPoint(points);
        }

        GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, queryGeometry);
        shapeQuery.relation(shapeRelation);
        return shapeQuery;
    }

    /** Parses a single geo-point value and converts it into a geometry {@link Point}. */
    private static Point toPoint(Object value) {
        GeoPoint geoPoint = GeoUtils.parseGeoPoint(value, true);
        return new Point(geoPoint.lon(), geoPoint.lat());
    }

    /** Returns the spatial relation used by the queries this processor builds. */
    public ShapeRelation getShapeRelation() {
        return shapeRelation;
    }
}

View File

@ -5,172 +5,43 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
public final class MatchProcessor extends AbstractEnrichProcessor { public class MatchProcessor extends AbstractEnrichProcessor {
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
private final String targetField;
private final String matchField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
private final int maxMatches;
MatchProcessor(String tag, MatchProcessor(String tag,
Client client, Client client,
String policyName, String policyName,
String field, String field,
String targetField, String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled, boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches) { int maxMatches) {
this( super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
tag,
createSearchRunner(client),
policyName,
field,
targetField,
matchField,
ignoreMissing,
overrideEnabled,
maxMatches
);
} }
/** used in tests **/
MatchProcessor(String tag, MatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner, BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String policyName,
String field, String field,
String targetField, String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled, boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches) { int maxMatches) {
super(tag, policyName); super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
this.searchRunner = searchRunner;
this.field = field;
this.targetField = targetField;
this.matchField = matchField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.maxMatches = maxMatches;
} }
@Override @Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) { public QueryBuilder getQueryBuilder(Object fieldValue) {
try { return new TermQueryBuilder(matchField, fieldValue);
// If a document does not have the enrich key, return the unchanged document
final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (value == null) {
handler.accept(ingestDocument, null);
return;
}
TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(maxMatches);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
SearchRequest req = new SearchRequest();
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
req.preference(Preference.LOCAL.type());
req.source(searchBuilder);
searchRunner.accept(req, (searchResponse, e) -> {
if (e != null) {
handler.accept(null, e);
return;
}
// If the index is empty, return the unchanged document
// If the enrich key does not exist in the index, throw an error
// If no documents match the key, return the unchanged document
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length < 1) {
handler.accept(ingestDocument, null);
return;
}
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
handler.accept(ingestDocument, null);
});
} catch (Exception e) {
handler.accept(null, e);
}
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}
@Override
public String getType() {
return EnrichProcessorFactory.TYPE;
}
String getField() {
return field;
}
public String getTargetField() {
return targetField;
}
public String getMatchField() {
return matchField;
}
boolean isIgnoreMissing() {
return ignoreMissing;
}
boolean isOverrideEnabled() {
return overrideEnabled;
}
int getMaxMatches() {
return maxMatches;
}
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
resp -> {
handler.accept(resp, null);
},
e -> {
handler.accept(null, e);
}));
};
} }
} }

View File

@ -55,10 +55,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
return true; return true;
} }
public void testIngestDataWithEnrichProcessor() { public void testIngestDataWithMatchProcessor() {
int numDocs = 32; int numDocs = 32;
int maxMatches = randomIntBetween(2, 8); int maxMatches = randomIntBetween(2, 8);
List<String> keys = createSourceIndex(numDocs, maxMatches); List<String> keys = createSourceMatchIndex(numDocs, maxMatches);
String policyName = "my-policy"; String policyName = "my-policy";
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
@ -114,6 +114,62 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs)); assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs));
} }
/**
 * End-to-end check of the geo_match policy type: indexes one polygon with a zipcode into the
 * source index, creates and executes a geo_match policy, ingests a point that lies inside the
 * polygon through an enrich pipeline and verifies that the document got exactly one enrich entry.
 */
public void testIngestDataWithGeoMatchProcessor() {
    String matchField = "location";
    String enrichField = "zipcode";

    // create enrich index
    {
        IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
        indexRequest.source(mapOf(matchField, "POLYGON((" +
            "-122.08592534065245 37.38501746624134," +
            "-122.08193421363829 37.38501746624134," +
            "-122.08193421363829 37.3879329075567," +
            "-122.08592534065245 37.3879329075567," +
            "-122.08592534065245 37.38501746624134))",
            enrichField, "94040"));
        client().index(indexRequest).actionGet();
        client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
    }

    // register the policy and build its enrich index
    String policyName = "my-policy";
    EnrichPolicy enrichPolicy =
        new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, Arrays.asList(SOURCE_INDEX_NAME), matchField, Arrays.asList(enrichField));
    PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
    client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
    client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();

    // pipeline with a single enrich processor bound to the policy
    String pipelineName = "my-pipeline";
    String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
        "\", \"field\": \"" + matchField + "\", \"target_field\": \"enriched\", \"max_matches\": 1 }}]}";
    PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
    client().admin().cluster().putPipeline(putPipelineRequest).actionGet();

    // ingest one document through the pipeline
    BulkRequest bulkRequest = new BulkRequest("my-index");
    IndexRequest indexRequest = new IndexRequest();
    indexRequest.id("_id");
    indexRequest.setPipeline(pipelineName);
    indexRequest.source(mapOf(matchField, "37.386444, -122.083863")); // point within match boundary
    bulkRequest.add(indexRequest);
    BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
    assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
    assertThat(bulkResponse.getItems().length, equalTo(1));
    assertThat(bulkResponse.getItems()[0].getId(), equalTo("_id"));

    // the stored document must carry exactly one enrich entry
    GetResponse getResponse = client().get(new GetRequest("my-index", "_id")).actionGet();
    Map<String, Object> source = getResponse.getSourceAsMap();
    List<?> entries = (List<?>) source.get("enriched");
    assertThat(entries, notNullValue());
    assertThat(entries.size(), equalTo(1));

    // one ingested document means one executed lookup search on the local node
    EnrichStatsAction.Response statsResponse =
        client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet();
    assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
    String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
    assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId));
    assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
    assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo(1L));
}
public void testMultiplePolicies() { public void testMultiplePolicies() {
int numPolicies = 8; int numPolicies = 8;
for (int i = 0; i < numPolicies; i++) { for (int i = 0; i < numPolicies; i++) {
@ -156,7 +212,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
} }
} }
private List<String> createSourceIndex(int numKeys, int numDocsPerKey) { private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>(); Set<String> keys = new HashSet<>();
for (int id = 0; id < numKeys; id++) { for (int id = 0; id < numKeys; id++) {
String key; String key;
@ -174,5 +230,4 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
return new ArrayList<>(keys); return new ArrayList<>(keys);
} }
} }

View File

@ -156,6 +156,90 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
ensureEnrichIndexIsReadOnly(createdEnrichIndex); ensureEnrichIndexIsReadOnly(createdEnrichIndex);
} }
/**
 * Runs the policy runner for a geo_match policy over a one-document source index and checks the
 * resulting enrich index: its name, settings, mapping (match field typed as geo_shape without an
 * explicit doc_values entry), the copied document, the segment count and the read-only state.
 */
public void testRunnerGeoMatchType() throws Exception {
final String sourceIndex = "source-index";
// NOTE(review): despite its name, indexRequest holds the IndexResponse of the indexing call.
IndexResponse indexRequest = client().index(new IndexRequest()
.index(sourceIndex)
.id("id")
.source(
"{" +
"\"location\":" +
"\"POINT(10.0 10.0)\"," +
"\"zipcode\":90210" +
"}",
XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
).actionGet();
assertEquals(RestStatus.CREATED, indexRequest.status());
// Sanity check: the source index contains exactly the document indexed above
SearchResponse sourceSearchResponse = client().search(
new SearchRequest(sourceIndex)
.source(SearchSourceBuilder.searchSource()
.query(QueryBuilders.matchAllQuery()))).actionGet();
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
assertNotNull(sourceDocMap);
assertThat(sourceDocMap.get("location"), is(equalTo("POINT(10.0 10.0)")));
assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210)));
// geo_match policy: "location" is the match field, "zipcode" the only enrich field
List<String> enrichFields = Arrays.asList("zipcode");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, Arrays.asList(sourceIndex), "location", enrichFields);
String policyName = "test1";
final long createTime = randomNonNegativeLong();
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime);
logger.info("Starting policy run");
enrichPolicyRunner.run();
// Wait for the listener to fire, then rethrow any failure it captured
latch.await();
if (exception.get() != null) {
throw exception.get();
}
// Validate Index definition
String createdEnrichIndex = ".enrich-test1-" + createTime;
GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
assertThat(enrichIndex.getIndices().length, equalTo(1));
assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
assertNotNull(settings);
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
// Validate Mapping
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
validateMappingMetadata(mapping, policyName, policy);
assertThat(mapping.get("dynamic"), is("false"));
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
assertNotNull(properties);
assertThat(properties.size(), is(equalTo(1)));
Map<?, ?> field1 = (Map<?, ?>) properties.get("location");
assertNotNull(field1);
assertThat(field1.get("type"), is(equalTo("geo_shape")));
// Unlike the keyword mapping of match policies, no doc_values entry is expected here
assertNull(field1.get("doc_values"));
// Validate document structure
SearchResponse enrichSearchResponse = client().search(
new SearchRequest(".enrich-test1")
.source(SearchSourceBuilder.searchSource()
.query(QueryBuilders.matchAllQuery()))).actionGet();
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L));
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
assertNotNull(enrichDocument);
assertThat(enrichDocument.size(), is(equalTo(2)));
assertThat(enrichDocument.get("location"), is(equalTo("POINT(10.0 10.0)")));
assertThat(enrichDocument.get("zipcode"), is(equalTo(90210)));
// Validate segments
validateSegments(createdEnrichIndex, 1);
// Validate Index is read only
ensureEnrichIndexIsReadOnly(createdEnrichIndex);
}
public void testRunnerMultiSource() throws Exception { public void testRunnerMultiSource() throws Exception {
String baseSourceName = "source-index-"; String baseSourceName = "source-index-";
int numberOfSourceIndices = 3; int numberOfSourceIndices = 3;

View File

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.MultiPoint;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/**
 * Tests for {@link GeoMatchProcessor}. The processor is wired to a {@link MockSearchFunction}
 * that records the search request it receives and answers with a canned response (or failure),
 * so both the generated request and the resulting document can be inspected.
 */
public class GeoMatchProcessorTests extends ESTestCase {

    /**
     * Runs the basic scenario once per input form of the same location: a lat/lon map, a
     * "lat, lon" string, a WKT string, a [lon, lat] list, and a list mixing several forms
     * (expected to yield a {@link MultiPoint}). A final unparsable string is expected to
     * produce no result document.
     */
    public void testBasics() {
        Point expectedPoint = new Point(-122.084110, 37.386637);

        Object asLatLonMap = mapOf("lat", 37.386637, "lon", -122.084110);
        testBasicsForFieldValue(asLatLonMap, expectedPoint);

        Object asLatLonString = "37.386637, -122.084110";
        testBasicsForFieldValue(asLatLonString, expectedPoint);

        Object asWkt = "POINT (-122.084110 37.386637)";
        testBasicsForFieldValue(asWkt, expectedPoint);

        Object asLonLatList = Arrays.asList(-122.084110, 37.386637);
        testBasicsForFieldValue(asLonLatList, expectedPoint);

        Object asMixedList = Arrays.asList(
            Arrays.asList(-122.084110, 37.386637),
            "37.386637, -122.084110",
            "POINT (-122.084110 37.386637)");
        Geometry expectedMultiPoint = new MultiPoint(Arrays.asList(expectedPoint, expectedPoint, expectedPoint));
        testBasicsForFieldValue(asMixedList, expectedMultiPoint);

        testBasicsForFieldValue("not a point", null);
    }

    /**
     * Executes a {@link GeoMatchProcessor} against a document whose "location" field holds
     * {@code fieldValue}, then verifies the captured search request and the enriched document.
     *
     * @param fieldValue       value stored under "location" in the ingest document
     * @param expectedGeometry geometry the shape query is expected to carry, or {@code null}
     *                         when the processor is expected to hand back no document
     */
    private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometry) {
        int maxMatches = randomIntBetween(1, 8);
        MockSearchFunction mockSearch = mockedSearchFunction(mapOf("key", mapOf("shape", "object", "zipcode",94040)));
        GeoMatchProcessor processor = new GeoMatchProcessor(
            "_tag", mockSearch, "_name", "location", "entry", false, false, "shape", maxMatches, ShapeRelation.INTERSECTS);
        IngestDocument ingestDocument = new IngestDocument(
            "_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf("location", fieldValue));

        // Execute the processor; only the resulting document is kept, the exception argument is ignored.
        IngestDocument[] resultSlot = new IngestDocument[1];
        processor.execute(ingestDocument, (result, e) -> resultSlot[0] = result);

        // A null expectation means no document should come back; nothing else to verify then.
        if (expectedGeometry == null) {
            assertThat(resultSlot[0], nullValue());
            return;
        }
        assertThat(resultSlot[0], notNullValue());

        // Verify the search request that reached the mock.
        SearchRequest request = mockSearch.getCapturedRequest();
        String[] targetIndices = request.indices();
        assertThat(targetIndices.length, equalTo(1));
        assertThat(targetIndices[0], equalTo(".enrich-_name"));
        assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
        assertThat(request.source().size(), equalTo(maxMatches));
        assertThat(request.source().trackScores(), equalTo(false));
        assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
        assertThat(request.source().fetchSource().excludes(), emptyArray());
        assertThat(request.source().fetchSource().includes(), emptyArray());

        // The query must be a constant_score wrapper around a geo_shape query on the match field.
        assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
        ConstantScoreQueryBuilder constantScoreQuery = (ConstantScoreQueryBuilder) request.source().query();
        assertThat(constantScoreQuery.innerQuery(), instanceOf(GeoShapeQueryBuilder.class));
        GeoShapeQueryBuilder shapeQuery = (GeoShapeQueryBuilder) constantScoreQuery.innerQuery();
        assertThat(shapeQuery.fieldName(), equalTo("shape"));
        assertThat(shapeQuery.shape(), equalTo(expectedGeometry));

        // Verify the first enriched entry written to the target field.
        List<?> enrichedEntries = ingestDocument.getFieldValue("entry", List.class);
        Map<?, ?> firstEntry = (Map<?, ?>) enrichedEntries.get(0);
        assertThat(firstEntry.size(), equalTo(2));
        assertThat(firstEntry.get("zipcode"), equalTo(94040));
    }

    /**
     * Stand-in for the search function handed to the processor: stores the incoming request
     * and immediately completes the handler with either a fixed response or a fixed exception.
     */
    private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {

        private final SearchResponse mockResponse;
        private final Exception exception;
        private final SetOnce<SearchRequest> capturedRequest = new SetOnce<>();

        /** Creates a function that always answers with {@code mockResponse}. */
        MockSearchFunction(SearchResponse mockResponse) {
            this.mockResponse = mockResponse;
            this.exception = null;
        }

        /** Creates a function that always fails with {@code exception}. */
        MockSearchFunction(Exception exception) {
            this.mockResponse = null;
            this.exception = exception;
        }

        @Override
        public void accept(SearchRequest request, BiConsumer<SearchResponse, Exception> handler) {
            capturedRequest.set(request);
            if (exception == null) {
                handler.accept(mockResponse, null);
                return;
            }
            handler.accept(null, exception);
        }

        /** Returns the request passed to {@link #accept}, as stored in the {@link SetOnce} holder. */
        SearchRequest getCapturedRequest() {
            return capturedRequest.get();
        }
    }

    /** Builds a mock search function whose response contains no hits. */
    public MockSearchFunction mockedSearchFunction() {
        SearchResponse emptyResponse = mockResponse(Collections.emptyMap());
        return new MockSearchFunction(emptyResponse);
    }

    /** Builds a mock search function that completes every search with {@code exception}. */
    public MockSearchFunction mockedSearchFunction(Exception exception) {
        return new MockSearchFunction(exception);
    }

    /** Builds a mock search function whose response has one hit per entry of {@code documents}. */
    public MockSearchFunction mockedSearchFunction(Map<String, Map<String, ?>> documents) {
        SearchResponse response = mockResponse(documents);
        return new MockSearchFunction(response);
    }

    /**
     * Assembles a {@link SearchResponse} with one hit per entry of {@code documents}.
     *
     * @param documents hit id mapped to the source map of that hit
     * @return a response whose total hit count equals {@code documents.size()}
     */
    public SearchResponse mockResponse(Map<String, Map<String, ?>> documents) {
        SearchHit[] searchHits = new SearchHit[documents.size()];
        int next = 0;
        for (Map.Entry<String, Map<String, ?>> document : documents.entrySet()) {
            searchHits[next++] = buildSearchHit(document.getKey(), document.getValue());
        }

        TotalHits totalHits = new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO);
        SearchHits hits = new SearchHits(searchHits, totalHits, 1.0f);
        SearchResponseSections sections = new SearchResponseSections(
            hits, new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()), false, false, null, 1);
        return new SearchResponse(
            sections, null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
    }

    /**
     * Creates a single hit with a random doc id in [0, 100], the given id, and {@code source}
     * serialized as SMILE for its source bytes.
     */
    private SearchHit buildSearchHit(String id, Map<String, ?> source) {
        SearchHit hit = new SearchHit(randomInt(100), id, new Text(MapperService.SINGLE_MAPPING_NAME), Collections.emptyMap());
        try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) {
            builder.map(source);
            builder.flush();
            ByteArrayOutputStream rawBytes = (ByteArrayOutputStream) builder.getOutputStream();
            hit.sourceRef(new BytesArray(rawBytes.toByteArray()));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return hit;
    }
}

View File

@ -48,7 +48,7 @@ public class MatchProcessorTests extends ESTestCase {
public void testBasics() throws Exception { public void testBasics() throws Exception {
int maxMatches = randomIntBetween(1, 8); int maxMatches = randomIntBetween(1, 8);
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co"))); MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, maxMatches); MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", maxMatches);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co")); Collections.singletonMap("domain", "elastic.co"));
// Run // Run
@ -81,7 +81,7 @@ public class MatchProcessorTests extends ESTestCase {
public void testNoMatch() throws Exception { public void testNoMatch() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(); MockSearchFunction mockSearch = mockedSearchFunction();
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1); MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com")); Collections.singletonMap("domain", "elastic.com"));
int numProperties = ingestDocument.getSourceAndMetadata().size(); int numProperties = ingestDocument.getSourceAndMetadata().size();
@ -111,7 +111,7 @@ public class MatchProcessorTests extends ESTestCase {
public void testSearchFailure() throws Exception { public void testSearchFailure() throws Exception {
String indexName = ".enrich-_name"; String indexName = ".enrich-_name";
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName)); MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1); MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com")); Collections.singletonMap("domain", "elastic.com"));
// Run // Run
@ -146,7 +146,7 @@ public class MatchProcessorTests extends ESTestCase {
public void testIgnoreKeyMissing() throws Exception { public void testIgnoreKeyMissing() throws Exception {
{ {
MatchProcessor processor = MatchProcessor processor =
new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true, 1); new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, true, "domain", 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf()); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
@ -157,7 +157,7 @@ public class MatchProcessorTests extends ESTestCase {
} }
{ {
MatchProcessor processor = MatchProcessor processor =
new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true, 1); new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf()); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, mapOf());
IngestDocument[] resultHolder = new IngestDocument[1]; IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1]; Exception[] exceptionHolder = new Exception[1];
@ -173,7 +173,7 @@ public class MatchProcessorTests extends ESTestCase {
public void testExistingFieldWithOverrideDisabled() throws Exception { public void testExistingFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co"))); MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1); MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1);
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf()); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(mapOf("domain", "elastic.co", "tld", "tld")), mapOf());
IngestDocument[] resultHolder = new IngestDocument[1]; IngestDocument[] resultHolder = new IngestDocument[1];
@ -189,7 +189,7 @@ public class MatchProcessorTests extends ESTestCase {
public void testExistingNullFieldWithOverrideDisabled() throws Exception { public void testExistingNullFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co"))); MockSearchFunction mockSearch = mockedSearchFunction(mapOf("elastic.co", mapOf("globalRank", 451, "tldRank",23, "tld", "co")));
MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1); MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1);
Map<String, Object> source = new HashMap<>(); Map<String, Object> source = new HashMap<>();
source.put("domain", "elastic.co"); source.put("domain", "elastic.co");