From e3fc1638d86e317686c9e2362d05c7d774ca3b06 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Jul 2020 14:58:46 +0200 Subject: [PATCH] Improve error handling in async search code (#57925) - The exception that we caught when failing to schedule a thread was incorrect. - We may have failures when reducing the response before returning it, which were not handled correctly and may have caused the get or submit async search task to not be properly unregistered from the task manager. - When the completion listener onFailure method is invoked, the search task has to be unregistered. Not doing so may cause the search task to be stuck in the task manager although it has completed. Closes #58995 --- .../xpack/search/AsyncSearchActionIT.java | 5 +- .../xpack/search/AsyncSearchTask.java | 88 ++++++---- .../xpack/search/MutableSearchResponse.java | 97 ++++++----- .../TransportSubmitAsyncSearchAction.java | 52 +++--- .../search/AsyncSearchIntegTestCase.java | 6 +- .../xpack/search/AsyncSearchTaskTests.java | 154 +++++++++++++++++- .../xpack/search/BlockingQueryBuilder.java | 4 +- .../search/CancellingAggregationBuilder.java | 11 -- .../core/async/AsyncTaskIndexService.java | 34 ++-- 9 files changed, 306 insertions(+), 145 deletions(-) diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index 75ae6a0d48f..6ed976807bd 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -259,7 +259,8 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase { assertNotNull(response.getFailure()); assertFalse(response.isRunning()); Exception exc = response.getFailure(); - assertThat(exc.getMessage(), 
containsString("no such index")); + assertThat(exc.getMessage(), containsString("error while executing search")); + assertThat(exc.getCause().getMessage(), containsString("no such index")); } public void testCancellation() throws Exception { @@ -410,7 +411,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase { AsyncSearchResponse response = submitAsyncSearch(request); assertFalse(response.isRunning()); assertTrue(response.isPartial()); - assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); assertNotNull(response.getFailure()); ensureTaskNotRunning(response.getId()); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 68f82cde794..78bca8de805 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.search; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; @@ -19,7 +22,6 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.search.SearchShardTarget; import 
org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -164,8 +166,8 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { } /** - * Creates a listener that listens for an {@link AsyncSearchResponse} and executes the - * consumer when the task is finished or when the provided waitForCompletion + * Creates a listener that listens for an {@link AsyncSearchResponse} and notifies the + * listener when the task is finished or when the provided waitForCompletion * timeout occurs. In such case the consumed {@link AsyncSearchResponse} will contain partial results. */ public void addCompletionListener(ActionListener listener, TimeValue waitForCompletion) { @@ -197,13 +199,13 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { * Creates a listener that listens for an {@link AsyncSearchResponse} and executes the * consumer when the task is finished. */ - public void addCompletionListener(Consumer listener) { + public void addCompletionListener(Consumer listener) { boolean executeImmediately = false; synchronized (this) { if (hasCompleted) { executeImmediately = true; } else { - completionListeners.put(completionId++, listener); + this.completionListeners.put(completionId++, listener); } } if (executeImmediately) { @@ -220,27 +222,31 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { // ensure that we consumes the listener only once AtomicBoolean hasRun = new AtomicBoolean(false); long id = completionId++; - final Cancellable cancellable; try { - cancellable = threadPool.schedule(() -> { - if (hasRun.compareAndSet(false, true)) { - // timeout occurred before completion - removeCompletionListener(id); - listener.onResponse(getResponseWithHeaders()); - } - }, waitForCompletion, "generic"); - } catch (EsRejectedExecutionException exc) { + cancellable = threadPool.schedule( + () -> { + if (hasRun.compareAndSet(false, true)) { + // timeout occurred 
before completion + removeCompletionListener(id); + listener.onResponse(getResponseWithHeaders()); + } + }, + waitForCompletion, + "generic"); + } catch(Exception exc) { listener.onFailure(exc); return; } - completionListeners.put(id, resp -> { - if (hasRun.compareAndSet(false, true)) { - // completion occurred before timeout - cancellable.cancel(); - listener.onResponse(resp); - } - }); + completionListeners.put( + id, + resp -> { + if (hasRun.compareAndSet(false, true)) { + // completion occurred before timeout + cancellable.cancel(); + listener.onResponse(resp); + } + }); } } if (executeImmediately) { @@ -284,28 +290,29 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { } private void executeCompletionListeners() { + Map> completionsListenersCopy; synchronized (this) { if (hasCompleted) { return; } hasCompleted = true; + completionsListenersCopy = new HashMap<>(this.completionListeners); + this.completionListeners.clear(); } // we don't need to restore the response headers, they should be included in the current // context since we are called by the search action listener. AsyncSearchResponse finalResponse = getResponse(); - for (Consumer listener : completionListeners.values()) { - listener.accept(finalResponse); + for (Consumer consumer : completionsListenersCopy.values()) { + consumer.accept(finalResponse); } - completionListeners.clear(); + } /** * Returns the current {@link AsyncSearchResponse}. */ private AsyncSearchResponse getResponse() { - assert searchResponse.get() != null; - checkCancellation(); - return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis); + return getResponse(false); } /** @@ -313,9 +320,22 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { * in the local thread context. 
*/ private AsyncSearchResponse getResponseWithHeaders() { - assert searchResponse.get() != null; + return getResponse(true); + } + + private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) { + MutableSearchResponse mutableSearchResponse = searchResponse.get(); + assert mutableSearchResponse != null; checkCancellation(); - return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis); + AsyncSearchResponse asyncSearchResponse; + try { + asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders); + } catch(Exception e) { + ElasticsearchException exception = new ElasticsearchStatusException("Async search: error while reducing partial results", + ExceptionsHelper.status(e), e); + asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception); + } + return asyncSearchResponse; } // checks if the search task should be cancelled @@ -405,12 +425,10 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { @Override public void onFailure(Exception exc) { - if (searchResponse.get() == null) { - // if the failure occurred before calling onListShards - searchResponse.compareAndSet(null, - new MutableSearchResponse(-1, -1, null, threadPool.getThreadContext())); - } - searchResponse.get().updateWithFailure(exc); + // if the failure occurred before calling onListShards + searchResponse.compareAndSet(null, new MutableSearchResponse(-1, -1, null, threadPool.getThreadContext())); + searchResponse.get().updateWithFailure(new ElasticsearchStatusException("error while executing search", + ExceptionsHelper.status(exc), exc)); executeInitListeners(); executeCompletionListeners(); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java index f6a2837fd89..2b499e97eba 100644 --- 
a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java @@ -52,7 +52,6 @@ class MutableSearchResponse { * The response produced by the search API. Once we receive it we stop * building our own {@linkplain SearchResponse}s when get async search * is called, and instead return this. - * @see #findOrBuildResponse(AsyncSearchTask) */ private SearchResponse finalResponse; private ElasticsearchException failure; @@ -123,24 +122,14 @@ class MutableSearchResponse { * Updates the response with a fatal failure. This method preserves the partial response * received from previous updates */ - synchronized void updateWithFailure(Exception exc) { + synchronized void updateWithFailure(ElasticsearchException exc) { failIfFrozen(); // copy the response headers from the current context this.responseHeaders = threadContext.getResponseHeaders(); //note that when search fails, we may have gotten partial results before the failure. In that case async // search will return an error plus the last partial results that were collected. 
this.isPartial = true; - ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(exc); - if (rootCauses == null || rootCauses.length == 0) { - this.failure = new ElasticsearchException(exc.getMessage(), exc) { - @Override - protected String getExceptionName() { - return getExceptionName(getCause()); - } - }; - } else { - this.failure = rootCauses[0]; - } + this.failure = exc; this.frozen = true; } @@ -154,51 +143,57 @@ class MutableSearchResponse { shardFailures.set(shardIndex, failure); } + private SearchResponse buildResponse(long taskStartTimeNanos, InternalAggregations reducedAggs) { + InternalSearchResponse internal = new InternalSearchResponse( + new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase); + long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - taskStartTimeNanos).getMillis(); + return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards, + tookInMillis, buildShardFailures(), clusters); + } + /** * Creates an {@link AsyncSearchResponse} based on the current state of the mutable response. * The final reduce of the aggregations is executed if needed (partial response). * This method is synchronized to ensure that we don't perform final reduces concurrently. + * This method also restores the response headers in the current thread context when requested, if the final response is available. */ - synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, long expirationTime) { - return new AsyncSearchResponse(task.getExecutionId().getEncoded(), findOrBuildResponse(task), - failure, isPartial, frozen == false, task.getStartTime(), expirationTime); - } - - private SearchResponse findOrBuildResponse(AsyncSearchTask task) { - if (finalResponse != null) { - // We have a final response, use it. 
- return finalResponse; - } - if (clusters == null) { - // An error occurred before we got the shard list - return null; - } - /* - * Build the response, reducing aggs if we haven't already and - * storing the result of the reduction so we won't have to reduce - * the same aggregation results a second time if nothing has changed. - * This does cost memory because we have a reference to the finally - * reduced aggs sitting around which can't be GCed until we get an update. - */ - InternalAggregations reducedAggs = reducedAggsSource.get(); - reducedAggsSource = () -> reducedAggs; - InternalSearchResponse internal = new InternalSearchResponse( - new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase); - long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - task.getStartTimeNanos()).getMillis(); - return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards, - tookInMillis, buildShardFailures(), clusters); - } - - /** - * Creates an {@link AsyncSearchResponse} based on the current state of the mutable response. - * This method also restores the response headers in the current thread context if the final response is available. - */ - synchronized AsyncSearchResponse toAsyncSearchResponseWithHeaders(AsyncSearchTask task, long expirationTime) { - AsyncSearchResponse resp = toAsyncSearchResponse(task, expirationTime); - if (responseHeaders != null) { + synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, + long expirationTime, + boolean restoreResponseHeaders) { + if (restoreResponseHeaders && responseHeaders != null) { restoreResponseHeadersContext(threadContext, responseHeaders); } - return resp; + SearchResponse searchResponse; + if (finalResponse != null) { + // We have a final response, use it. 
+ searchResponse = finalResponse; + } else if (clusters == null) { + // An error occurred before we got the shard list + searchResponse = null; + } else { + /* + * Build the response, reducing aggs if we haven't already and + * storing the result of the reduction so we won't have to reduce + * the same aggregation results a second time if nothing has changed. + * This does cost memory because we have a reference to the finally + * reduced aggs sitting around which can't be GCed until we get an update. + */ + InternalAggregations reducedAggs = reducedAggsSource.get(); + reducedAggsSource = () -> reducedAggs; + searchResponse = buildResponse(task.getStartTimeNanos(), reducedAggs); + } + return new AsyncSearchResponse(task.getExecutionId().getEncoded(), searchResponse, + failure, isPartial, frozen == false, task.getStartTime(), expirationTime); + } + + synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, + long expirationTime, + ElasticsearchException reduceException) { + if (this.failure != null) { + reduceException.addSuppressed(this.failure); + } + return new AsyncSearchResponse(task.getExecutionId().getEncoded(), buildResponse(task.getStartTimeNanos(), null), + reduceException, isPartial, frozen == false, task.getStartTime(), expirationTime); } private void failIfFrozen() { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 9d958b20df2..b31af656e2a 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -96,9 +96,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction - onFinalResponse(searchTask, finalResponse, () -> { - })); + 
searchTask.addCompletionListener( + finalResponse -> onFinalResponse(searchTask, finalResponse, () -> {})); } finally { submitListener.onResponse(searchResponse); } @@ -110,11 +109,12 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction listener) { + ActionListener listener){ if (shouldCancel && task.isCancelled() == false) { task.cancelTask(() -> { try { @@ -157,7 +160,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction taskManager.unregister(task)); @@ -175,29 +178,24 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getExecutionId()), exc); + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", + searchTask.getExecutionId().getEncoded()), exc); unregisterTaskAndMoveOn(searchTask, nextAction); })); return; - } + } - try { - store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, - ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), - exc -> { - Throwable cause = ExceptionsHelper.unwrapCause(exc); - if (cause instanceof DocumentMissingException == false && - cause instanceof VersionConflictEngineException == false) { - logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", - searchTask.getExecutionId().getEncoded()), exc); - } - unregisterTaskAndMoveOn(searchTask, nextAction); - })); - } catch (Exception exc) { - logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()), - exc); - unregisterTaskAndMoveOn(searchTask, nextAction); - } + store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, + ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), + exc -> { + Throwable 
cause = ExceptionsHelper.unwrapCause(exc); + if (cause instanceof DocumentMissingException == false && + cause instanceof VersionConflictEngineException == false) { + logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", + searchTask.getExecutionId().getEncoded()), exc); + } + unregisterTaskAndMoveOn(searchTask, nextAction); + })); } private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) { diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 624a697e764..3ac4dfb7624 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -29,6 +28,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; @@ -67,11 +67,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { 
@Override public List> getQueries() { return Arrays.asList( - new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in), + new QuerySpec<>(BlockingQueryBuilder.NAME, BlockingQueryBuilder::new, p -> { throw new IllegalStateException("not implemented"); }), - new QuerySpec<>(ThrowingQueryBuilder.NAME, in -> new ThrowingQueryBuilder(in), + new QuerySpec<>(ThrowingQueryBuilder.NAME, ThrowingQueryBuilder::new, p -> { throw new IllegalStateException("not implemented"); })); diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index bc58b470b7d..2bec5198826 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -6,17 +6,26 @@ package org.elasticsearch.xpack.search; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchShard; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.io.stream.DelayableWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.BucketOrder; +import 
org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -34,16 +43,26 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; public class AsyncSearchTaskTests extends ESTestCase { private ThreadPool threadPool; + private boolean throwOnSchedule = false; @Before public void beforeTest() { - threadPool = new TestThreadPool(getTestName()); + threadPool = new TestThreadPool(getTestName()) { + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { + if (throwOnSchedule) { + throw new RuntimeException(); + } + return super.schedule(command, delay, executor); + } + }; } @After @@ -123,6 +142,90 @@ public class AsyncSearchTaskTests extends ESTestCase { latch.await(); } + public void testGetResponseFailureDuringReduction() throws InterruptedException { + AsyncSearchTask task = createAsyncSearchTask(); + task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(), + SearchResponse.Clusters.EMPTY, false); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1, + Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0))); + //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too + //causing an exception when executing getResponse as part of the completion listener callback + DelayableWriteable.Serialized 
serializedAggs = DelayableWriteable.referencing(aggs) + .asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList())); + task.getSearchProgressActionListener().onPartialReduce(Collections.emptyList(), new TotalHits(0, TotalHits.Relation.EQUAL_TO), + serializedAggs, 1); + AtomicReference response = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + task.addCompletionListener(new ActionListener() { + @Override + public void onResponse(AsyncSearchResponse asyncSearchResponse) { + assertTrue(response.compareAndSet(null, asyncSearchResponse)); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("onFailure should not be called"); + } + }, TimeValue.timeValueMillis(10L)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertNotNull(response.get().getSearchResponse()); + assertEquals(0, response.get().getSearchResponse().getTotalShards()); + assertEquals(0, response.get().getSearchResponse().getSuccessfulShards()); + assertEquals(0, response.get().getSearchResponse().getFailedShards()); + assertThat(response.get().getFailure(), instanceOf(ElasticsearchException.class)); + assertEquals("Async search: error while reducing partial results", response.get().getFailure().getMessage()); + assertThat(response.get().getFailure().getCause(), instanceOf(IllegalArgumentException.class)); + assertEquals("Unknown NamedWriteable category [" + InternalAggregation.class.getName() + "]", + response.get().getFailure().getCause().getMessage()); + } + + public void testWithFailureAndGetResponseFailureDuringReduction() throws InterruptedException { + AsyncSearchTask task = createAsyncSearchTask(); + task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(), + SearchResponse.Clusters.EMPTY, false); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1, + 
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0))); + //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too + //causing an exception when executing getResponse as part of the completion listener callback + DelayableWriteable.Serialized serializedAggs = DelayableWriteable.referencing(aggs) + .asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList())); + task.getSearchProgressActionListener().onPartialReduce(Collections.emptyList(), new TotalHits(0, TotalHits.Relation.EQUAL_TO), + serializedAggs, 1); + task.getSearchProgressActionListener().onFailure(new CircuitBreakingException("boom", CircuitBreaker.Durability.TRANSIENT)); + AtomicReference response = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + task.addCompletionListener(new ActionListener() { + @Override + public void onResponse(AsyncSearchResponse asyncSearchResponse) { + assertTrue(response.compareAndSet(null, asyncSearchResponse)); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("onFailure should not be called"); + } + }, TimeValue.timeValueMillis(10L)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + AsyncSearchResponse asyncSearchResponse = response.get(); + assertNotNull(response.get().getSearchResponse()); + assertEquals(0, response.get().getSearchResponse().getTotalShards()); + assertEquals(0, response.get().getSearchResponse().getSuccessfulShards()); + assertEquals(0, response.get().getSearchResponse().getFailedShards()); + Exception failure = asyncSearchResponse.getFailure(); + assertThat(failure, instanceOf(ElasticsearchException.class)); + assertEquals("Async search: error while reducing partial results", failure.getMessage()); + assertThat(failure.getCause(), instanceOf(IllegalArgumentException.class)); + assertEquals("Unknown NamedWriteable category [" + 
InternalAggregation.class.getName() + + "]", failure.getCause().getMessage()); + assertEquals(1, failure.getSuppressed().length); + assertThat(failure.getSuppressed()[0], instanceOf(ElasticsearchException.class)); + assertEquals("error while executing search", failure.getSuppressed()[0].getMessage()); + assertThat(failure.getSuppressed()[0].getCause(), instanceOf(CircuitBreakingException.class)); + assertEquals("boom", failure.getSuppressed()[0].getCause().getMessage()); + } + public void testWaitForCompletion() throws InterruptedException { AsyncSearchTask task = createAsyncSearchTask(); int numShards = randomIntBetween(0, 10); @@ -237,6 +340,55 @@ public class AsyncSearchTaskTests extends ESTestCase { assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true); } + public void testAddCompletionListenerScheduleErrorWaitForInitListener() throws InterruptedException { + throwOnSchedule = true; + AsyncSearchTask asyncSearchTask = createAsyncSearchTask(); + AtomicReference failure = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + //onListShards has not been executed, then addCompletionListener has to wait for the + // onListShards call and is executed as init listener + asyncSearchTask.addCompletionListener(new ActionListener() { + @Override + public void onResponse(AsyncSearchResponse asyncSearchResponse) { + throw new AssertionError("onResponse should not be called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(failure.compareAndSet(null, e)); + latch.countDown(); + } + }, TimeValue.timeValueMillis(500L)); + asyncSearchTask.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(), + SearchResponse.Clusters.EMPTY, false); + assertTrue(latch.await(1000, TimeUnit.SECONDS)); + assertThat(failure.get(), instanceOf(RuntimeException.class)); + } + + public void testAddCompletionListenerScheduleErrorInitListenerExecutedImmediately() throws InterruptedException { + 
throwOnSchedule = true; + AsyncSearchTask asyncSearchTask = createAsyncSearchTask(); + asyncSearchTask.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(), + SearchResponse.Clusters.EMPTY, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + //onListShards has already been executed, then addCompletionListener is executed immediately + asyncSearchTask.addCompletionListener(new ActionListener() { + @Override + public void onResponse(AsyncSearchResponse asyncSearchResponse) { + throw new AssertionError("onResponse should not be called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(failure.compareAndSet(null, e)); + latch.countDown(); + } + }, TimeValue.timeValueMillis(500L)); + assertTrue(latch.await(1000, TimeUnit.SECONDS)); + assertThat(failure.get(), instanceOf(RuntimeException.class)); + } + private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards, ShardSearchFailure... failures) { InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(), diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java index d81edbefca9..f51298564b5 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java @@ -129,11 +129,11 @@ class BlockingQueryBuilder extends AbstractQueryBuilder { * is called. 
*/ static class QueryLatch implements Closeable { - private volatile CountDownLatch countDownLatch; private final Set failedShards = new HashSet<>(); + private volatile CountDownLatch countDownLatch; private int numShardFailures; - QueryLatch(int numShardFailures) { + private QueryLatch(int numShardFailures) { this.countDownLatch = new CountDownLatch(1); this.numShardFailures = numShardFailures; } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java index 3606f32ff81..174dab5e4c0 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java @@ -5,12 +5,10 @@ */ package org.elasticsearch.xpack.search; -import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; @@ -72,16 +70,7 @@ public class CancellingAggregationBuilder extends AbstractAggregationBuilder(NAME, false, (args, name) -> new CancellingAggregationBuilder(name, 0L)); - static CancellingAggregationBuilder fromXContent(String aggName, XContentParser parser) { - try { - return PARSER.apply(parser, aggName); - } catch (IllegalArgumentException e) { - throw new ParsingException(parser.getTokenLocation(), e.getMessage(), e); - } - } - @Override - @SuppressWarnings("unchecked") protected AggregatorFactory 
doBuild(QueryShardContext queryShardContext, AggregatorFactory parent, AggregatorFactories.Builder subfactoriesBuilder) throws IOException { final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 69cbb300cdc..2819a62ee0e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -183,17 +183,21 @@ public final class AsyncTaskIndexService> { * Stores the final response if the place-holder document is still present (update). */ public void updateResponse(String docId, - Map> responseHeaders, - R response, - ActionListener listener) throws IOException { - Map source = new HashMap<>(); - source.put(RESPONSE_HEADERS_FIELD, responseHeaders); - source.put(RESULT_FIELD, encodeResponse(response)); - UpdateRequest request = new UpdateRequest() - .index(index) - .id(docId) - .doc(source, XContentType.JSON); - client.update(request, listener); + Map> responseHeaders, + R response, + ActionListener listener) { + try { + Map source = new HashMap<>(); + source.put(RESPONSE_HEADERS_FIELD, responseHeaders); + source.put(RESULT_FIELD, encodeResponse(response)); + UpdateRequest request = new UpdateRequest() + .index(index) + .id(docId) + .doc(source, XContentType.JSON); + client.update(request, listener); + } catch(Exception e) { + listener.onFailure(e); + } } /** @@ -215,8 +219,12 @@ public final class AsyncTaskIndexService> { */ public void deleteResponse(AsyncExecutionId asyncExecutionId, ActionListener listener) { - DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId()); - client.delete(request, listener); + try { + DeleteRequest request = new 
DeleteRequest(index).id(asyncExecutionId.getDocId()); + client.delete(request, listener); + } catch(Exception e) { + listener.onFailure(e); + } } /**