Add enrich processor (#41532)

The enrich processor performs a lookup in a locally allocated
enrich index shard using a field value from the document being enriched.
If there is a match then the _source of the enrich document is fetched.
The document being enriched then gets the decorate values from the
enrich document based on the configured decorate fields in the pipeline.

Note that the usage of the _source field is temporary until the enrich
source field that is part of #41521 is merged into the enrich branch.
Using the _source field involves significant decompression which not
desired for enrich use cases.

The policy contains the information what field in the enrich index
to query and what fields are available to decorate a document being
enriched with.

The enrich processor has the following configuration options:
* `policy_name` - the name of the policy this processor should use
* `enrich_key` - the field in the document being enriched that holds to lookup value
* `ignore_missing` - Whether to allow the key field to be missing
* `enrich_values` - a list of fields to decorate the document being enriched with.
                    Each entry holds a source field and a target field.
                    The source field indicates what decorate field to use that is available in the policy.
                    The target field controls the field name to use in the document being enriched.
                    The source and target fields can be the same.

Example pipeline config:

```
{
   "processors": [
      {
         "policy_name": "my_policy",
         "enrich_key": "host_name",
         "enrich_values": [
            {
              "source": "globalRank",
              "target": "global_rank"
            }
         ]
      }
   ]
}
```

In the above example documents are being enriched with a global rank value.
For each document that has match in the enrich index based on its host_name field,
the document gets an global rank field value, which is fetched from the `globalRank`
field in the enrich index and saved as `global_rank` in the document being enriched.

This is PR is part one of #41521
This commit is contained in:
Martijn van Groningen 2019-04-30 19:42:19 +02:00
parent 57adee0c63
commit 8838bcc776
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
7 changed files with 778 additions and 2 deletions

View File

@ -28,7 +28,7 @@ import java.util.Objects;
*/
public final class EnrichPolicy implements Writeable, ToXContentFragment {
static final String EXACT_MATCH_TYPE = "exact_match";
public static final String EXACT_MATCH_TYPE = "exact_match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};
static final ParseField TYPE = new ParseField("type");
@ -125,6 +125,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
return schedule;
}
public String getAliasName(String policyName) {
// #41553 (list policy api) will add name to policy, so that we don't have to provide the name via a parameter.
return ".enrich-" + policyName;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);

View File

@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class EnrichIT extends ESRestTestCase {
// TODO: update this test when policy runner is ready
public void testBasicFlow() throws Exception {
// Create the policy:
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"index_pattern\": \"my-index*\", \"enrich_key\": \"host\", " +
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}");
assertOK(client().performRequest(putPolicyRequest));
// Add a single enrich document for now and then refresh:
Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co");
XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent());
document.startObject();
document.field("host", "elastic.co");
document.field("globalRank", 25);
document.field("tldRank", 7);
document.field("tld", "co");
document.endObject();
document.close();
ByteArrayOutputStream out = (ByteArrayOutputStream) document.getOutputStream();
indexRequest.setEntity(new ByteArrayEntity(out.toByteArray(), ContentType.create("application/smile")));
assertOK(client().performRequest(indexRequest));
Request refreshRequest = new Request("POST", "/.enrich-my_policy/_refresh");
assertOK(client().performRequest(refreshRequest));
// Create pipeline
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
putPipelineRequest.setJsonEntity("{\"processors\":[" +
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"enrich_values\":[" +
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
"]}}" +
"]}");
assertOK(client().performRequest(putPipelineRequest));
// Index document using pipeline with enrich processor:
indexRequest = new Request("PUT", "/my-index/_doc/1");
indexRequest.addParameter("pipeline", "my_pipeline");
indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}");
assertOK(client().performRequest(indexRequest));
// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> _source = (Map<?, ?>) response.get("_source");
assertThat(_source.size(), equalTo(3));
assertThat(_source.get("host"), equalTo("elastic.co"));
assertThat(_source.get("global_rank"), equalTo(25));
assertThat(_source.get("tld_rank"), equalTo(7));
}
private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
}

View File

