mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-03 01:19:10 +00:00
Add enrich coordinator proxy action (#43801)
Introduced a proxy API to handle the search request load that originates from the enrich processor. The enrich processor can execute many search requests that run asynchronously in parallel and can easily overwhelm the search thread pool on nodes. To protect against this, the Coordinator queues the search requests and only executes a fixed number of them in parallel. Besides this, the Coordinator combines as many search requests as possible (up to a defined maximum) into a single multi search request in order to reduce the number of remote API calls made from the node that performs ingestion.
This commit is contained in:
parent
785aedebad
commit
397150fa1e
@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.ListEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportDeleteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportExecuteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.action.TransportGetEnrichPolicyAction;
|
||||
@ -62,6 +63,22 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
||||
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING =
|
||||
Setting.intSetting("index.xpack.enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
|
||||
|
||||
public static final Setting<Integer> COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS =
|
||||
Setting.intSetting("enrich.coordinator_proxy.max_concurrent_requests", 8, 1, 10000, Setting.Property.NodeScope);
|
||||
|
||||
public static final Setting<Integer> COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST =
|
||||
Setting.intSetting("enrich.coordinator_proxy.max_lookups_per_request", 128, 1, 10000, Setting.Property.NodeScope);
|
||||
|
||||
private static final String QUEUE_CAPACITY_SETTING_NAME = "enrich.coordinator_proxy.queue_capacity";
|
||||
public static final Setting<Integer> COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting<>(QUEUE_CAPACITY_SETTING_NAME,
|
||||
settings -> {
|
||||
int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings);
|
||||
int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings);
|
||||
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
|
||||
},
|
||||
val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME),
|
||||
Setting.Property.NodeScope);
|
||||
|
||||
private final Settings settings;
|
||||
private final Boolean enabled;
|
||||
|
||||
@ -87,7 +104,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
||||
new ActionHandler<>(DeleteEnrichPolicyAction.INSTANCE, TransportDeleteEnrichPolicyAction.class),
|
||||
new ActionHandler<>(ListEnrichPolicyAction.INSTANCE, TransportListEnrichPolicyAction.class),
|
||||
new ActionHandler<>(PutEnrichPolicyAction.INSTANCE, TransportPutEnrichPolicyAction.class),
|
||||
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class)
|
||||
new ActionHandler<>(ExecuteEnrichPolicyAction.INSTANCE, TransportExecuteEnrichPolicyAction.class),
|
||||
new ActionHandler<>(CoordinatorProxyAction.INSTANCE, CoordinatorProxyAction.TransportAction.class)
|
||||
);
|
||||
}
|
||||
|
||||
@ -115,7 +133,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool,
|
||||
new IndexNameExpressionResolver(), System::currentTimeMillis);
|
||||
return Arrays.asList(enrichPolicyExecutor);
|
||||
return Arrays.asList(enrichPolicyExecutor, new CoordinatorProxyAction.Coordinator(client, settings));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,6 +153,11 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(ENRICH_FETCH_SIZE_SETTING);
|
||||
return Arrays.asList(
|
||||
ENRICH_FETCH_SIZE_SETTING,
|
||||
COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS,
|
||||
COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST,
|
||||
COORDINATOR_PROXY_QUEUE_CAPACITY
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -18,10 +18,10 @@ import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
import org.elasticsearch.xpack.enrich.EnrichProcessorFactory.EnrichSpecification;
|
||||
import org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
final class ExactMatchProcessor extends AbstractProcessor {
|
||||
@ -146,28 +146,13 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
||||
return specifications;
|
||||
}
|
||||
|
||||
// TODO: This is temporary and will be removed once internal transport action that does an efficient lookup instead of a search.
|
||||
// This semaphore purpose is to throttle the number of concurrent search requests, if this is not done then search thread pool
|
||||
// on nodes may get full and search request fail because they get rejected.
|
||||
// Because this code is going to change, a semaphore seemed like an easy quick fix to address this problem.
|
||||
private static final Semaphore SEMAPHORE = new Semaphore(100);
|
||||
|
||||
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
|
||||
return (req, handler) -> {
|
||||
try {
|
||||
SEMAPHORE.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
handler.accept(null, e);
|
||||
return;
|
||||
}
|
||||
client.search(req, ActionListener.wrap(
|
||||
client.execute(CoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
|
||||
resp -> {
|
||||
SEMAPHORE.release();
|
||||
handler.accept(resp, null);
|
||||
},
|
||||
e -> {
|
||||
SEMAPHORE.release();
|
||||
handler.accept(null, e);
|
||||
}));
|
||||
};
|
||||
|
@ -0,0 +1,158 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich.action;
|
||||
|
||||
import org.apache.logging.log4j.util.BiConsumer;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.enrich.EnrichPlugin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* An internal action to locally manage the load of the search requests that originate from the enrich processor.
|
||||
* This is because the enrich processor executes asynchronously and a bulk request could easily overload
|
||||
* the search tp.
|
||||
*/
|
||||
/**
 * An internal action to locally manage the load of the search requests that originate from the enrich processor.
 * This is because the enrich processor executes asynchronously and a bulk request could easily overload
 * the search tp.
 */
public class CoordinatorProxyAction extends Action<SearchResponse> {

    public static final CoordinatorProxyAction INSTANCE = new CoordinatorProxyAction();
    public static final String NAME = "indices:data/read/xpack/enrich/coordinate_lookups";

    private CoordinatorProxyAction() {
        super(NAME, SearchResponse::new);
    }

    /**
     * Handles a single {@link SearchRequest} by handing it to the node-local {@link Coordinator},
     * which batches it with other outstanding lookups into multi search requests.
     */
    public static class TransportAction extends HandledTransportAction<SearchRequest, SearchResponse> {

        private final Coordinator coordinator;

        @Inject
        public TransportAction(TransportService transportService, ActionFilters actionFilters, Coordinator coordinator) {
            super(NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
            this.coordinator = coordinator;
        }

        @Override
        protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
            // NOTE(review): assumes callers always invoke this action from a write thread
            // (ingest runs on the write thread pool) — the back-pressure in
            // Coordinator#schedule relies on this; confirm there are no other callers.
            assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
            coordinator.schedule(request, listener);
        }
    }

    /**
     * Queues enrich lookup search requests, folds them into multi search requests
     * (up to {@code maxLookupsPerRequest} lookups each) and executes at most
     * {@code maxNumberOfConcurrentRequests} multi search requests at a time.
     */
    public static class Coordinator {

        // Executes a multi search request and reports either a response or an exception (exactly one is non-null).
        final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
        // Maximum number of lookups folded into a single multi search request.
        final int maxLookupsPerRequest;
        // Maximum number of multi search requests in flight at any time.
        final int maxNumberOfConcurrentRequests;
        // Pending lookups; bounded, so producers block (back pressure) when it is full.
        final BlockingQueue<Slot> queue;
        // Number of multi search requests currently in flight.
        final AtomicInteger numberOfOutstandingRequests = new AtomicInteger(0);

        public Coordinator(Client client, Settings settings) {
            this(
                (request, consumer) -> client.multiSearch(request,
                    ActionListener.wrap(response -> consumer.accept(response, null), e -> consumer.accept(null, e))),
                EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings),
                EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings),
                EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.get(settings)
            );
        }

        Coordinator(BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction,
                    int maxLookupsPerRequest, int maxNumberOfConcurrentRequests, int queueCapacity) {
            this.lookupFunction = lookupFunction;
            this.maxLookupsPerRequest = maxLookupsPerRequest;
            this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
            this.queue = new ArrayBlockingQueue<>(queueCapacity);
        }

        /**
         * Enqueues a search request and triggers a coordination round.
         * Blocks the calling thread while the queue is full.
         */
        void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
            // Use put(...), because if queue is full then this method will wait until a free slot becomes available
            // The calling thread here is a write thread (write tp is used by ingest) and
            // this will create natural back pressure from the enrich processor.
            // If there are no write threads available then write requests with ingestion will fail with 429 error code.
            try {
                queue.put(new Slot(searchRequest, listener));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("unable to add item to queue", e);
            }
            coordinateLookups();
        }

        /**
         * Drains queued lookups into multi search requests and fires them, as long as
         * the queue is non-empty and the concurrent-request limit has not been reached.
         * Synchronized so only one thread at a time drains the queue and increments the counter.
         */
        synchronized void coordinateLookups() {
            while (queue.isEmpty() == false &&
                numberOfOutstandingRequests.get() < maxNumberOfConcurrentRequests) {

                final List<Slot> slots = new ArrayList<>();
                queue.drainTo(slots, maxLookupsPerRequest);
                final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
                slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest));

                numberOfOutstandingRequests.incrementAndGet();
                lookupFunction.accept(multiSearchRequest, (response, e) -> {
                    handleResponse(slots, response, e);
                });
            }
        }

        /**
         * Fans a multi search response (or a whole-request failure) back out to the
         * per-lookup listeners, then starts another coordination round since a
         * concurrency slot has been freed.
         */
        void handleResponse(List<Slot> slots, MultiSearchResponse response, Exception e) {
            numberOfOutstandingRequests.decrementAndGet();

            if (response != null) {
                // Multi search responses come back in the same order the requests were added.
                assert slots.size() == response.getResponses().length;
                for (int i = 0; i < response.getResponses().length; i++) {
                    MultiSearchResponse.Item responseItem = response.getResponses()[i];
                    Slot slot = slots.get(i);

                    if (responseItem.isFailure()) {
                        slot.actionListener.onFailure(responseItem.getFailure());
                    } else {
                        slot.actionListener.onResponse(responseItem.getResponse());
                    }
                }
            } else if (e != null) {
                // The entire multi search request failed: notify every pending lookup.
                slots.forEach(slot -> slot.actionListener.onFailure(e));
            } else {
                throw new AssertionError("no response and no error");
            }

            // There may be room for a new request now that numberOfOutstandingRequests has been decreased:
            coordinateLookups();
        }

        /**
         * Pairs a queued search request with the listener to notify when its result arrives.
         */
        static class Slot {

            final SearchRequest searchRequest;
            final ActionListener<SearchResponse> actionListener;

            Slot(SearchRequest searchRequest, ActionListener<SearchResponse> actionListener) {
                this.searchRequest = Objects.requireNonNull(searchRequest);
                this.actionListener = Objects.requireNonNull(actionListener);
            }
        }

    }

}
|
@ -0,0 +1,228 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich.action;
|
||||
|
||||
import org.apache.logging.log4j.util.BiConsumer;
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.index.query.MatchQueryBuilder;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.xpack.enrich.action.CoordinatorProxyAction.Coordinator;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
/**
 * Unit tests for {@link CoordinatorProxyAction.Coordinator}: batching of queued search
 * requests into multi search requests, the concurrent-request limit, error fan-out,
 * and back pressure when the queue is full.
 */
