Added a custom api to perform the msearch more efficiently for enrich processor (#43965)

Currently the msearch api is used to execute buffered search requests;
however the msearch api doesn't deal with search requests in an intelligent way.
It basically executes each search separately in a concurrent manner.

This api reuses the msearch request and response classes and executes
the searches as one request in the node holding the enrich index shard.
Things like engine.searcher and query shard context are only created once.
Also there are less layers than executing a regular msearch request. This
results in an interesting speedup.

Without this change, in a single node cluster, enriching documents
with a bulk size of 5000 items, the ingest time in each bulk response
varied from 174ms to 822ms. With this change the ingest time in each
bulk response varied from 54ms to 109ms.

I think we should add a change like this based on this improvement in ingest time.

However I do wonder if instead of doing this change, we should improve
the msearch api to execute more efficiently. That would be more complicated
then this change, because in this change the custom api can only search
enrich index shards and these are special because they always have a single
primary shard. If msearch api is to be improved then that should work for
any search request to any indices. Making the same optimization for
indices with more than 1 primary shard requires much more work.

The current change is isolated in the enrich plugin and LOC / complexity
is small. So this good enough for now.
This commit is contained in:
Martijn van Groningen 2019-08-09 08:55:38 +02:00
parent bb429d3b5c
commit f1ee29f22e
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
8 changed files with 568 additions and 6 deletions

View File

@ -121,7 +121,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
private final Item[] items;
private final long tookInMillis;
MultiSearchResponse(StreamInput in) throws IOException {
public MultiSearchResponse(StreamInput in) throws IOException {
super(in);
items = new Item[in.readVInt()];
for (int i = 0; i < items.length; i++) {

View File

@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction;
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.TransportGetEnrichPolicyAction;
@ -117,7 +118,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class),
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class),
new ActionHandler<>(CoordinatorProxyAction.INSTANCE, CoordinatorProxyAction.TransportAction.class)
new ActionHandler<>(CoordinatorProxyAction.INSTANCE, CoordinatorProxyAction.TransportAction.class),
new ActionHandler<>(EnrichShardMultiSearchAction.INSTANCE, EnrichShardMultiSearchAction.TransportAction.class)
);
}

View File

@ -81,8 +81,8 @@ final class ExactMatchProcessor extends AbstractProcessor {
TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
// TODO: Use a custom transport action instead of the search API
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(1);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null);

View File

