diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4d8477257a2..49d3cc1513b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -127,7 +128,8 @@ public class IngestService implements ClusterStateApplier { } IndexShard indexShard = indexService.getShard(0); - return indexShard.acquireSearcher("ingest"); + IndexMetaData imd = state.metaData().index(index); + return new Tuple<>(imd, indexShard.acquireSearcher("ingest")); } ) ); diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 5d01101e8be..2821be5c262 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -114,14 +116,16 @@ public interface Processor { /** * Provides access to an engine searcher of a locally allocated index specified for the provided index. + * The input of this function is an index expression and this function returns the {@link IndexMetaData} + * of the resolved locally allocated index and {@link Engine.Searcher} instance for the resolved index. * * The locally allocated index must be have a single primary shard. */ - public final Function localShardSearcher; + public final Function> localShardSearcher; public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService, Function localShardSearcher) { + IngestService ingestService, Function> localShardSearcher) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java index d15f9d68150..c5a68458298 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexModule; @@ -180,9 +182,9 @@ public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase { static final String NAME = "test_processor"; - private final Function localShardSearcher; + private final Function> localShardSearcher; - TestProcessor(String tag, Function localShardSearcher) { + TestProcessor(String tag, Function> localShardSearcher) { super(tag); this.localShardSearcher = localShardSearcher; } @@ -190,7 +192,7 @@ public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String indexExpression = "reference-index"; - try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) { + try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression).v2()) { // Ensure that search wrapper has been invoked by checking the directory instance type: if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) { // asserting or throwing a AssertionError makes this test hang: @@ -210,9 +212,9 @@ public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase { static class Factory implements Processor.Factory { - private final Function localShardSearcher; + private final Function> localShardSearcher; - Factory(Function localShardSearcher) { + Factory(Function> localShardSearcher) { this.localShardSearcher = localShardSearcher; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java index f8c036fd2ee..0f82463aefa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java @@ -132,7 +132,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment { return schedule; } - public String getBaseName(String policyName) { + public static String getBaseName(String policyName) { return ENRICH_INDEX_NAME_BASE + policyName; } diff --git a/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java b/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java index 36519d68f0b..5477e5c4f5e 100644 --- a/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java +++ b/x-pack/plugin/enrich/qa/rest/src/test/java/org/elasticsearch/xpack/enrich/EnrichIT.java @@ -10,6 +10,7 @@ import org.apache.http.entity.ContentType; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -45,6 +46,10 @@ public class EnrichIT extends ESRestTestCase { "\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"], \"schedule\": \"0 5 * * *\"}"); assertOK(client().performRequest(putPolicyRequest)); + // create index (remove when execute policy api has been added) + String mapping = "\"_meta\": {\"enrich_key_field\": \"host\"}"; + createIndex(".enrich-my_policy", Settings.EMPTY, mapping); + // Add a single enrich document for now and then refresh: Request indexRequest = new Request("PUT", "/.enrich-my_policy/_doc/elastic.co"); XContentBuilder document = XContentBuilder.builder(XContentType.SMILE.xContent()); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 46af214bd50..ea1d101ddde 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -45,6 +45,8 @@ import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME; + public class EnrichPolicyRunner implements Runnable { private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class); @@ -145,6 +147,9 @@ public class EnrichPolicyRunner implements Runnable { .field("doc_values", false) .endObject() .endObject() + .startObject("_meta") + .field(ENRICH_KEY_FIELD_NAME, policy.getEnrichKey()) + .endObject() .endObject() .endObject(); @@ -156,7 +161,7 @@ public class EnrichPolicyRunner implements Runnable { private void prepareAndCreateEnrichIndex() { long nowTimestamp = nowSupplier.getAsLong(); - String enrichIndexName = policy.getBaseName(policyName) + "-" + nowTimestamp; + String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp; Settings enrichIndexSettings = Settings.builder() .put("index.auto_expand_replicas", "0-all") .build(); @@ -231,7 +236,7 @@ public class EnrichPolicyRunner implements Runnable { } private void updateEnrichPolicyAlias(final String destinationIndexName) { - String enrichIndexBase = policy.getBaseName(policyName); + String enrichIndexBase = EnrichPolicy.getBaseName(policyName); logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase); GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase); String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), aliasRequest); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 91a2236cec6..a1e1f1e1870 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; @@ -22,10 +24,10 @@ final class EnrichProcessorFactory implements Processor.Factory { static final String TYPE = "enrich"; private final Function policyLookup; - private final Function searchProvider; + private final Function> searchProvider; EnrichProcessorFactory(Supplier clusterStateSupplier, - Function searchProvider) { + Function> searchProvider) { this.policyLookup = policyName -> EnrichStore.getPolicy(policyName, clusterStateSupplier.get()); this.searchProvider = searchProvider; } @@ -57,7 +59,7 @@ final class EnrichProcessorFactory implements Processor.Factory { switch (policy.getType()) { case EnrichPolicy.EXACT_MATCH_TYPE: - return new ExactMatchProcessor(tag, policyLookup, searchProvider, policyName, enrichKey, ignoreMissing, specifications); + return new ExactMatchProcessor(tag, searchProvider, policyName, enrichKey, ignoreMissing, specifications); default: throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]"); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java index 9a9835af8e4..c33501f39b4 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java @@ -11,8 +11,10 @@ import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; @@ -29,8 +31,9 @@ import java.util.function.Function; final class ExactMatchProcessor extends AbstractProcessor { - private final Function policyLookup; - private final Function searchProvider; + static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field"; + + private final Function> searchProvider; private final String policyName; private final String enrichKey; @@ -38,14 +41,12 @@ final class ExactMatchProcessor extends AbstractProcessor { private final List specifications; ExactMatchProcessor(String tag, - Function policyLookup, - Function searchProvider, + Function> searchProvider, String policyName, String enrichKey, boolean ignoreMissing, List specifications) { super(tag); - this.policyLookup = policyLookup; this.searchProvider = searchProvider; this.policyName = policyName; this.enrichKey = enrichKey; @@ -55,18 +56,16 @@ final class ExactMatchProcessor extends AbstractProcessor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - final EnrichPolicy policy = policyLookup.apply(policyName); - if (policy == null) { - throw new IllegalArgumentException("policy [" + policyName + "] does not exists"); - } - final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); if (value == null) { return ingestDocument; } // TODO: re-use the engine searcher between enriching documents from the same write request - try (Engine.Searcher engineSearcher = searchProvider.apply(policy.getBaseName(policyName))) { + Tuple tuple = searchProvider.apply(EnrichPolicy.getBaseName(policyName)); + String enrichKeyField = getEnrichKeyField(tuple.v1()); + + try (Engine.Searcher engineSearcher = tuple.v2()) { if (engineSearcher.getDirectoryReader().leaves().size() == 0) { return ingestDocument; } else if (engineSearcher.getDirectoryReader().leaves().size() != 1) { @@ -74,9 +73,9 @@ final class ExactMatchProcessor extends AbstractProcessor { } final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader(); - final Terms terms = leafReader.terms(policy.getEnrichKey()); + final Terms terms = leafReader.terms(enrichKeyField); if (terms == null) { - throw new IllegalStateException("enrich key field [" + policy.getEnrichKey() + "] does not exist"); + throw new IllegalStateException("enrich key field does not exist"); } final TermsEnum tenum = terms.iterator(); @@ -124,4 +123,22 @@ final class ExactMatchProcessor extends AbstractProcessor { List getSpecifications() { return specifications; } + + private static String getEnrichKeyField(IndexMetaData imd) { + if (imd == null) { + throw new IllegalStateException("enrich index is missing"); + } + + Map mappingSource = imd.mapping().getSourceAsMap(); + Map meta = (Map) mappingSource.get("_meta"); + if (meta == null) { + throw new IllegalStateException("_meta field is missing in enrich index"); + } + + String fieldName = (String) meta.get(ENRICH_KEY_FIELD_NAME); + if (fieldName == null) { + throw new IllegalStateException("enrich key fieldname missing"); + } + return fieldName; + } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java index 265eec7c004..8616e494ace 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java @@ -17,8 +17,12 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -26,7 +30,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import java.io.ByteArrayOutputStream; @@ -37,6 +40,7 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Function; +import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -53,16 +57,9 @@ public class ExactMatchProcessorTests extends ESTestCase { indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl")); indexWriter.commit(); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, - Collections.singletonList("majestic_index"), "key", Collections.emptyList(), "schedule"); - Function policyLookup = policyName -> policy; - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, @@ -73,11 +70,8 @@ public class ExactMatchProcessorTests extends ESTestCase { } try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, @@ -101,16 +95,9 @@ public class ExactMatchProcessorTests extends ESTestCase { indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl")); indexWriter.commit(); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"), - "key", Collections.emptyList(), "schedule"); - Function policyLookup = policyName -> policy; - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, @@ -133,16 +120,9 @@ public class ExactMatchProcessorTests extends ESTestCase { indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl")); indexWriter.commit(); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"), - "key", Collections.emptyList(), "schedule"); - Function policyLookup = policyName -> policy; - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, @@ -161,16 +141,9 @@ public class ExactMatchProcessorTests extends ESTestCase { try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { indexWriter.commit(); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"), - "key", Collections.emptyList(), "schedule"); - Function policyLookup = policyName -> policy; - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, @@ -193,43 +166,69 @@ public class ExactMatchProcessorTests extends ESTestCase { indexWriter.addDocument(document); indexWriter.commit(); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"), - "key", Collections.emptyList(), "schedule"); - Function policyLookup = policyName -> policy; - try (IndexReader indexReader = DirectoryReader.open(directory)) { - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Function searchProvider = index -> new Engine.Searcher("_enrich", indexSearcher, indexReader); - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", policyLookup, searchProvider, "_name", "domain", false, + new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.singletonMap("domain", "elastic.co")); Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); - assertThat(e.getMessage(), equalTo("enrich key field [key] does not exist")); + assertThat(e.getMessage(), equalTo("enrich key field does not exist")); } } } } - public void testPolicyMissing() { - Function policyLookup = policyName -> null; - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain", - true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - IngestDocument ingestDocument = - new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); - expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + public void testIndexMetadataMissing() { + Function> provider = indexExpression -> new Tuple<>(null, null); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false, + Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Collections.singletonMap("domain", "elastic.co")); + expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); + } + + public void testMetaFieldMissing() throws Exception { + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexMetaData imd = IndexMetaData.builder("majestic_index") + .settings(indexSettings) + .putMapping("_doc", "{}") + .build(); + + Function> provider = indexExpression -> new Tuple<>(imd, null); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false, + Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Collections.singletonMap("domain", "elastic.co")); + expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); + } + + public void testEnrichKeyFieldNameMissing() throws Exception { + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexMetaData imd = IndexMetaData.builder("majestic_index") + .settings(indexSettings) + .putMapping("_doc", "{\"_meta\": {}}") + .build(); + + Function> provider = indexExpression -> new Tuple<>(imd, null); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false, + Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Collections.singletonMap("domain", "elastic.co")); + expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); } public void testIgnoreKeyMissing() throws Exception { - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("majestic_index"), "key", - Collections.emptyList(), - "schedule"); - Function policyLookup = policyName -> policy; { - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain", + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", ndexExpression -> null, "_name", "domain", true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); @@ -239,7 +238,7 @@ public class ExactMatchProcessorTests extends ESTestCase { assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); } { - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", policyLookup, indexExpression -> null, "_name", "domain", + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", indexExpression -> null, "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); @@ -267,4 +266,17 @@ public class ExactMatchProcessorTests extends ESTestCase { return document; } + private static Function> createSearchProvider(IndexReader indexReader) throws Exception { + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexMetaData imd = IndexMetaData.builder("majestic_index") + .settings(indexSettings) + .putMapping("_doc", "{\"_meta\": {\"" + ENRICH_KEY_FIELD_NAME +"\": \"key\"}}") + .build(); + + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + Engine.Searcher searcher = new Engine.Searcher("_enrich", indexSearcher, indexReader); + return indexExpression -> new Tuple<>(imd, searcher); + } }