@ -7,8 +7,10 @@ package org.elasticsearch.xpack.enrich;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -31,6 +33,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
@ -48,7 +51,15 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
final ClusterService clusterService = parameters.ingestService.getClusterService();
// Pipelines are created from cluster state update thead and calling ClusterService#state() from that thead is illegal
// (because the current cluster state update is in progress)
// So with the below atomic reference we keep track of the latest updated cluster state:
AtomicReference<ClusterState> reference = new AtomicReference<>();
clusterService.addStateApplier(event -> reference.set(event.state()));
return Collections.singletonMap(EnrichProcessorFactory.TYPE,
new EnrichProcessorFactory(reference::get, parameters.localShardSearcher));
}
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
final class EnrichProcessorFactory implements Processor.Factory {
static final String TYPE = "enrich";
private final Function<String, EnrichPolicy> policyLookup;
private final Function<String, Engine.Searcher> searchProvider;
EnrichProcessorFactory(Supplier<ClusterState> clusterStateSupplier,
Function<String, Engine.Searcher> searchProvider) {
this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get());
this.searchProvider = searchProvider;
}
@Override
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, Map<String, Object> config) throws Exception {
String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name");
EnrichPolicy policy = policyLookup.apply(policyName);
if (policy == null) {
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
}
String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
final List<EnrichSpecification> specifications;
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
specifications = specificationConfig.stream()
// TODO: Add templating support in enrich_values source and target options
.map(entry -> new EnrichSpecification((String) entry.get("source"), (String) entry.get("target")))
.collect(Collectors.toList());
for (EnrichSpecification specification : specifications) {
if (policy.getEnrichValues().contains(specification.sourceField) == false) {
throw new IllegalArgumentException("source field [" + specification.sourceField + "] does not exist in policy [" +
policyName + "]");
}
}
switch (policy.getType()) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications);
default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
}
}
static final class EnrichSpecification {
final String sourceField;
final String targetField;
EnrichSpecification(String sourceField, String targetField) {
this.sourceField = sourceField;
this.targetField = targetField;
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
final class ExactMatchProcessor extends AbstractProcessor {
private final Function<String, EnrichPolicy> policyLookup;
private final Function<String, Engine.Searcher> searchProvider;
private final String policyName;
private final String enrichKey;
private final boolean ignoreMissing;
private final List<EnrichSpecification> specifications;
ExactMatchProcessor(String tag,
Function<String, EnrichPolicy> policyLookup,
Function<String, Engine.Searcher> searchProvider,
String policyName,
String enrichKey,
boolean ignoreMissing,
List<EnrichSpecification> specifications) {
super(tag);
this.policyLookup = policyLookup;
this.searchProvider = searchProvider;
this.policyName = policyName;
this.enrichKey = enrichKey;
this.ignoreMissing = ignoreMissing;
this.specifications = specifications;
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
final EnrichPolicy policy = policyLookup.apply(policyName);
if (policy == null) {
throw new IllegalArgumentException("policy [" + policyName + "] does not exists");
}
final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing);
if (value == null) {
return ingestDocument;
}
// TODO: re-use the engine searcher between enriching documents from the same write request
try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getAliasName(policyName))) {
if (engineSearcher.getDirectoryReader().leaves().size() == 0) {
return ingestDocument;
} else if (engineSearcher.getDirectoryReader().leaves().size() != 1) {
throw new IllegalStateException("enrich index must have exactly a single segment");
}
final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader();
final Terms terms = leafReader.terms(policy.getEnrichKey());
if (terms == null) {
throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist");
}
final TermsEnum tenum = terms.iterator();
if (tenum.seekExact(new BytesRef(value))) {
PostingsEnum penum = tenum.postings(null, PostingsEnum.NONE);
final int docId = penum.nextDoc();
assert docId != PostingsEnum.NO_MORE_DOCS : "no matching doc id for [" + enrichKey + "]";
assert penum.nextDoc() == PostingsEnum.NO_MORE_DOCS : "more than one doc id matching for [" + enrichKey + "]";
// TODO: The use of _source is temporarily until enrich source field mapper has been added (see PR #41521)
Document document = leafReader.document(docId, Collections.singleton(SourceFieldMapper.NAME));
BytesRef source = document.getBinaryValue(SourceFieldMapper.NAME);
assert source != null;
final BytesReference encoded = new BytesArray(source);
final Map<String, Object> decoded =
XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2();
for (EnrichSpecification specification : specifications) {
Object enrichValue = decoded.get(specification.sourceField);
// TODO: add support over overwrite option (like in SetProcessor)
ingestDocument.setFieldValue(specification.targetField, enrichValue);
}
}
}
return ingestDocument;
}
@Override
public String getType() {
return EnrichProcessorFactory.TYPE;
}
String getPolicyName() {
return policyName;
}
String getEnrichKey() {
return enrichKey;
}
boolean isIgnoreMissing() {
return ignoreMissing;
}
List<EnrichSpecification> getSpecifications() {
return specifications;
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class EnrichProcessorFactoryTests extends ESTestCase {
public void testCreateProcessorInstance() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "source_index", "my_key", enrichValues, "schedule");
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
boolean keyIgnoreMissing = randomBoolean();
if (keyIgnoreMissing || randomBoolean()) {
config.put("ignore_missing", keyIgnoreMissing);
}
int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
List<Map<String, Object>> valuesConfig = new ArrayList<>(numRandomValues);
for (Tuple<String, String> tuple : randomValues) {
Map<String, Object> entry = new HashMap<>();
entry.put("source", tuple.v1());
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
assertThat(result, notNullValue());
assertThat(result.getPolicyName(), equalTo("majestic"));
assertThat(result.getEnrichKey(), equalTo("host"));
assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing));
assertThat(result.getSpecifications().size(), equalTo(numRandomValues));
for (int i = 0; i < numRandomValues; i++) {
EnrichSpecification actual = result.getSpecifications().get(i);
Tuple<String, String> expected = randomValues.get(i);
assertThat(actual.sourceField, equalTo(expected.v1()));
assertThat(actual.targetField, equalTo(expected.v2()));
}
}
public void testPolicyDoesNotExist() {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier(), null);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
boolean keyIgnoreMissing = randomBoolean();
if (keyIgnoreMissing || randomBoolean()) {
config.put("ignore_missing", keyIgnoreMissing);
}
int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
List<Map<String, Object>> valuesConfig = new ArrayList<>(numRandomValues);
for (Tuple<String, String> tuple : randomValues) {
Map<String, Object> entry = new HashMap<>();
entry.put("source", tuple.v1());
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("policy [majestic] does not exists"));
}
public void testPolicyNameMissing() {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "source_index", "my_key", enrichValues, "schedule");
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("_name", policy), null);
Map<String, Object> config = new HashMap<>();
config.put("enrich_key", "host");
boolean keyIgnoreMissing = randomBoolean();
if (keyIgnoreMissing || randomBoolean()) {
config.put("ignore_missing", keyIgnoreMissing);
}
int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
List<Map<String, Object>> valuesConfig = new ArrayList<>(numRandomValues);
for (Tuple<String, String> tuple : randomValues) {
Map<String, Object> entry = new HashMap<>();
entry.put("source", tuple.v1());
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("[policy_name] required property is missing"));
}
public void testUnsupportedPolicy() {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy("unsupported", null, "source_index", "my_key", enrichValues, "schedule");
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
boolean keyIgnoreMissing = randomBoolean();
if (keyIgnoreMissing || randomBoolean()) {
config.put("ignore_missing", keyIgnoreMissing);
}
int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
}
List<Map<String, Object>> valuesConfig = new ArrayList<>(numRandomValues);
for (Tuple<String, String> tuple : randomValues) {
Map<String, Object> entry = new HashMap<>();
entry.put("source", tuple.v1());
entry.put("target", tuple.v2());
valuesConfig.add(entry);
}
config.put("enrich_values", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("unsupported policy type [unsupported]"));
}
public void testNonExistingDecorateField() throws Exception {
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "source_index", "my_key", enrichValues, "schedule");
EnrichProcessorFactory factory = new EnrichProcessorFactory(createClusterStateSupplier("majestic", policy), null);
Map<String, Object> config = new HashMap<>();
config.put("policy_name", "majestic");
config.put("enrich_key", "host");
Map<String, Object> entry = new HashMap<>();
entry.put("source", "rank");
entry.put("target", "rank");
List<Map<String, Object>> valuesConfig = Collections.singletonList(entry);
config.put("enrich_values", valuesConfig);
Exception e = expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
assertThat(e.getMessage(), equalTo("source field [rank] does not exist in policy [majestic]"));
}
private static Supplier<ClusterState> createClusterStateSupplier(String policyName, EnrichPolicy policy) {
EnrichMetadata enrichMetadata = new EnrichMetadata(Collections.singletonMap(policyName, policy));
ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build())
.build();
return () -> state;
}
private static Supplier<ClusterState> createClusterStateSupplier() {
EnrichMetadata enrichMetadata = new EnrichMetadata(Collections.emptyMap());
ClusterState state = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(EnrichMetadata.TYPE, enrichMetadata).build())
.build();
return () -> state;
}
}