@ -15,6 +15,8 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
@ -24,10 +26,14 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.enrich.EnrichPlugin;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -71,8 +77,7 @@ public class CoordinatorProxyAction extends ActionType<SearchResponse> {
public Coordinator(Client client, Settings settings) {
this(
(request, consumer) -> client.multiSearch(request,
ActionListener.wrap(response -> consumer.accept(response, null), e -> consumer.accept(null, e))),
lookupFunction(client),
EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings),
EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings),
EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.get(settings)
@ -138,7 +143,7 @@ public class CoordinatorProxyAction extends ActionType<SearchResponse> {
throw new AssertionError("no response and no error");
}
// There may be room to for a new request now the numberOfOutstandingRequests has been decreased:
// There may be room to for a new request now that numberOfOutstandingRequests has been decreased:
coordinateLookups();
}
@ -153,6 +158,68 @@ public class CoordinatorProxyAction extends ActionType<SearchResponse> {
}
}
static BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction(ElasticsearchClient client) {
return (request, consumer) -> {
int slot = 0;
final Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex = new HashMap<>();
for (SearchRequest searchRequest : request.requests()) {
List<Tuple<Integer, SearchRequest>> items =
itemsPerIndex.computeIfAbsent(searchRequest.indices()[0], k -> new ArrayList<>());
items.add(new Tuple<>(slot, searchRequest));
slot++;
}
final AtomicInteger counter = new AtomicInteger(0);
final ConcurrentMap<String, Tuple<MultiSearchResponse, Exception>> shardResponses = new ConcurrentHashMap<>();
for (Map.Entry<String, List<Tuple<Integer, SearchRequest>>> entry : itemsPerIndex.entrySet()) {
final String enrichIndexName = entry.getKey();
final List<Tuple<Integer, SearchRequest>> enrichIndexRequestsAndSlots = entry.getValue();
ActionListener<MultiSearchResponse> listener = ActionListener.wrap(
response -> {
shardResponses.put(enrichIndexName, new Tuple<>(response, null));
if (counter.incrementAndGet() == itemsPerIndex.size()) {
consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
}
},
e -> {
shardResponses.put(enrichIndexName, new Tuple<>(null, e));
if (counter.incrementAndGet() == itemsPerIndex.size()) {
consumer.accept(reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
}
}
);
MultiSearchRequest mrequest = new MultiSearchRequest();
enrichIndexRequestsAndSlots.stream().map(Tuple::v2).forEach(mrequest::add);
client.execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(mrequest), listener);
}
};
}
static MultiSearchResponse reduce(int numRequest,
Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex,
Map<String, Tuple<MultiSearchResponse, Exception>> shardResponses) {
MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numRequest];
for (Map.Entry<String, Tuple<MultiSearchResponse, Exception>> rspEntry : shardResponses.entrySet()) {
List<Tuple<Integer, SearchRequest>> reqSlots = itemsPerIndex.get(rspEntry.getKey());
if (rspEntry.getValue().v1() != null) {
MultiSearchResponse shardResponse = rspEntry.getValue().v1();
for (int i = 0; i < shardResponse.getResponses().length; i++) {
int slot = reqSlots.get(i).v1();
items[slot] = shardResponse.getResponses()[i];
}
} else if (rspEntry.getValue().v2() != null) {
Exception e = rspEntry.getValue().v2();
for (Tuple<Integer, SearchRequest> originSlot : reqSlots) {
items[originSlot.v1()] = new MultiSearchResponse.Item(null, e);
}
} else {
throw new AssertionError();
}
}
return new MultiSearchResponse(items, 1L);
}
}
}

View File