public class CoordinatorTests extends ESTestCase {

    public void testCoordinateLookups() {
        MockLookupFunction lookupFunction = new MockLookupFunction();
        // Batches of at most 5 lookups, at most 1 concurrent request, queue capacity 100.
        Coordinator coordinator = new Coordinator(lookupFunction, 5, 1, 100);

        List<ActionListener<SearchResponse>> searchActionListeners = new ArrayList<>();
        // Pre-fill the queue with 9 requests without triggering coordination.
        for (int i = 0; i < 9; i++) {
            SearchRequest searchRequest = new SearchRequest("my-index");
            searchRequest.source().query(new MatchQueryBuilder("my_field", String.valueOf(i)));
            @SuppressWarnings("unchecked")
            ActionListener<SearchResponse> actionListener = Mockito.mock(ActionListener.class);
            searchActionListeners.add(actionListener);
            coordinator.queue.add(new Coordinator.Slot(searchRequest, actionListener));
        }

        // The 10th request goes through schedule(...), which kicks off coordination.
        SearchRequest searchRequest = new SearchRequest("my-index");
        searchRequest.source().query(new MatchQueryBuilder("my_field", String.valueOf(10)));
        @SuppressWarnings("unchecked")
        ActionListener<SearchResponse> actionListener = Mockito.mock(ActionListener.class);
        searchActionListeners.add(actionListener);
        coordinator.schedule(searchRequest, actionListener);

        // First batch of search requests have been sent off:
        // (However still 5 should remain in the queue)
        assertThat(coordinator.queue.size(), equalTo(5));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));

        // Nothing should happen now, because there is an outstanding request and max number of requests has been set to 1:
        coordinator.coordinateLookups();
        assertThat(coordinator.queue.size(), equalTo(5));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(1));

        SearchResponse emptyResponse = emptySearchResponse();
        // Replying a response and that should trigger another coordination round
        MultiSearchResponse.Item[] responseItems = new MultiSearchResponse.Item[5];
        for (int i = 0; i < 5; i++) {
            responseItems[i] = new MultiSearchResponse.Item(emptyResponse, null);
        }
        lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null);
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(2));

        // Replying last response, resulting in an empty queue and no outstanding requests.
        responseItems = new MultiSearchResponse.Item[5];
        for (int i = 0; i < 5; i++) {
            responseItems[i] = new MultiSearchResponse.Item(emptyResponse, null);
        }
        lookupFunction.capturedConsumers.get(1).accept(new MultiSearchResponse(responseItems, 1L), null);
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(2));

        // All individual action listeners for the search requests should have been invoked:
        for (ActionListener<SearchResponse> searchActionListener : searchActionListeners) {
            Mockito.verify(searchActionListener).onResponse(Mockito.eq(emptyResponse));
        }
    }

    public void testCoordinateLookupsMultiSearchError() {
        MockLookupFunction lookupFunction = new MockLookupFunction();
        Coordinator coordinator = new Coordinator(lookupFunction, 5, 1, 100);

        List<ActionListener<SearchResponse>> searchActionListeners = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            SearchRequest searchRequest = new SearchRequest("my-index");
            searchRequest.source().query(new MatchQueryBuilder("my_field", String.valueOf(i)));
            @SuppressWarnings("unchecked")
            ActionListener<SearchResponse> actionListener = Mockito.mock(ActionListener.class);
            searchActionListeners.add(actionListener);
            coordinator.queue.add(new Coordinator.Slot(searchRequest, actionListener));
        }

        SearchRequest searchRequest = new SearchRequest("my-index");
        searchRequest.source().query(new MatchQueryBuilder("my_field", String.valueOf(5)));
        @SuppressWarnings("unchecked")
        ActionListener<SearchResponse> actionListener = Mockito.mock(ActionListener.class);
        searchActionListeners.add(actionListener);
        coordinator.schedule(searchRequest, actionListener);

        // All 5 requests fit in a single batch, so the queue is drained entirely:
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));

        // Fail the entire multi search request; the failure should fan out to every listener.
        RuntimeException e = new RuntimeException();
        lookupFunction.capturedConsumers.get(0).accept(null, e);
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(1));

        // All individual action listeners for the search requests should have been invoked:
        for (ActionListener<SearchResponse> searchActionListener : searchActionListeners) {
            Mockito.verify(searchActionListener).onFailure(Mockito.eq(e));
        }
    }

    public void testCoordinateLookupsMultiSearchItemError() {
        MockLookupFunction lookupFunction = new MockLookupFunction();
        Coordinator coordinator = new Coordinator(lookupFunction, 5, 1, 100);

        List<ActionListener<SearchResponse>> searchActionListeners = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            SearchRequest searchRequest = new SearchRequest("my-index");
            searchRequest.source().query(new MatchQueryBuilder("my_field", String.valueOf(i)));
            @SuppressWarnings("unchecked")
            ActionListener<SearchResponse> actionListener = Mockito.mock(ActionListener.class);
            searchActionListeners.add(actionListener);
            coordinator.queue.add(new Coordinator.Slot(searchRequest, actionListener));
        }

        SearchRequest searchRequest = new SearchRequest("my-index");
        searchRequest.source().query(new MatchQueryBuilder("my_field", String.valueOf(5)));
        @SuppressWarnings("unchecked")
        ActionListener<SearchResponse> actionListener = Mockito.mock(ActionListener.class);
        searchActionListeners.add(actionListener);
        coordinator.schedule(searchRequest, actionListener);

        // All 5 requests fit in a single batch, so the queue is drained entirely:
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(1));
        assertThat(lookupFunction.capturedRequests.get(0).requests().size(), equalTo(5));

        RuntimeException e = new RuntimeException();
        // Reply with per-item failures; each listener should get the item's failure.
        MultiSearchResponse.Item[] responseItems = new MultiSearchResponse.Item[5];
        for (int i = 0; i < 5; i++) {
            responseItems[i] = new MultiSearchResponse.Item(null, e);
        }
        lookupFunction.capturedConsumers.get(0).accept(new MultiSearchResponse(responseItems, 1L), null);
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(coordinator.numberOfOutstandingRequests.get(), equalTo(0));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(1));

        // All individual action listeners for the search requests should have been invoked:
        for (ActionListener<SearchResponse> searchActionListener : searchActionListeners) {
            Mockito.verify(searchActionListener).onFailure(Mockito.eq(e));
        }
    }

    public void testQueueing() throws Exception {
        MockLookupFunction lookupFunction = new MockLookupFunction();
        // Queue capacity 1, and it is already full, so the next schedule(...) must block.
        Coordinator coordinator = new Coordinator(lookupFunction, 1, 1, 1);
        coordinator.queue.add(new Coordinator.Slot(new SearchRequest(), ActionListener.wrap(() -> {})));

        AtomicBoolean completed = new AtomicBoolean(false);
        SearchRequest searchRequest = new SearchRequest();
        Thread t = new Thread(() -> {
            coordinator.schedule(searchRequest, ActionListener.wrap(() -> {}));
            completed.set(true);
        });
        t.start();
        // The scheduling thread should park in queue.put(...) (back pressure).
        assertBusy(() -> {
            assertThat(t.getState(), equalTo(Thread.State.WAITING));
            assertThat(completed.get(), is(false));
        });

        // Draining the queue frees a slot, unblocking the scheduling thread.
        coordinator.coordinateLookups();
        assertBusy(() -> {
            assertThat(completed.get(), is(true));
        });

        lookupFunction.capturedConsumers.get(0).accept(
            new MultiSearchResponse(new MultiSearchResponse.Item[]{new MultiSearchResponse.Item(emptySearchResponse(), null)}, 1L), null);
        assertThat(coordinator.queue.size(), equalTo(0));
        assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
        assertThat(lookupFunction.capturedRequests.get(1).requests().get(0), sameInstance(searchRequest));
    }

    // Builds a SearchResponse with zero hits, used as a canned reply.
    private static SearchResponse emptySearchResponse() {
        InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
            new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
        return new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
    }

    // Captures multi search requests and their completion consumers instead of executing them,
    // so tests control exactly when and how each request completes.
    private class MockLookupFunction implements BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> {

        private final List<MultiSearchRequest> capturedRequests = new ArrayList<>();
        private final List<BiConsumer<MultiSearchResponse, Exception>> capturedConsumers = new ArrayList<>();

        @Override
        public void accept(MultiSearchRequest multiSearchRequest, BiConsumer<MultiSearchResponse, Exception> consumer) {
            capturedRequests.add(multiSearchRequest);
            capturedConsumers.add(consumer);
        }
    }

}
|
Loading…
x
Reference in New Issue
Block a user