View File

@ -0,0 +1,269 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
public class ExactMatchProcessorTests extends ESTestCase {
public void testBasics() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.addDocument(createEnrichDocument("google.com", "globalRank", 1, "tldRank", 1, "tld", "com"));
indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co"));
indexWriter.addDocument(createEnrichDocument("bbc.co.uk", "globalRank", 45, "tldRank", 14, "tld", "co.uk"));
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
indexWriter.commit();
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key",
Collections.emptyList(), "schedule");
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23));
assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co"));
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "eops.nl"));
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(80));
assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("nl"));
}
}
}
}
public void testNoMatch() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.addDocument(createEnrichDocument("google.com", "globalRank", 1, "tldRank", 1, "tld", "com"));
indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co"));
indexWriter.addDocument(createEnrichDocument("bbc.co.uk", "globalRank", 45, "tldRank", 14, "tld", "co.uk"));
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
indexWriter.commit();
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key",
Collections.emptyList(), "schedule");
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.com"));
int numProperties = ingestDocument.getSourceAndMetadata().size();
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties));
}
}
}
}
public void testMoreThanOneSegment() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co"));
indexWriter.commit();
indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl"));
indexWriter.commit();
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key",
Collections.emptyList(), "schedule");
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("enrich index must have exactly a single segment"));
}
}
}
}
public void testEmptyIndex() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
indexWriter.commit();
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key",
Collections.emptyList(), "schedule");
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
int numProperties = ingestDocument.getSourceAndMetadata().size();
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties));
}
}
}
}
public void testEnrichKeyFieldMissing() throws Exception {
try (Directory directory = newDirectory()) {
IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random()));
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) {
Document document = new Document();
document.add(new StringField("different_key", "elastic.co", Field.Store.NO));
indexWriter.addDocument(document);
indexWriter.commit();
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key",
Collections.emptyList(), "schedule");
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Function<String, Engine.Searcher> searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader);
ExactMatchProcessor processor =
new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false,
Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Collections.singletonMap("domain", "elastic.co"));
Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("enrich key field [key] does not exist"));
}
}
}
}
public void testPolicyMissing() {
Function<String, EnrichPolicy> policyLookup = policyName -> null;
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain",
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
}
public void testIgnoreKeyMissing() throws Exception {
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, "majestic_index", "key",
Collections.emptyList(), "schedule");
Function<String, EnrichPolicy> policyLookup = policyName -> policy;
{
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain",
true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
assertThat(processor.execute(ingestDocument), notNullValue());
assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
}
{
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain",
false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument =
new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap());
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
}
}
private static Document createEnrichDocument(String key, Object... decorateValues) throws IOException {
assert decorateValues.length % 2 ==0;
BytesReference decorateContent;
try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < decorateValues.length; i += 2) {
map.put((String) decorateValues[i], decorateValues[i + 1]);
}
builder.map(map);
builder.flush();
ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream();
decorateContent = new BytesArray(outputStream.toByteArray());
}
Document document = new Document();
document.add(new StringField("key", key, Field.Store.NO));
document.add(new StoredField(SourceFieldMapper.NAME, decorateContent.toBytesRef()));
return document;
}
}