@ -0,0 +1,269 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* This is an internal action, that executes msearch requests for enrich indices in a more efficient manner.
* Currently each search request inside a msearch request is executed as a separate search. If many search requests
* are targeted to the same shards then there is quite some overhead in executing each search request as a separate
* search (multiple search contexts, opening of multiple searchers).
*
* In case for the enrich processor, searches are always targeting the same single shard indices. This action
* handles multi search requests targeting enrich indices more efficiently by executing them in a bulk using the same
* searcher and query shard context.
*
* This action (plus some coordination logic in {@link CoordinatorProxyAction}) can be removed when msearch can
* execute search requests targeted to the same shard more efficiently in a bulk like style.
*
* Note that this 'msearch' implementation only supports executing a query, pagination and source filtering.
* Other search features are not supported, because the enrich processor isn't using these search features.
*/
public class EnrichShardMultiSearchAction extends ActionType<MultiSearchResponse> {
public static final EnrichShardMultiSearchAction INSTANCE = new EnrichShardMultiSearchAction();
private static final String NAME = "indices:data/read/shard_multi_search";
private EnrichShardMultiSearchAction() {
super(NAME, MultiSearchResponse::new);
}
public static class Request extends SingleShardRequest<Request> {
private final MultiSearchRequest multiSearchRequest;
public Request(MultiSearchRequest multiSearchRequest) {
super(multiSearchRequest.requests().get(0).indices()[0]);
this.multiSearchRequest = multiSearchRequest;
assert multiSearchRequest.requests().stream()
.map(SearchRequest::indices)
.flatMap(Arrays::stream)
.distinct()
.count() == 1 : "action [" + NAME + "] cannot handle msearch request pointing to multiple indices";
assert assertSearchSource();
}
public Request(StreamInput in) throws IOException {
super(in);
multiSearchRequest = new MultiSearchRequest(in);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = validateNonNullIndex();
if (index.startsWith(EnrichPolicy.ENRICH_INDEX_NAME_BASE) == false) {
validationException = ValidateActions.addValidationError("index [" + index + "] is not an enrich index",
validationException);
}
return validationException;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
multiSearchRequest.writeTo(out);
}
MultiSearchRequest getMultiSearchRequest() {
return multiSearchRequest;
}
private boolean assertSearchSource() {
for (SearchRequest request : multiSearchRequest.requests()) {
SearchSourceBuilder copy = copy(request.source());
// validate that only a from, size, query and source filtering has been provided (other features are not supported):
// (first unset, what is supported and then see if there is anything left)
copy.query(null);
copy.from(0);
copy.size(10);
copy.fetchSource(null);
assert EMPTY_SOURCE.equals(copy) : "search request [" + Strings.toString(copy) +
"] is using features that is not supported";
}
return true;
}
private SearchSourceBuilder copy(SearchSourceBuilder source) {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
NamedWriteableRegistry registry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
try (BytesStreamOutput output = new BytesStreamOutput()) {
source.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), registry)) {
return new SearchSourceBuilder(in);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static final SearchSourceBuilder EMPTY_SOURCE = new SearchSourceBuilder()
// can't set -1 to indicate not specified
.from(0).size(10);
}
public static class TransportAction extends TransportSingleShardAction<Request, MultiSearchResponse> {
private final IndicesService indicesService;
@Inject
public TransportAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, ThreadPool.Names.SEARCH);
this.indicesService = indicesService;
}
@Override
protected Writeable.Reader<MultiSearchResponse> getResponseReader() {
return MultiSearchResponse::new;
}
@Override
protected boolean resolveIndex(Request request) {
return true;
}
@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
String index = request.concreteIndex();
IndexRoutingTable indexRouting = state.routingTable().index(index);
int numShards = indexRouting.shards().size();
if (numShards != 1) {
throw new IllegalStateException("index [" + index + "] should have 1 shard, but has " + numShards + " shards");
}
return indexRouting.shard(0).shardsRandomIt();
}
@Override
protected MultiSearchResponse shardOperation(Request request, ShardId shardId) throws IOException {
final long nowInMillis = System.currentTimeMillis();
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexShard indexShard = indicesService.getShardOrNull(shardId);
try (Engine.Searcher searcher = indexShard.acquireSearcher("enrich_msearch")) {
final FieldsVisitor visitor = new FieldsVisitor(true);
final QueryShardContext context =
indexService.newQueryShardContext(shardId.id(), searcher.getIndexReader(), () -> nowInMillis, null);
final MapperService mapperService = context.getMapperService();
final Text typeText = mapperService.documentMapper().typeText();
final MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[request.multiSearchRequest.requests().size()];
for (int i = 0; i < request.multiSearchRequest.requests().size(); i++) {
final SearchSourceBuilder searchSourceBuilder = request.multiSearchRequest.requests().get(i).source();
final QueryBuilder queryBuilder = searchSourceBuilder.query();
final int from = searchSourceBuilder.from();
final int size = searchSourceBuilder.size();
final FetchSourceContext fetchSourceContext = searchSourceBuilder.fetchSource();
final Query luceneQuery = queryBuilder.rewrite(context).toQuery(context);
final int n = from + size;
final TopDocs topDocs = searcher.search(luceneQuery, n, new Sort(SortField.FIELD_DOC));
final SearchHit[] hits = new SearchHit[topDocs.scoreDocs.length];
for (int j = 0; j < topDocs.scoreDocs.length; j++) {
final ScoreDoc scoreDoc = topDocs.scoreDocs[j];
visitor.reset();
searcher.doc(scoreDoc.doc, visitor);
visitor.postProcess(mapperService);
final SearchHit hit = new SearchHit(scoreDoc.doc, visitor.uid().id(), typeText, Collections.emptyMap());
hit.sourceRef(filterSource(fetchSourceContext, visitor.source()));
hits[j] = hit;
}
items[i] = new MultiSearchResponse.Item(createSearchResponse(topDocs, hits), null);
}
return new MultiSearchResponse(items, 1L);
}
}
}
private static BytesReference filterSource(FetchSourceContext fetchSourceContext, BytesReference source) throws IOException {
Set<String> includes = new HashSet<>(Arrays.asList(fetchSourceContext.includes()));
Set<String> excludes = new HashSet<>(Arrays.asList(fetchSourceContext.excludes()));
XContentBuilder builder =
new XContentBuilder(XContentType.SMILE.xContent(), new BytesStreamOutput(source.length()), includes, excludes);
XContentParser sourceParser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, source, XContentType.SMILE);
builder.copyCurrentStructure(sourceParser);
return BytesReference.bytes(builder);
}
private static SearchResponse createSearchResponse(TopDocs topDocs, SearchHit[] hits) {
SearchHits searchHits = new SearchHits(hits, topDocs.totalHits, 0);
return new SearchResponse(
new InternalSearchResponse(searchHits, null, null, null, false, null, 0),
null, 1, 1, 0, 1L, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY
);
}
}

