diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 8f22e7573dc..fb41cc11cc4 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -30,14 +30,12 @@ import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -48,12 +46,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisRegistry; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -95,10 +89,9 @@ public class IngestService implements ClusterStateApplier { public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, - List ingestPlugins, IndicesService indicesService) { + List ingestPlugins, Client client) { this.clusterService = clusterService; this.scriptService = scriptService; - final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(); this.processorFactories = processorFactories( ingestPlugins, new Processor.Parameters( @@ -106,35 +99,7 @@ public class IngestService implements ClusterStateApplier { threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this, indexExpression -> { - ClusterState state = clusterService.state(); - Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression); - if (resolvedIndices.length != 1) { - throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index"); - } - Index index = resolvedIndices[0]; - - // check if indexExpression matches with an alias that has a filter - // There is no guarantee that alias filters are applied, so fail if this is the case. 
- Set indicesAndAliases = resolver.resolveExpressions(state, indexExpression); - String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases); - if (aliasesWithFilter != null && aliasesWithFilter.length > 0) { - throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter"); - } - - IndexService indexService = indicesService.indexServiceSafe(index); - int numShards = indexService.getMetaData().getNumberOfShards(); - if (numShards != 1) { - throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards + - " shards"); - } - - IndexShard indexShard = indexService.getShard(0); - IndexMetaData imd = state.metaData().index(index); - return new Tuple<>(imd, indexShard.acquireSearcher("ingest")); - } - ) - ); + ), this, client)); this.threadPool = threadPool; } diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 2821be5c262..4de02ddeaa6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -19,18 +19,15 @@ package org.elasticsearch.ingest; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.LongSupplier; /** @@ -115,17 +112,13 @@ public interface Processor { public final BiFunction scheduler; /** - * Provides access to an engine searcher of a locally allocated index specified for the provided index. - * The input of this function is an index expression and this function returns the {@link IndexMetaData} - * of the resolved locally allocated index and {@link Engine.Searcher} instance for the resolved index. - * - * The locally allocated index must be have a single primary shard. 
+ * Provides access to the client */ - public final Function> localShardSearcher; + public final Client client; public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService, Function> localShardSearcher) { + IngestService ingestService, Client client) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -133,7 +126,7 @@ public interface Processor { this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; this.ingestService = ingestService; - this.localShardSearcher = localShardSearcher; + this.client = client; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index a24de7d4c1a..5ea04e4e0b5 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -432,7 +432,7 @@ public class Node implements Closeable { final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), - pluginsService.filterPlugins(IngestPlugin.class), indicesService); + pluginsService.filterPlugins(IngestPlugin.class), client); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java deleted file mode 100644 index c5a68458298..00000000000 --- a/server/src/test/java/org/elasticsearch/ingest/IngestLocalShardSearcherTests.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.ingest; - -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; -import org.apache.lucene.index.LeafReader; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.IndexModule; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.shard.IndexSearcherWrapper; -import org.elasticsearch.plugins.IngestPlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESSingleNodeTestCase; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.function.Function; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; - -public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase { - - @Override - protected Collection> getPlugins() { - return Collections.singleton(TestPlugin.class); - } - - public void testLocalShardSearcher() throws Exception { - client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet(); - client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet(); - - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet(); - client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet(); - - Map result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap(); - assertThat(result.size(), equalTo(1)); - assertThat(result.get("id"), equalTo("1")); - } - - public void testMultipleIndicesAreResolved() throws Exception { - createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1") - .addAlias(new Alias("reference-index"))); - createIndex("reference-index2", client().admin().indices().prepareCreate("reference-index2") - .addAlias(new Alias("reference-index"))); - - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline"); - ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet()); - assertThat(e.getRootCause(), instanceOf(IllegalStateException.class)); - assertThat(e.getRootCause().getMessage(), 
equalTo("expression [reference-index] can only point to a single concrete index")); - } - - public void testMoreThanOnePrimaryShard() throws Exception { - createIndex("reference-index", Settings.builder().put("index.number_of_shards", 2).build()); - - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline"); - ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet()); - assertThat(e.getRootCause(), instanceOf(IllegalStateException.class)); - assertThat(e.getRootCause().getMessage(), equalTo("index [reference-index] must have 1 shard, but has 2 shards")); - } - - public void testFailWithFilteredAlias() throws Exception { - createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1") - .addAlias(new Alias("reference-index").filter(QueryBuilders.matchAllQuery()))); - - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline"); - ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet()); - assertThat(e.getRootCause(), instanceOf(IllegalStateException.class)); - assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] points an alias with a filter")); - } - - public void testWithAlias() throws Exception { - createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1") - .addAlias(new Alias("reference-index"))); - - client().index(new IndexRequest("reference-index1").id("1").source("{}", XContentType.JSON)).actionGet(); - client().admin().indices().refresh(new RefreshRequest("reference-index1")).actionGet(); - - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet(); - client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet(); - - Map result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap(); - assertThat(result.size(), equalTo(1)); - assertThat(result.get("id"), equalTo("1")); - } - - public void testSearchWrapperIsApplied() throws Exception { - client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet(); - client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet(); - - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet(); - client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet(); - - Map result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap(); - assertThat(result.size(), equalTo(1)); - 
assertThat(result.get("id"), equalTo("1")); - } - - private static BytesReference createPipelineSource() throws IOException { - return BytesReference.bytes(jsonBuilder().startObject() - .startArray("processors") - .startObject() - .startObject(TestProcessor.NAME) - .endObject() - .endObject() - .endArray() - .endObject()); - } - - public static class TestPlugin extends Plugin implements IngestPlugin { - - @Override - public Map getProcessors(Processor.Parameters parameters) { - return Collections.singletonMap(TestProcessor.NAME, new TestProcessor.Factory(parameters.localShardSearcher)); - } - - @Override - public void onIndexModule(IndexModule indexModule) { - indexModule.setSearcherWrapper(indexService -> new IndexSearcherWrapper() { - - @Override - protected DirectoryReader wrap(DirectoryReader reader) throws IOException { - return new TestDirectyReader(reader); - } - }); - } - } - - static class TestProcessor extends AbstractProcessor { - - static final String NAME = "test_processor"; - - private final Function> localShardSearcher; - - TestProcessor(String tag, Function> localShardSearcher) { - super(tag); - this.localShardSearcher = localShardSearcher; - } - - @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - String indexExpression = "reference-index"; - try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression).v2()) { - // Ensure that search wrapper has been invoked by checking the directory instance type: - if ((engineSearcher.getDirectoryReader() instanceof TestDirectyReader) == false) { - // asserting or throwing a AssertionError makes this test hang: - // so just throw a runtime exception here: - throw new RuntimeException("unexpected directory instance type"); - } - Document document = engineSearcher.searcher().doc(0); - ingestDocument.setFieldValue("id", Uid.decodeId(document.getBinaryValue("_id").bytes)); - } - return ingestDocument; - } - - @Override - public String getType() { - return NAME; - } - - static class Factory implements Processor.Factory { - - private final Function> localShardSearcher; - - Factory(Function> localShardSearcher) { - this.localShardSearcher = localShardSearcher; - } - - @Override - public Processor create(Map processorFactories, - String tag, Map config) throws Exception { - return new TestProcessor(tag, localShardSearcher); - } - } - - } - - static class TestDirectyReader extends FilterDirectoryReader { - - TestDirectyReader(DirectoryReader in) throws IOException { - super(in, new TestSubReaderWrapper()); - } - - @Override - protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { - return new TestDirectyReader(in); - } - - @Override - public CacheHelper getReaderCacheHelper() { - return in.getReaderCacheHelper(); - } - - static class TestSubReaderWrapper extends SubReaderWrapper { - @Override - public LeafReader wrap(LeafReader reader) { - return new TestLeafReader(reader); - } - } - - static class TestLeafReader extends FilterLeafReader { - - TestLeafReader(LeafReader in) { - super(in); - } - - @Override - public CacheHelper getCoreCacheHelper() { - return in.getCoreCacheHelper(); - } - - @Override - public CacheHelper getReaderCacheHelper() { - return in.getReaderCacheHelper(); - } - } - } - -} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 94faee93ea3..5a8300f8b60 100644 --- 
a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1140,7 +1140,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new IngestService( clusterService, threadPool, environment, scriptService, new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), - Collections.emptyList(), null), + Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) )); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 65247d8ddd7..8127705b999 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -72,7 +72,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { @Override public Map getProcessors(Processor.Parameters parameters) { - EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.localShardSearcher); + EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client); parameters.ingestService.addIngestClusterStateListener(factory); return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 00e269ae995..1fcceb42afa 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -5,10 +5,8 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -17,17 +15,16 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; final class EnrichProcessorFactory implements Processor.Factory, Consumer { static final String TYPE = "enrich"; - private final Function> searchProvider; + private final Client client; volatile Map policies = Collections.emptyMap(); - EnrichProcessorFactory(Function> searchProvider) { - this.searchProvider = searchProvider; + EnrichProcessorFactory(Client client) { + this.client = client; } @Override @@ -57,7 +54,7 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer> searchProvider; + private final CheckedFunction searchRunner; private final String policyName; private final String enrichKey; @@ -41,13 +34,22 @@ final class ExactMatchProcessor extends AbstractProcessor { private final List specifications; ExactMatchProcessor(String tag, - Function> searchProvider, + Client client, + String policyName, + String enrichKey, + boolean ignoreMissing, + List specifications) { + this(tag, (req) -> 
client.search(req).actionGet(), policyName, enrichKey, ignoreMissing, specifications); + } + + ExactMatchProcessor(String tag, + CheckedFunction searchRunner, String policyName, String enrichKey, boolean ignoreMissing, List specifications) { super(tag); - this.searchProvider = searchProvider; + this.searchRunner = searchRunner; this.policyName = policyName; this.enrichKey = enrichKey; this.ignoreMissing = ignoreMissing; @@ -56,49 +58,45 @@ final class ExactMatchProcessor extends AbstractProcessor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + // If a document does not have the enrich key, return the unchanged document final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); if (value == null) { return ingestDocument; } - // TODO: re-use the engine searcher between enriching documents from the same write request - Tuple tuple = searchProvider.apply(EnrichPolicy.getBaseName(policyName)); - String enrichKeyField = getEnrichKeyField(tuple.v1()); + TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); + // TODO: Use a custom transport action instead of the search API + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.size(1); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null); + searchBuilder.query(constantScore); - try (Engine.Searcher engineSearcher = tuple.v2()) { - if (engineSearcher.getDirectoryReader().leaves().size() == 0) { - return ingestDocument; - } else if (engineSearcher.getDirectoryReader().leaves().size() != 1) { - throw new IllegalStateException("enrich index must have exactly a single segment"); - } + SearchRequest req = new SearchRequest(); + req.indices(EnrichPolicy.getBaseName(policyName)); + req.preference(Preference.LOCAL.type()); + req.source(searchBuilder); - final LeafReader leafReader = engineSearcher.getDirectoryReader().leaves().get(0).reader(); - final Terms terms = leafReader.terms(enrichKeyField); - if (terms == null) { - throw new IllegalStateException("enrich key field does not exist"); - } + // TODO: Make this Async + SearchResponse searchResponse = searchRunner.apply(req); - final TermsEnum tenum = terms.iterator(); - if (tenum.seekExact(new BytesRef(value))) { - PostingsEnum penum = tenum.postings(null, PostingsEnum.NONE); - final int docId = penum.nextDoc(); - assert docId != PostingsEnum.NO_MORE_DOCS : "no matching doc id for [" + enrichKey + "]"; - assert penum.nextDoc() == PostingsEnum.NO_MORE_DOCS : "more than one doc id matching for [" + enrichKey + "]"; + // If the index is empty, return the unchanged document + // If the enrich key does not exist in the index, throw an error + // If no documents match the key, return the unchanged document + SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits.length < 1) { + return ingestDocument; + } else if (searchHits.length > 1) { + throw new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"); + } - // TODO: The use of _source is temporarily until enrich source field mapper has been added (see PR #41521) - Document document = leafReader.document(docId, Collections.singleton(SourceFieldMapper.NAME)); - BytesRef source = document.getBinaryValue(SourceFieldMapper.NAME); - assert source != null; - - final BytesReference encoded = new BytesArray(source); - final Map 
decoded = - XContentHelper.convertToMap(encoded, false, XContentType.SMILE).v2(); - for (EnrichSpecification specification : specifications) { - Object enrichValue = decoded.get(specification.sourceField); - // TODO: add support over overwrite option (like in SetProcessor) - ingestDocument.setFieldValue(specification.targetField, enrichValue); - } - } + // If a document is returned, add its fields to the document + Map enrichDocument = searchHits[0].getSourceAsMap(); + assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length"; + for (EnrichSpecification specification : specifications) { + Object enrichFieldValue = enrichDocument.get(specification.sourceField); + ingestDocument.setFieldValue(specification.targetField, enrichFieldValue); } return ingestDocument; } @@ -123,22 +121,4 @@ final class ExactMatchProcessor extends AbstractProcessor { List getSpecifications() { return specifications; } - - private static String getEnrichKeyField(IndexMetaData imd) { - if (imd == null) { - throw new IllegalStateException("enrich index is missing"); - } - - Map mappingSource = imd.mapping().getSourceAsMap(); - Map meta = (Map) mappingSource.get("_meta"); - if (meta == null) { - throw new IllegalStateException("_meta field is missing in enrich index"); - } - - String fieldName = (String) meta.get(ENRICH_KEY_FIELD_NAME); - if (fieldName == null) { - throw new IllegalStateException("enrich key fieldname missing"); - } - return fieldName; - } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java index 8616e494ace..d7626168732 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java @@ -5,278 +5,216 @@ */ package org.elasticsearch.xpack.enrich; -import org.apache.lucene.analysis.MockAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.StoredField; -import org.apache.lucene.document.StringField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.store.Directory; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; import 
org.elasticsearch.index.VersionType; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.function.Function; -import static org.elasticsearch.xpack.enrich.ExactMatchProcessor.ENRICH_KEY_FIELD_NAME; +import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; public class ExactMatchProcessorTests extends ESTestCase { public void testBasics() throws Exception { - try (Directory directory = newDirectory()) { - IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); - iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); - try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { - indexWriter.addDocument(createEnrichDocument("google.com", "globalRank", 1, "tldRank", 1, "tld", "com")); - indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co")); - indexWriter.addDocument(createEnrichDocument("bbc.co.uk", "globalRank", 45, "tldRank", 14, "tld", "co.uk")); - indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl")); - indexWriter.commit(); - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - assertThat(processor.execute(ingestDocument), notNullValue()); - assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23)); - assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co")); - } - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "eops.nl")); - assertThat(processor.execute(ingestDocument), notNullValue()); - assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(80)); - assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("nl")); - } - } + Map> documents = new HashMap<>(); + { + Map document1 = new HashMap<>(); + 
document1.put("globalRank", 451); + document1.put("tldRank",23); + document1.put("tld", "co"); + documents.put("elastic.co", document1); } + MockSearchFunction mockSearch = mockedSearchFunction(documents); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, + Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Collections.singletonMap("domain", "elastic.co")); + // Run + assertThat(processor.execute(ingestDocument), notNullValue()); + // Check request + SearchRequest request = mockSearch.getCapturedRequest(); + assertThat(request.indices().length, equalTo(1)); + assertThat(request.indices()[0], equalTo(".enrich-_name")); + assertThat(request.preference(), equalTo(Preference.LOCAL.type())); + assertThat(request.source().size(), equalTo(1)); + assertThat(request.source().trackScores(), equalTo(false)); + assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); + assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld")); + assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); + assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class)); + TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); + assertThat(termQueryBuilder.fieldName(), equalTo("domain")); + assertThat(termQueryBuilder.value(), equalTo("elastic.co")); + // Check result + assertThat(ingestDocument.getFieldValue("tld_rank", Integer.class), equalTo(23)); + assertThat(ingestDocument.getFieldValue("tld", String.class), equalTo("co")); } public void testNoMatch() throws Exception { - try (Directory directory = newDirectory()) { - IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); - iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); - try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { - indexWriter.addDocument(createEnrichDocument("google.com", "globalRank", 1, "tldRank", 1, "tld", "com")); - indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co")); - indexWriter.addDocument(createEnrichDocument("bbc.co.uk", "globalRank", 45, "tldRank", 14, "tld", "co.uk")); - indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl")); - indexWriter.commit(); - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.com")); - int numProperties = ingestDocument.getSourceAndMetadata().size(); - assertThat(processor.execute(ingestDocument), notNullValue()); - assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties)); - } - } - } - } - - public void testMoreThanOneSegment() throws Exception { - try (Directory directory = newDirectory()) { - IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); - 
iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); - try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { - indexWriter.addDocument(createEnrichDocument("elastic.co", "globalRank", 451, "tldRank",23, "tld", "co")); - indexWriter.commit(); - indexWriter.addDocument(createEnrichDocument("eops.nl", "globalRank", 4567, "tldRank", 80, "tld", "nl")); - indexWriter.commit(); - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); - assertThat(e.getMessage(), equalTo("enrich index must have exactly a single segment")); - } - } - } - } - - public void testEmptyIndex() throws Exception { - try (Directory directory = newDirectory()) { - IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); - iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); - try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { - indexWriter.commit(); - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - int numProperties = ingestDocument.getSourceAndMetadata().size(); - assertThat(processor.execute(ingestDocument), notNullValue()); - assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties)); - } - } - } - } - - public void testEnrichKeyFieldMissing() throws Exception { - try (Directory directory = newDirectory()) { - IndexWriterConfig iwConfig = new IndexWriterConfig(new MockAnalyzer(random())); - iwConfig.setMergePolicy(NoMergePolicy.INSTANCE); - try (IndexWriter indexWriter = new IndexWriter(directory, iwConfig)) { - Document document = new Document(); - document.add(new StringField("different_key", "elastic.co", Field.Store.NO)); - indexWriter.addDocument(document); - indexWriter.commit(); - - try (IndexReader indexReader = DirectoryReader.open(directory)) { - ExactMatchProcessor processor = - new ExactMatchProcessor("_tag", createSearchProvider(indexReader), "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); - assertThat(e.getMessage(), equalTo("enrich key field does not exist")); - } - } - } - } - - public void testIndexMetadataMissing() { - Function> provider = indexExpression -> new Tuple<>(null, null); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false, + MockSearchFunction mockSearch = mockedSearchFunction(); + 
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); + Collections.singletonMap("domain", "elastic.com")); + int numProperties = ingestDocument.getSourceAndMetadata().size(); + // Run + assertThat(processor.execute(ingestDocument), notNullValue()); + // Check request + SearchRequest request = mockSearch.getCapturedRequest(); + assertThat(request.indices().length, equalTo(1)); + assertThat(request.indices()[0], equalTo(".enrich-_name")); + assertThat(request.preference(), equalTo(Preference.LOCAL.type())); + assertThat(request.source().size(), equalTo(1)); + assertThat(request.source().trackScores(), equalTo(false)); + assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); + assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld")); + assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); + assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class)); + TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); + assertThat(termQueryBuilder.fieldName(), equalTo("domain")); + assertThat(termQueryBuilder.value(), equalTo("elastic.com")); + // Check result + assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(numProperties)); } - public void testMetaFieldMissing() throws Exception { - Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexMetaData imd = IndexMetaData.builder("majestic_index") - .settings(indexSettings) - .putMapping("_doc", "{}") - .build(); - - Function> provider = indexExpression -> new Tuple<>(imd, null); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", provider, "_name", "domain", false, + public void testSearchFailure() throws Exception { + String indexName = ".enrich-_name"; + MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName)); + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); - } - - public void testEnrichKeyFieldNameMissing() throws Exception { - Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexMetaData imd = IndexMetaData.builder("majestic_index") - .settings(indexSettings) - .putMapping("_doc", "{\"_meta\": {}}") - .build(); - - Function> provider = indexExpression -> new Tuple<>(imd, null); - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", 
provider, "_name", "domain", false, - Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Collections.singletonMap("domain", "elastic.co")); - expectThrows(IllegalStateException.class, () -> processor.execute(ingestDocument)); + Collections.singletonMap("domain", "elastic.com")); + // Run + IndexNotFoundException expectedException = expectThrows(IndexNotFoundException.class, () -> processor.execute(ingestDocument)); + // Check request + SearchRequest request = mockSearch.getCapturedRequest(); + assertThat(request.indices().length, equalTo(1)); + assertThat(request.indices()[0], equalTo(".enrich-_name")); + assertThat(request.preference(), equalTo(Preference.LOCAL.type())); + assertThat(request.source().size(), equalTo(1)); + assertThat(request.source().trackScores(), equalTo(false)); + assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); + assertThat(request.source().fetchSource().includes(), arrayContainingInAnyOrder("tldRank", "tld")); + assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); + assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(TermQueryBuilder.class)); + TermQueryBuilder termQueryBuilder = (TermQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); + assertThat(termQueryBuilder.fieldName(), equalTo("domain")); + assertThat(termQueryBuilder.value(), equalTo("elastic.com")); + // Check result + assertThat(expectedException.getMessage(), equalTo("no such index [" + indexName + "]")); } public void testIgnoreKeyMissing() throws Exception { { - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", ndexExpression -> null, "_name", "domain", + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", true, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - IngestDocument ingestDocument = - new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Collections.emptyMap()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); assertThat(processor.execute(ingestDocument), notNullValue()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); } { - ExactMatchProcessor processor = new ExactMatchProcessor("_tag", indexExpression -> null, "_name", "domain", + ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", false, Arrays.asList(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); - IngestDocument ingestDocument = - new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Collections.emptyMap()); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Collections.emptyMap()); expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); } } - private static Document createEnrichDocument(String key, Object... 
decorateValues) throws IOException { - assert decorateValues.length % 2 ==0; + private static final class MockSearchFunction implements CheckedFunction { + private final SearchResponse mockResponse; + private final SetOnce capturedRequest; + private final Exception exception; - BytesReference decorateContent; - try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { - Map map = new HashMap<>(); - for (int i = 0; i < decorateValues.length; i += 2) { - map.put((String) decorateValues[i], decorateValues[i + 1]); - } - builder.map(map); - builder.flush(); - ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); - decorateContent = new BytesArray(outputStream.toByteArray()); + MockSearchFunction(SearchResponse mockResponse) { + this.mockResponse = mockResponse; + this.exception = null; + this.capturedRequest = new SetOnce<>(); + } + + MockSearchFunction(Exception exception) { + this.mockResponse = null; + this.exception = exception; + this.capturedRequest = new SetOnce<>(); + } + + @Override + public SearchResponse apply(SearchRequest request) throws Exception { + capturedRequest.set(request); + if (exception != null) { + throw exception; + } else { + return mockResponse; + } + } + + SearchRequest getCapturedRequest() { + return capturedRequest.get(); } - Document document = new Document(); - document.add(new StringField("key", key, Field.Store.NO)); - document.add(new StoredField(SourceFieldMapper.NAME, decorateContent.toBytesRef())); - return document; } - private static Function> createSearchProvider(IndexReader indexReader) throws Exception { - Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexMetaData imd = IndexMetaData.builder("majestic_index") - .settings(indexSettings) - .putMapping("_doc", "{\"_meta\": {\"" + ENRICH_KEY_FIELD_NAME +"\": \"key\"}}") - .build(); + public MockSearchFunction mockedSearchFunction() { + return new MockSearchFunction(mockResponse(Collections.emptyMap())); + } - IndexSearcher indexSearcher = new IndexSearcher(indexReader); - Engine.Searcher searcher = new Engine.Searcher("_enrich", indexSearcher, indexReader); - return indexExpression -> new Tuple<>(imd, searcher); + public MockSearchFunction mockedSearchFunction(Exception exception) { + return new MockSearchFunction(exception); + } + + public MockSearchFunction mockedSearchFunction(Map> documents) { + return new MockSearchFunction(mockResponse(documents)); + } + + public SearchResponse mockResponse(Map> documents) { + SearchHit[] searchHits = documents.entrySet().stream().map(e -> { + SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), new Text(MapperService.SINGLE_MAPPING_NAME), + Collections.emptyMap()); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { + builder.map(e.getValue()); + builder.flush(); + ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); + searchHit.sourceRef(new BytesArray(outputStream.toByteArray())); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + return searchHit; + }).toArray(SearchHit[]::new); + return new SearchResponse(new SearchResponseSections( + new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f), + new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()), + false, 
false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0)); } }
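
Taken together, the change means enrich lookups now go through the node Client (a search with size 1, constant-score term query, and local shard preference against the policy's base index) instead of an acquired local shard searcher. Below is a minimal sketch, not part of the diff, of the resulting pattern; the class and method names (LookupSearchSupport, searchRunner, lookupRequest) are hypothetical and only illustrate how Processor.Parameters#client can be adapted into the CheckedFunction shape that ExactMatchProcessor uses.

// Hypothetical helper illustrating the client-backed lookup pattern from this diff.
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

final class LookupSearchSupport {

    private LookupSearchSupport() {}

    // Wraps the node client in the same blocking search-runner shape the processor uses,
    // mirroring the delegating ExactMatchProcessor constructor above.
    static CheckedFunction<SearchRequest, SearchResponse, Exception> searchRunner(Client client) {
        return req -> client.search(req).actionGet();
    }

    // Builds the single-hit, local-preference lookup request the processor issues:
    // constant-score term query on the enrich key, fetching only the specified source fields.
    static SearchRequest lookupRequest(String enrichIndex, String keyField, Object keyValue, String[] sourceFields) {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .size(1)
            .trackScores(false)
            .fetchSource(sourceFields, null)
            .query(new ConstantScoreQueryBuilder(new TermQueryBuilder(keyField, keyValue)));
        return new SearchRequest(enrichIndex)
            .preference(Preference.LOCAL.type())
            .source(source);
    }
}

Keeping the Client behind a CheckedFunction, as the second ExactMatchProcessor constructor does, is what allows the rewritten ExactMatchProcessorTests to substitute MockSearchFunction for a real client and capture the outgoing SearchRequest.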