diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 971dede5b7f..cca8afd8c09 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -121,7 +121,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class), new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class), new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class), - new ActionHandler<>(CoordinatorProxyAction.INSTANCE, CoordinatorProxyAction.TransportAction.class) + new ActionHandler<>(CoordinatorProxyAction.INSTANCE, CoordinatorProxyAction.TransportAction.class), + new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class) ); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java index 48bffea1684..3e702901902 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java @@ -81,8 +81,8 @@ final class ExactMatchProcessor extends AbstractProcessor { TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value); ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); - // TODO: Use a custom transport action instead of the search API SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.from(0); searchBuilder.size(1); searchBuilder.trackScores(false); searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/CoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/CoordinatorProxyAction.java index cd27af8835e..ca98c6c107e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/CoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/CoordinatorProxyAction.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -24,10 +26,14 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.enrich.EnrichPlugin; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** @@ -71,8 +77,7 @@ public class CoordinatorProxyAction extends ActionType { public Coordinator(Client client, Settings settings) { this( - (request, consumer) -> client.multiSearch(request, - ActionListener.wrap(response -> consumer.accept(response, null), e -> consumer.accept(null, e))), + lookupFunction(client), EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings), EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings), EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.get(settings) @@ -138,7 +143,7 @@ public class CoordinatorProxyAction extends ActionType { throw new AssertionError("no response and no error"); } - // There may be room to for a new request now the numberOfOutstandingRequests has been decreased: + // There may be room to for a new request now that numberOfOutstandingRequests has been decreased: coordinateLookups(); } @@ -153,6 +158,68 @@ public class CoordinatorProxyAction extends ActionType { } } + static BiConsumer> lookupFunction(ElasticsearchClient client) { + return (request, consumer) -> { + int slot = 0; + final Map>> itemsPerIndex = new HashMap<>(); + for (SearchRequest searchRequest : request.requests()) { + List> items = + itemsPerIndex.computeIfAbsent(searchRequest.indices()[0], k -> new ArrayList<>()); + items.add(new Tuple<>(slot, searchRequest)); + slot++; + } + + final AtomicInteger counter = new AtomicInteger(0); + final ConcurrentMap> shardResponses = new ConcurrentHashMap<>(); + for (Map.Entry>> entry : itemsPerIndex.entrySet()) { + final String enrichIndexName = entry.getKey(); + final List> enrichIndexRequestsAndSlots = entry.getValue(); + ActionListener listener = ActionListener.wrap( + response -> { + shardResponses.put(enrichIndexName, new Tuple<>(response, null)); + if (counter.incrementAndGet() == itemsPerIndex.size()) { + consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null); + } + }, + e -> { + shardResponses.put(enrichIndexName, new Tuple<>(null, e)); + if (counter.incrementAndGet() == itemsPerIndex.size()) { + consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null); + } + } + ); + + MultiSearchRequest mrequest = new MultiSearchRequest(); + enrichIndexRequestsAndSlots.stream().map(Tuple::v2).forEach(mrequest::add); + client.execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(mrequest), listener); + } + }; + } + + static MultiSearchResponse reduce(int numRequest, + Map>> itemsPerIndex, + Map> shardResponses) { + MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numRequest]; + for (Map.Entry> rspEntry : shardResponses.entrySet()) { + List> reqSlots = itemsPerIndex.get(rspEntry.getKey()); + if (rspEntry.getValue().v1() != null) { + MultiSearchResponse shardResponse = rspEntry.getValue().v1(); + for (int i = 0; i < shardResponse.getResponses().length; i++) { + int slot = reqSlots.get(i).v1(); + items[slot] = shardResponse.getResponses()[i]; + } + } else if (rspEntry.getValue().v2() != null) { + Exception e = rspEntry.getValue().v2(); + for (Tuple originSlot : reqSlots) { + items[originSlot.v1()] = new MultiSearchResponse.Item(null, e); + } + } else { + throw new AssertionError(); + } + } + return new MultiSearchResponse(items, 1L); + } + } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java new file mode 100644 index 00000000000..22e8d3aa264 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java @@ -0,0 +1,269 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.fieldvisitor.FieldsVisitor; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * This is an internal action, that executes msearch requests for enrich indices in a more efficient manner. + * Currently each search request inside a msearch request is executed as a separate search. If many search requests + * are targeted to the same shards then there is quite some overhead in executing each search request as a separate + * search (multiple search contexts, opening of multiple searchers). + * + * In case for the enrich processor, searches are always targeting the same single shard indices. This action + * handles multi search requests targeting enrich indices more efficiently by executing them in a bulk using the same + * searcher and query shard context. + * + * This action (plus some coordination logic in {@link CoordinatorProxyAction}) can be removed when msearch can + * execute search requests targeted to the same shard more efficiently in a bulk like style. + * + * Note that this 'msearch' implementation only supports executing a query, pagination and source filtering. + * Other search features are not supported, because the enrich processor isn't using these search features. + */ +public class EnrichShardMultiSearchAction extends ActionType { + + public static final EnrichShardMultiSearchAction INSTANCE = new EnrichShardMultiSearchAction(); + private static final String NAME = "indices:data/read/shard_multi_search"; + + private EnrichShardMultiSearchAction() { + super(NAME, MultiSearchResponse::new); + } + + public static class Request extends SingleShardRequest { + + private final MultiSearchRequest multiSearchRequest; + + public Request(MultiSearchRequest multiSearchRequest) { + super(multiSearchRequest.requests().get(0).indices()[0]); + this.multiSearchRequest = multiSearchRequest; + assert multiSearchRequest.requests().stream() + .map(SearchRequest::indices) + .flatMap(Arrays::stream) + .distinct() + .count() == 1 : "action [" + NAME + "] cannot handle msearch request pointing to multiple indices"; + assert assertSearchSource(); + } + + public Request(StreamInput in) throws IOException { + super(in); + multiSearchRequest = new MultiSearchRequest(in); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = validateNonNullIndex(); + if (index.startsWith(EnrichPolicy.ENRICH_INDEX_NAME_BASE) == false) { + validationException = ValidateActions.addValidationError("index [" + index + "] is not an enrich index", + validationException); + } + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + multiSearchRequest.writeTo(out); + } + + MultiSearchRequest getMultiSearchRequest() { + return multiSearchRequest; + } + + private boolean assertSearchSource() { + for (SearchRequest request : multiSearchRequest.requests()) { + SearchSourceBuilder copy = copy(request.source()); + + // validate that only a from, size, query and source filtering has been provided (other features are not supported): + // (first unset, what is supported and then see if there is anything left) + copy.query(null); + copy.from(0); + copy.size(10); + copy.fetchSource(null); + assert EMPTY_SOURCE.equals(copy) : "search request [" + Strings.toString(copy) + + "] is using features that is not supported"; + } + return true; + } + + private SearchSourceBuilder copy(SearchSourceBuilder source) { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + NamedWriteableRegistry registry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); + try (BytesStreamOutput output = new BytesStreamOutput()) { + source.writeTo(output); + try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), registry)) { + return new SearchSourceBuilder(in); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static final SearchSourceBuilder EMPTY_SOURCE = new SearchSourceBuilder() + // can't set -1 to indicate not specified + .from(0).size(10); + } + + public static class TransportAction extends TransportSingleShardAction { + + private final IndicesService indicesService; + + @Inject + public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + IndicesService indicesService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, + Request::new, ThreadPool.Names.SEARCH); + this.indicesService = indicesService; + } + + @Override + protected Writeable.Reader getResponseReader() { + return MultiSearchResponse::new; + } + + @Override + protected boolean resolveIndex(Request request) { + return true; + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + String index = request.concreteIndex(); + IndexRoutingTable indexRouting = state.routingTable().index(index); + int numShards = indexRouting.shards().size(); + if (numShards != 1) { + throw new IllegalStateException("index [" + index + "] should have 1 shard, but has " + numShards + " shards"); + } + return indexRouting.shard(0).shardsRandomIt(); + } + + @Override + protected MultiSearchResponse shardOperation(Request request, ShardId shardId) throws IOException { + final long nowInMillis = System.currentTimeMillis(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard indexShard = indicesService.getShardOrNull(shardId); + try (Engine.Searcher searcher = indexShard.acquireSearcher("enrich_msearch")) { + final FieldsVisitor visitor = new FieldsVisitor(true); + final QueryShardContext context = + indexService.newQueryShardContext(shardId.id(), searcher.getIndexReader(), () -> nowInMillis, null); + final MapperService mapperService = context.getMapperService(); + final Text typeText = mapperService.documentMapper().typeText(); + + final MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[request.multiSearchRequest.requests().size()]; + for (int i = 0; i < request.multiSearchRequest.requests().size(); i++) { + final SearchSourceBuilder searchSourceBuilder = request.multiSearchRequest.requests().get(i).source(); + + final QueryBuilder queryBuilder = searchSourceBuilder.query(); + final int from = searchSourceBuilder.from(); + final int size = searchSourceBuilder.size(); + final FetchSourceContext fetchSourceContext = searchSourceBuilder.fetchSource(); + + final Query luceneQuery = queryBuilder.rewrite(context).toQuery(context); + final int n = from + size; + final TopDocs topDocs = searcher.search(luceneQuery, n, new Sort(SortField.FIELD_DOC)); + + final SearchHit[] hits = new SearchHit[topDocs.scoreDocs.length]; + for (int j = 0; j < topDocs.scoreDocs.length; j++) { + final ScoreDoc scoreDoc = topDocs.scoreDocs[j]; + + visitor.reset(); + searcher.doc(scoreDoc.doc, visitor); + visitor.postProcess(mapperService); + final SearchHit hit = new SearchHit(scoreDoc.doc, visitor.uid().id(), typeText, Collections.emptyMap()); + hit.sourceRef(filterSource(fetchSourceContext, visitor.source())); + hits[j] = hit; + } + items[i] = new MultiSearchResponse.Item(createSearchResponse(topDocs, hits), null); + } + return new MultiSearchResponse(items, 1L); + } + } + + } + + private static BytesReference filterSource(FetchSourceContext fetchSourceContext, BytesReference source) throws IOException { + Set includes = new HashSet<>(Arrays.asList(fetchSourceContext.includes())); + Set excludes = new HashSet<>(Arrays.asList(fetchSourceContext.excludes())); + + XContentBuilder builder = + new XContentBuilder(XContentType.SMILE.xContent(), new BytesStreamOutput(source.length()), includes, excludes); + XContentParser sourceParser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, source, XContentType.SMILE); + builder.copyCurrentStructure(sourceParser); + return BytesReference.bytes(builder); + } + + private static SearchResponse createSearchResponse(TopDocs topDocs, SearchHit[] hits) { + SearchHits searchHits = new SearchHits(hits, topDocs.totalHits, 0); + return new SearchResponse( + new InternalSearchResponse(searchHits, null, null, null, false, null, 0), + null, 1, 1, 0, 1L, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY + ); + } + +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index 8bdc050f75c..6937f7b9700 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -86,6 +86,49 @@ public class BasicEnrichTests extends ESSingleNodeTestCase { } } + public void testMultiplePolicies() { + int numPolicies = 8; + for (int i = 0; i < numPolicies; i++) { + String policyName = "policy" + i; + + IndexRequest indexRequest = new IndexRequest("source-" + i); + indexRequest.source("key", "key", "value", "val" + i); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet(); + + EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, + Collections.singletonList("source-" + i), "key", Collections.singletonList("value")); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + String pipelineName = "pipeline" + i; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + + "\", \"enrich_values\": [{\"source\": \"value\", \"target\": \"value\"}" + + "]}}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + } + + BulkRequest bulkRequest = new BulkRequest("my-index"); + for (int i = 0; i < numPolicies; i++) { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.id(Integer.toString(i)); + indexRequest.setPipeline("pipeline" + i); + indexRequest.source(Collections.singletonMap("key", "key")); + bulkRequest.add(indexRequest); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + + for (int i = 0; i < numPolicies; i++) { + GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet(); + Map source = getResponse.getSourceAsMap(); + assertThat(source.size(), equalTo(2)); + assertThat(source.get("value"), equalTo("val" + i)); + } + } + private List createSourceIndex(int numDocs) { Set keys = new HashSet<>(); for (int i = 0; i < numDocs; i++) { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java index 0a6fdba1f4f..46ab7cb251b 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java @@ -7,22 +7,34 @@ package org.elasticsearch.xpack.enrich.action; import org.apache.logging.log4j.util.BiConsumer; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.mockito.Mockito; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction.Coordinator; @@ -207,6 +219,78 @@ public class CoordinatorTests extends ESTestCase { assertThat(lookupFunction.capturedRequests.get(1).requests().get(0), sameInstance(searchRequest)); } + public void testLookupFunction() { + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + List indices = Arrays.asList("index1", "index2", "index3"); + for (String index : indices) { + multiSearchRequest.add(new SearchRequest(index)); + multiSearchRequest.add(new SearchRequest(index)); + } + + List requests = new ArrayList<>(); + ElasticsearchClient client = new ElasticsearchClient() { + + @Override + public ActionFuture execute( + ActionType action, Request request) { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(ActionType action, + Request request, + ActionListener listener) { + requests.add((EnrichShardMultiSearchAction.Request) request); + } + + @Override + public ThreadPool threadPool() { + throw new UnsupportedOperationException(); + } + }; + BiConsumer> consumer = Coordinator.lookupFunction(client); + consumer.accept(multiSearchRequest, null); + + assertThat(requests.size(), equalTo(indices.size())); + requests.sort(Comparator.comparing(SingleShardRequest::index)); + for (int i = 0; i < indices.size(); i++) { + String index = indices.get(i); + assertThat(requests.get(i).index(), equalTo(index)); + assertThat(requests.get(i).getMultiSearchRequest().requests().size(), equalTo(2)); + assertThat(requests.get(i).getMultiSearchRequest().requests().get(0).indices().length, equalTo(1)); + assertThat(requests.get(i).getMultiSearchRequest().requests().get(0).indices()[0], equalTo(index)); + } + } + + public void testReduce() { + Map>> itemsPerIndex = new HashMap<>(); + Map> shardResponses = new HashMap<>(); + + MultiSearchResponse.Item item1 = new MultiSearchResponse.Item(emptySearchResponse(), null); + itemsPerIndex.put("index1", Arrays.asList(new Tuple<>(0, null), new Tuple<>(1, null), new Tuple<>(2, null))); + shardResponses.put("index1", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[]{item1, item1, item1}, 1), null)); + + Exception failure = new RuntimeException(); + itemsPerIndex.put("index2", Arrays.asList(new Tuple<>(3, null), new Tuple<>(4, null), new Tuple<>(5, null))); + shardResponses.put("index2", new Tuple<>(null, failure)); + + MultiSearchResponse.Item item2 = new MultiSearchResponse.Item(emptySearchResponse(), null); + itemsPerIndex.put("index3", Arrays.asList(new Tuple<>(6, null), new Tuple<>(7, null), new Tuple<>(8, null))); + shardResponses.put("index3", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[]{item2, item2, item2}, 1), null)); + + MultiSearchResponse result = Coordinator.reduce(9, itemsPerIndex, shardResponses); + assertThat(result.getResponses().length, equalTo(9)); + assertThat(result.getResponses()[0], sameInstance(item1)); + assertThat(result.getResponses()[1], sameInstance(item1)); + assertThat(result.getResponses()[2], sameInstance(item1)); + assertThat(result.getResponses()[3].getFailure(), sameInstance(failure)); + assertThat(result.getResponses()[4].getFailure(), sameInstance(failure)); + assertThat(result.getResponses()[5].getFailure(), sameInstance(failure)); + assertThat(result.getResponses()[6], sameInstance(item2)); + assertThat(result.getResponses()[7], sameInstance(item2)); + assertThat(result.getResponses()[8], sameInstance(item2)); + } + private static SearchResponse emptySearchResponse() { InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchActionTests.java new file mode 100644 index 00000000000..5d1bb6e98c4 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchActionTests.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.LocalStateEnrich; + +import java.util.Collection; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class EnrichShardMultiSearchActionTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singletonList(LocalStateEnrich.class); + } + + public void testExecute() throws Exception { + XContentBuilder source = XContentBuilder.builder(XContentType.SMILE.xContent()); + source.startObject(); + source.field("key1", "value1"); + source.field("key2", "value2"); + source.endObject(); + + String indexName = EnrichPolicy.ENRICH_INDEX_NAME_BASE + "1"; + IndexRequest indexRequest = new IndexRequest(indexName); + indexRequest.source(source); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest(indexName)).actionGet(); + + int numSearches = randomIntBetween(2, 32); + MultiSearchRequest request = new MultiSearchRequest(); + for (int i = 0; i < numSearches; i++) { + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source().from(0); + searchRequest.source().size(1); + searchRequest.source().query(new MatchAllQueryBuilder()); + searchRequest.source().fetchSource("key1", null); + request.add(searchRequest); + } + + MultiSearchResponse result = + client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet(); + assertThat(result.getResponses().length, equalTo(numSearches)); + for (int i = 0; i < numSearches; i++) { + assertThat(result.getResponses()[i].isFailure(), is(false)); + assertThat(result.getResponses()[i].getResponse().getHits().getTotalHits().value, equalTo(1L)); + assertThat(result.getResponses()[i].getResponse().getHits().getHits()[0].getSourceAsMap().size(), equalTo(1)); + assertThat(result.getResponses()[i].getResponse().getHits().getHits()[0].getSourceAsMap().get("key1"), equalTo("value1")); + } + } + + public void testNonEnrichIndex() throws Exception { + createIndex("index"); + MultiSearchRequest request = new MultiSearchRequest(); + request.add(new SearchRequest("index")); + Exception e = expectThrows(ActionRequestValidationException.class, + () -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet()); + assertThat(e.getMessage(), equalTo("Validation Failed: 1: index [index] is not an enrich index;")); + } + + public void testMultipleShards() throws Exception { + String indexName = EnrichPolicy.ENRICH_INDEX_NAME_BASE + "1"; + createIndex(indexName, Settings.builder().put("index.number_of_shards", 2).build()); + MultiSearchRequest request = new MultiSearchRequest(); + request.add(new SearchRequest(indexName)); + Exception e = expectThrows(IllegalStateException.class, + () -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet()); + assertThat(e.getMessage(), equalTo("index [.enrich-1] should have 1 shard, but has 2 shards")); + } + + public void testMultipleIndices() throws Exception { + MultiSearchRequest request = new MultiSearchRequest(); + request.add(new SearchRequest("index1")); + request.add(new SearchRequest("index2")); + expectThrows(AssertionError.class, () -> new EnrichShardMultiSearchAction.Request(request)); + } + +}