View File

@ -86,6 +86,49 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
}
}
public void testMultiplePolicies() {
int numPolicies = 8;
for (int i = 0; i < numPolicies; i++) {
String policyName = "policy" + i;
IndexRequest indexRequest = new IndexRequest("source-" + i);
indexRequest.source("key", "key", "value", "val" + i);
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet();
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null,
Collections.singletonList("source-" + i), "key", Collections.singletonList("value"));
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
String pipelineName = "pipeline" + i;
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
"\", \"enrich_values\": [{\"source\": \"value\", \"target\": \"value\"}" +
"]}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
}
BulkRequest bulkRequest = new BulkRequest("my-index");
for (int i = 0; i < numPolicies; i++) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.id(Integer.toString(i));
indexRequest.setPipeline("pipeline" + i);
indexRequest.source(Collections.singletonMap("key", "key"));
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
for (int i = 0; i < numPolicies; i++) {
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
assertThat(source.size(), equalTo(2));
assertThat(source.get("value"), equalTo("val" + i));
}
}
private List<String> createSourceIndex(int numDocs) {
Set<String> keys = new HashSet<>();
for (int i = 0; i < numDocs; i++) {

View File

@ -7,22 +7,34 @@ package org.elasticsearch.xpack.enrich.action;
import org.apache.logging.log4j.util.BiConsumer;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction.Coordinator;
@ -207,6 +219,78 @@ public class CoordinatorTests extends ESTestCase {
assertThat(lookupFunction.capturedRequests.get(1).requests().get(0), sameInstance(searchRequest));
}
public void testLookupFunction() {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
List<String> indices = Arrays.asList("index1", "index2", "index3");
for (String index : indices) {
multiSearchRequest.add(new SearchRequest(index));
multiSearchRequest.add(new SearchRequest(index));
}
List<EnrichShardMultiSearchAction.Request> requests = new ArrayList<>();
ElasticsearchClient client = new ElasticsearchClient() {
@Override
public <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
ActionType<Response> action, Request request) {
throw new UnsupportedOperationException();
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,
Request request,
ActionListener<Response> listener) {
requests.add((EnrichShardMultiSearchAction.Request) request);
}
@Override
public ThreadPool threadPool() {
throw new UnsupportedOperationException();
}
};
BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> consumer = Coordinator.lookupFunction(client);
consumer.accept(multiSearchRequest, null);
assertThat(requests.size(), equalTo(indices.size()));
requests.sort(Comparator.comparing(SingleShardRequest::index));
for (int i = 0; i < indices.size(); i++) {
String index = indices.get(i);
assertThat(requests.get(i).index(), equalTo(index));
assertThat(requests.get(i).getMultiSearchRequest().requests().size(), equalTo(2));
assertThat(requests.get(i).getMultiSearchRequest().requests().get(0).indices().length, equalTo(1));
assertThat(requests.get(i).getMultiSearchRequest().requests().get(0).indices()[0], equalTo(index));
}
}
public void testReduce() {
Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex = new HashMap<>();
Map<String, Tuple<MultiSearchResponse, Exception>> shardResponses = new HashMap<>();
MultiSearchResponse.Item item1 = new MultiSearchResponse.Item(emptySearchResponse(), null);
itemsPerIndex.put("index1", Arrays.asList(new Tuple<>(0, null), new Tuple<>(1, null), new Tuple<>(2, null)));
shardResponses.put("index1", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[]{item1, item1, item1}, 1), null));
Exception failure = new RuntimeException();
itemsPerIndex.put("index2", Arrays.asList(new Tuple<>(3, null), new Tuple<>(4, null), new Tuple<>(5, null)));
shardResponses.put("index2", new Tuple<>(null, failure));
MultiSearchResponse.Item item2 = new MultiSearchResponse.Item(emptySearchResponse(), null);
itemsPerIndex.put("index3", Arrays.asList(new Tuple<>(6, null), new Tuple<>(7, null), new Tuple<>(8, null)));
shardResponses.put("index3", new Tuple<>(new MultiSearchResponse(new MultiSearchResponse.Item[]{item2, item2, item2}, 1), null));
MultiSearchResponse result = Coordinator.reduce(9, itemsPerIndex, shardResponses);
assertThat(result.getResponses().length, equalTo(9));
assertThat(result.getResponses()[0], sameInstance(item1));
assertThat(result.getResponses()[1], sameInstance(item1));
assertThat(result.getResponses()[2], sameInstance(item1));
assertThat(result.getResponses()[3].getFailure(), sameInstance(failure));
assertThat(result.getResponses()[4].getFailure(), sameInstance(failure));
assertThat(result.getResponses()[5].getFailure(), sameInstance(failure));
assertThat(result.getResponses()[6], sameInstance(item2));
assertThat(result.getResponses()[7], sameInstance(item2));
assertThat(result.getResponses()[8], sameInstance(item2));
}
private static SearchResponse emptySearchResponse() {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);

