This PR removes the blocking put call used to insert ingest documents into the coordinator's queue. It replaces it with an offer call, which fails the request with a rejection exception (surfaced as HTTP 429) in the event that the queue is full. This prevents write threads from deadlocking when the queue fills to capacity and there is more than one enrich processor in a pipeline.
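For context, here is a minimal standalone sketch of the two BlockingQueue calls involved. It uses plain java.util.concurrent types and invented names; it is not the coordinator code itself:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class OfferVsPut {
        public static void main(String[] args) throws InterruptedException {
            // Capacity-1 queue standing in for the enrich coordination queue.
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("first");                    // succeeds; queue is now full

            // put("second") would park this thread until something else drains
            // the queue. offer(...) returns false immediately instead:
            boolean accepted = queue.offer("second");
            if (accepted == false) {
                // The coordinator fails the listener with an
                // EsRejectedExecutionException in this case, which surfaces as 429.
                System.out.println("queue at capacity, rejecting request");
            }
        }
    }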
commit 3b73ce3112 (parent 2f561084f0)
@@ -19,6 +19,7 @@ import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -65,7 +66,10 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
         protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
             // Write tp is expected when executing enrich processor from index / bulk api
             // Management tp is expected when executing enrich processor from ingest simulate api
+            // Search tp is allowed for now - After enriching, the remaining parts of the pipeline are processed on the
+            // search thread, which could end up here again if there is more than one enrich processor in a pipeline.
             assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
+                || Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
                 || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
             coordinator.schedule(request, listener);
         }
@@ -76,6 +80,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
         final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
         final int maxLookupsPerRequest;
         final int maxNumberOfConcurrentRequests;
+        final int queueCapacity;
         final BlockingQueue<Slot> queue;
         final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
         volatile long remoteRequestsTotal = 0;
@@ -99,21 +104,30 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
             this.lookupFunction = lookupFunction;
             this.maxLookupsPerRequest = maxLookupsPerRequest;
             this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
+            this.queueCapacity = queueCapacity;
             this.queue = new ArrayBlockingQueue<>(queueCapacity);
         }

         void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
-            // Use put(...), because if queue is full then this method will wait until a free slot becomes available
-            // The calling thread here is a write thread (write tp is used by ingest) and
-            // this will create natural back pressure from the enrich processor.
-            // If there are no write threads available then write requests with ingestion will fail with 429 error code.
-            try {
-                queue.put(new Slot(searchRequest, listener));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException("unable to add item to queue", e);
-            }
+            // Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
+            // especially since the logic to kick off draining the queue is located right after this section. If we
+            // cannot insert a request to the queue, we should reject the document with a 429 error code.
+            boolean accepted = queue.offer(new Slot(searchRequest, listener));
+            int queueSize = queue.size();
+
+            // Coordinate lookups no matter what, even if queues were full. Search threads should be draining the queue,
+            // but they may be busy with processing the remaining work for enrich results. If there is more than one
+            // enrich processor in a pipeline, those search threads may find themselves here again before they can
+            // coordinate the next set of lookups.
             coordinateLookups();
+
+            if (accepted == false) {
+                listener.onFailure(
+                    new EsRejectedExecutionException(
+                        "Could not perform enrichment, enrich coordination queue at capacity [" + queueSize + "/" + queueCapacity + "]"
+                    )
+                );
+            }
         }

         CoordinatorStats getStats(String nodeId) {
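To make the liveness hazard behind this change concrete, here is a hypothetical, self-contained sketch (invented names, not Elasticsearch code) of a thread that is both the producer and the only consumer of a bounded queue, as happens when the thread draining enrich lookups must also schedule the next lookup for a second enrich processor:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class SelfEnqueueSketch {
        public static void main(String[] args) throws InterruptedException {
            // Capacity-1 queue standing in for the enrich coordination queue.
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("doc-1 lookup");

            // Pretend we are the single thread that both drains the queue and,
            // while processing a document, schedules its next enrich lookup.
            String task = queue.take();
            queue.put("doc-1 second lookup");   // fine: we just freed the slot

            // The queue is full again. A blocking put here would park this
            // thread forever, because it is also the only consumer:
            // queue.put("doc-2 lookup");       // would deadlock
            if (queue.offer("doc-2 lookup") == false) {
                System.out.println("queue full, rejecting doc-2 (HTTP 429)");
            }
            System.out.println("still alive, processed: " + task);
        }
    }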
EnrichResiliencyTests.java (new file)
@@ -0,0 +1,288 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class EnrichResiliencyTests extends ESSingleNodeTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> getPlugins() {
        return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class, LocalStateEnrich.class);
    }

    @Override
    protected Settings nodeSettings() {
        // Severely throttle the processing throughput to reach max capacity easier
        return Settings.builder()
            .put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
            .put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
            .put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)
            .build();
    }

    public void testWriteThreadLivenessBackToBack() throws Exception {
        ensureGreen();

        long testSuffix = System.currentTimeMillis();
        String enrichIndexName = "enrich_lookup_" + testSuffix;
        String enrichPolicyName = "enrich_policy_" + testSuffix;
        String enrichPipelineName = "enrich_pipeline_" + testSuffix;
        String enrichedIndexName = "enrich_results_" + testSuffix;

        client().index(
            new IndexRequest(enrichIndexName).source(
                JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
            )
        ).actionGet();

        client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

        client().execute(
            PutEnrichPolicyAction.INSTANCE,
            new PutEnrichPolicyAction.Request(
                enrichPolicyName,
                new EnrichPolicy(
                    EnrichPolicy.MATCH_TYPE,
                    null,
                    Collections.singletonList(enrichIndexName),
                    "my_key",
                    Collections.singletonList("my_value")
                )
            )
        ).actionGet();

        client().execute(
            ExecuteEnrichPolicyAction.INSTANCE,
            new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
        ).actionGet();

        XContentBuilder pipe1 = JsonXContent.contentBuilder();
        pipe1.startObject();
        {
            pipe1.startArray("processors");
            {
                pipe1.startObject();
                {
                    pipe1.startObject("enrich");
                    {
                        pipe1.field("policy_name", enrichPolicyName);
                        pipe1.field("field", "custom_id");
                        pipe1.field("target_field", "enrich_value_1");
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
                pipe1.startObject();
                {
                    pipe1.startObject("enrich");
                    {
                        pipe1.field("policy_name", enrichPolicyName);
                        pipe1.field("field", "custom_id");
                        pipe1.field("target_field", "enrich_value_2");
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
            }
            pipe1.endArray();
        }
        pipe1.endObject();

        client().execute(
            PutPipelineAction.INSTANCE,
            new PutPipelineRequest(enrichPipelineName, BytesReference.bytes(pipe1), XContentType.JSON)
        ).actionGet();

        client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

        XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

        BulkRequest bulk = new BulkRequest(enrichedIndexName);
        bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
        for (int idx = 0; idx < 50; idx++) {
            bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
        }

        BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

        assertTrue(bulkItemResponses.hasFailures());
        BulkItemResponse.Failure firstFailure = null;
        int successfulItems = 0;
        for (BulkItemResponse item : bulkItemResponses.getItems()) {
            if (item.isFailed() && firstFailure == null) {
                firstFailure = item.getFailure();
            } else if (item.isFailed() == false) {
                successfulItems++;
            }
        }
        assertNotNull(firstFailure);
        assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
        assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

        client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
    }

    public void testWriteThreadLivenessWithPipeline() throws Exception {
        ensureGreen();

        long testSuffix = System.currentTimeMillis();
        String enrichIndexName = "enrich_lookup_" + testSuffix;
        String enrichPolicyName = "enrich_policy_" + testSuffix;
        String enrichPipelineName = "enrich_pipeline_" + testSuffix;
        String enrichedIndexName = "enrich_results_" + testSuffix;
        String enrichPipelineName1 = enrichPipelineName + "_1";
        String enrichPipelineName2 = enrichPipelineName + "_2";

        client().index(
            new IndexRequest(enrichIndexName).source(
                JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
            )
        ).actionGet();

        client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

        client().execute(
            PutEnrichPolicyAction.INSTANCE,
            new PutEnrichPolicyAction.Request(
                enrichPolicyName,
                new EnrichPolicy(
                    EnrichPolicy.MATCH_TYPE,
                    null,
                    Collections.singletonList(enrichIndexName),
                    "my_key",
                    Collections.singletonList("my_value")
                )
            )
        ).actionGet();

        client().execute(
            ExecuteEnrichPolicyAction.INSTANCE,
            new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
        ).actionGet();

        XContentBuilder pipe1 = JsonXContent.contentBuilder();
        pipe1.startObject();
        {
            pipe1.startArray("processors");
            {
                pipe1.startObject();
                {
                    pipe1.startObject("enrich");
                    {
                        pipe1.field("policy_name", enrichPolicyName);
                        pipe1.field("field", "custom_id");
                        pipe1.field("target_field", "enrich_value_1");
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
                pipe1.startObject();
                {
                    pipe1.startObject("pipeline");
                    {
                        pipe1.field("name", enrichPipelineName2);
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
            }
            pipe1.endArray();
        }
        pipe1.endObject();

        XContentBuilder pipe2 = JsonXContent.contentBuilder();
        pipe2.startObject();
        {
            pipe2.startArray("processors");
            {
                pipe2.startObject();
                {
                    pipe2.startObject("enrich");
                    {
                        pipe2.field("policy_name", enrichPolicyName);
                        pipe2.field("field", "custom_id");
                        pipe2.field("target_field", "enrich_value_2");
                    }
                    pipe2.endObject();
                }
                pipe2.endObject();
            }
            pipe2.endArray();
        }
        pipe2.endObject();

        client().execute(
            PutPipelineAction.INSTANCE,
            new PutPipelineRequest(enrichPipelineName1, BytesReference.bytes(pipe1), XContentType.JSON)
        ).actionGet();

        client().execute(
            PutPipelineAction.INSTANCE,
            new PutPipelineRequest(enrichPipelineName2, BytesReference.bytes(pipe2), XContentType.JSON)
        ).actionGet();

        client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

        XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

        BulkRequest bulk = new BulkRequest(enrichedIndexName);
        bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
        for (int idx = 0; idx < 50; idx++) {
            bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
        }

        BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

        assertTrue(bulkItemResponses.hasFailures());
        BulkItemResponse.Failure firstFailure = null;
        int successfulItems = 0;
        for (BulkItemResponse item : bulkItemResponses.getItems()) {
            if (item.isFailed() && firstFailure == null) {
                firstFailure = item.getFailure();
            } else if (item.isFailed() == false) {
                successfulItems++;
            }
        }
        assertNotNull(firstFailure);
        assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
        assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

        client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
    }
}
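These tests assert that rejected documents surface through the normal bulk failure path with status 429. A client-side sketch of reacting to those rejections (a hypothetical helper, not part of this PR, using only the BulkResponse accessors the tests themselves use) might look like:

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkResponse;

    public final class EnrichRejectionHandling {
        // Collect bulk items rejected with 429 so the caller can retry them with backoff.
        public static List<BulkItemResponse> tooManyRequestsFailures(BulkResponse response) {
            List<BulkItemResponse> rejected = new ArrayList<>();
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed() && item.getFailure().getStatus().getStatus() == 429) {
                    rejected.add(item);
                }
            }
            return rejected;
        }
    }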
@@ -35,11 +35,14 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;

 import static org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction.Coordinator;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;

 public class CoordinatorTests extends ESTestCase {
@@ -190,31 +193,54 @@ public class CoordinatorTests extends ESTestCase {
         }
     }

-    public void testQueueing() throws Exception {
+    public void testNoBlockingWhenQueueing() throws Exception {
         MockLookupFunction lookupFunction = new MockLookupFunction();
+        // Only one request allowed in flight. Queue size maxed at 1.
         Coordinator coordinator = new Coordinator(lookupFunction, 1, 1, 1);
+
+        // Pre-load the queue to be at capacity and spoof the coordinator state to seem like max requests in flight.
         coordinator.queue.add(new Coordinator.Slot(new SearchRequest(), ActionListener.wrap(() -> {})));
+        coordinator.remoteRequestsCurrent.incrementAndGet();
+
-        AtomicBoolean completed = new AtomicBoolean(false);
+        // Try to schedule an item into the coordinator, should emit an exception
         SearchRequest searchRequest = new SearchRequest();
-        Thread t = new Thread(() -> {
-            coordinator.schedule(searchRequest, ActionListener.wrap(() -> {}));
-            completed.set(true);
-        });
-        t.start();
-        assertBusy(() -> {
-            assertThat(t.getState(), equalTo(Thread.State.WAITING));
-            assertThat(completed.get(), is(false));
-        });
+        final AtomicReference<Exception> capturedException = new AtomicReference<>();
+        coordinator.schedule(searchRequest, ActionListener.wrap(response -> {}, capturedException::set));

-        coordinator.coordinateLookups();
-        assertBusy(() -> { assertThat(completed.get(), is(true)); });
+        // Ensure rejection since queue is full
+        Exception rejectionException = capturedException.get();
+        assertThat(rejectionException.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
+
+        // Ensure that nothing was scheduled because max requests is already in flight
+        assertThat(lookupFunction.capturedConsumers, is(empty()));
+
+        // Try to schedule again while max requests is not full. Ensure that despite the rejection, the queued request is sent.
+        coordinator.remoteRequestsCurrent.decrementAndGet();
+        capturedException.set(null);
+        coordinator.schedule(searchRequest, ActionListener.wrap(response -> {}, capturedException::set));
+        rejectionException = capturedException.get();
+        assertThat(rejectionException.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
+        assertThat(lookupFunction.capturedRequests.size(), is(1));
+        assertThat(lookupFunction.capturedConsumers.size(), is(1));
+
+        // Schedule once more now, the queue should be able to accept the item, but will not schedule it yet
+        capturedException.set(null);
+        coordinator.schedule(searchRequest, ActionListener.wrap(response -> {}, capturedException::set));
+        rejectionException = capturedException.get();
+        assertThat(rejectionException, is(nullValue()));
+        assertThat(coordinator.queue.size(), is(1));
+        assertThat(coordinator.remoteRequestsCurrent.get(), is(1));
+        assertThat(lookupFunction.capturedRequests.size(), is(1));
+        assertThat(lookupFunction.capturedConsumers.size(), is(1));

         // Fulfill the captured consumer which will schedule the next item in the queue.
         lookupFunction.capturedConsumers.get(0)
             .accept(
                 new MultiSearchResponse(new MultiSearchResponse.Item[] { new MultiSearchResponse.Item(emptySearchResponse(), null) }, 1L),
                 null
             );

         // Ensure queue was drained and that the item in it was scheduled
         assertThat(coordinator.queue.size(), equalTo(0));
         assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
         assertThat(lookupFunction.capturedRequests.get(1).requests().get(0), sameInstance(searchRequest));