View File

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.LocalStateEnrich;
import java.util.Collection;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class EnrichShardMultiSearchActionTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(LocalStateEnrich.class);
}
public void testExecute() throws Exception {
XContentBuilder source = XContentBuilder.builder(XContentType.SMILE.xContent());
source.startObject();
source.field("key1", "value1");
source.field("key2", "value2");
source.endObject();
String indexName = EnrichPolicy.ENRICH_INDEX_NAME_BASE + "1";
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest.source(source);
client().index(indexRequest).actionGet();
client().admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
int numSearches = randomIntBetween(2, 32);
MultiSearchRequest request = new MultiSearchRequest();
for (int i = 0; i < numSearches; i++) {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source().from(0);
searchRequest.source().size(1);
searchRequest.source().query(new MatchAllQueryBuilder());
searchRequest.source().fetchSource("key1", null);
request.add(searchRequest);
}
MultiSearchResponse result =
client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet();
assertThat(result.getResponses().length, equalTo(numSearches));
for (int i = 0; i < numSearches; i++) {
assertThat(result.getResponses()[i].isFailure(), is(false));
assertThat(result.getResponses()[i].getResponse().getHits().getTotalHits().value, equalTo(1L));
assertThat(result.getResponses()[i].getResponse().getHits().getHits()[0].getSourceAsMap().size(), equalTo(1));
assertThat(result.getResponses()[i].getResponse().getHits().getHits()[0].getSourceAsMap().get("key1"), equalTo("value1"));
}
}
public void testNonEnrichIndex() throws Exception {
createIndex("index");
MultiSearchRequest request = new MultiSearchRequest();
request.add(new SearchRequest("index"));
Exception e = expectThrows(ActionRequestValidationException.class,
() -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet());
assertThat(e.getMessage(), equalTo("Validation Failed: 1: index [index] is not an enrich index;"));
}
public void testMultipleShards() throws Exception {
String indexName = EnrichPolicy.ENRICH_INDEX_NAME_BASE + "1";
createIndex(indexName, Settings.builder().put("index.number_of_shards", 2).build());
MultiSearchRequest request = new MultiSearchRequest();
request.add(new SearchRequest(indexName));
Exception e = expectThrows(IllegalStateException.class,
() -> client().execute(EnrichShardMultiSearchAction.INSTANCE, new EnrichShardMultiSearchAction.Request(request)).actionGet());
assertThat(e.getMessage(), equalTo("index [.enrich-1] should have 1 shard, but has 2 shards"));
}
public void testMultipleIndices() throws Exception {
MultiSearchRequest request = new MultiSearchRequest();
request.add(new SearchRequest("index1"));
request.add(new SearchRequest("index2"));
expectThrows(AssertionError.class, () -> new EnrichShardMultiSearchAction.Request(request));
}
}