diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java
index 75ae6a0d48f..6ed976807bd 100644
--- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java
+++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java
@@ -259,7 +259,8 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
assertNotNull(response.getFailure());
assertFalse(response.isRunning());
Exception exc = response.getFailure();
- assertThat(exc.getMessage(), containsString("no such index"));
+ assertThat(exc.getMessage(), containsString("error while executing search"));
+ assertThat(exc.getCause().getMessage(), containsString("no such index"));
}
public void testCancellation() throws Exception {
@@ -410,7 +411,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
AsyncSearchResponse response = submitAsyncSearch(request);
assertFalse(response.isRunning());
assertTrue(response.isPartial());
- assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+ assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertNotNull(response.getFailure());
ensureTaskNotRunning(response.getId());
}
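The two assertion changes above reflect the new failure shape produced by this patch: the original exception is no longer replaced by its guessed root cause but is wrapped in an ElasticsearchStatusException that keeps the cause's HTTP status. A minimal sketch of that shape, not part of the patch (the index name is invented for illustration):

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.index.IndexNotFoundException;

public class WrappedFailureSketch {
    public static void main(String[] args) {
        // Hypothetical root failure; any exception reaching AsyncSearchTask#onFailure gets the same treatment.
        Exception cause = new IndexNotFoundException("missing-index");
        ElasticsearchStatusException wrapped = new ElasticsearchStatusException(
            "error while executing search", ExceptionsHelper.status(cause), cause);
        System.out.println(wrapped.getMessage());            // error while executing search
        System.out.println(wrapped.status());                // NOT_FOUND, inherited from the cause
        System.out.println(wrapped.getCause().getMessage()); // no such index [missing-index]
    }
}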
diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java
index 68f82cde794..78bca8de805 100644
--- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java
+++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java
@@ -6,6 +6,9 @@
package org.elasticsearch.xpack.search;
import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
@@ -19,7 +22,6 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -164,8 +166,8 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
}
/**
- * Creates a listener that listens for an {@link AsyncSearchResponse} and executes the
- * consumer when the task is finished or when the provided waitForCompletion
+ * Creates a listener that listens for an {@link AsyncSearchResponse} and notifies the
+ * listener when the task is finished or when the provided waitForCompletion
* timeout occurs. In such case the consumed {@link AsyncSearchResponse} will contain partial results.
*/
public void addCompletionListener(ActionListener<AsyncSearchResponse> listener, TimeValue waitForCompletion) {
@@ -197,13 +199,13 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
* Creates a listener that listens for an {@link AsyncSearchResponse} and executes the
* consumer when the task is finished.
*/
- public void addCompletionListener(Consumer<AsyncSearchResponse> listener) {
+ public void addCompletionListener(Consumer<AsyncSearchResponse> listener) {
boolean executeImmediately = false;
synchronized (this) {
if (hasCompleted) {
executeImmediately = true;
} else {
- completionListeners.put(completionId++, listener);
+ this.completionListeners.put(completionId++, listener);
}
}
if (executeImmediately) {
@@ -220,27 +222,31 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
// ensure that we consumes the listener only once
AtomicBoolean hasRun = new AtomicBoolean(false);
long id = completionId++;
-
final Cancellable cancellable;
try {
- cancellable = threadPool.schedule(() -> {
- if (hasRun.compareAndSet(false, true)) {
- // timeout occurred before completion
- removeCompletionListener(id);
- listener.onResponse(getResponseWithHeaders());
- }
- }, waitForCompletion, "generic");
- } catch (EsRejectedExecutionException exc) {
+ cancellable = threadPool.schedule(
+ () -> {
+ if (hasRun.compareAndSet(false, true)) {
+ // timeout occurred before completion
+ removeCompletionListener(id);
+ listener.onResponse(getResponseWithHeaders());
+ }
+ },
+ waitForCompletion,
+ "generic");
+ } catch (Exception exc) {
listener.onFailure(exc);
return;
}
- completionListeners.put(id, resp -> {
- if (hasRun.compareAndSet(false, true)) {
- // completion occurred before timeout
- cancellable.cancel();
- listener.onResponse(resp);
- }
- });
+ completionListeners.put(
+ id,
+ resp -> {
+ if (hasRun.compareAndSet(false, true)) {
+ // completion occurred before timeout
+ cancellable.cancel();
+ listener.onResponse(resp);
+ }
+ });
}
}
if (executeImmediately) {
@@ -284,28 +290,29 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
}
private void executeCompletionListeners() {
+ Map<Long, Consumer<AsyncSearchResponse>> completionsListenersCopy;
synchronized (this) {
if (hasCompleted) {
return;
}
hasCompleted = true;
+ completionsListenersCopy = new HashMap<>(this.completionListeners);
+ this.completionListeners.clear();
}
// we don't need to restore the response headers, they should be included in the current
// context since we are called by the search action listener.
AsyncSearchResponse finalResponse = getResponse();
- for (Consumer<AsyncSearchResponse> listener : completionListeners.values()) {
- listener.accept(finalResponse);
+ for (Consumer<AsyncSearchResponse> consumer : completionsListenersCopy.values()) {
+ consumer.accept(finalResponse);
}
- completionListeners.clear();
+
}
/**
* Returns the current {@link AsyncSearchResponse}.
*/
private AsyncSearchResponse getResponse() {
- assert searchResponse.get() != null;
- checkCancellation();
- return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
+ return getResponse(false);
}
/**
@@ -313,9 +320,22 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
* in the local thread context.
*/
private AsyncSearchResponse getResponseWithHeaders() {
- assert searchResponse.get() != null;
+ return getResponse(true);
+ }
+
+ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) {
+ MutableSearchResponse mutableSearchResponse = searchResponse.get();
+ assert mutableSearchResponse != null;
checkCancellation();
- return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
+ AsyncSearchResponse asyncSearchResponse;
+ try {
+ asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders);
+ } catch (Exception e) {
+ ElasticsearchException exception = new ElasticsearchStatusException("Async search: error while reducing partial results",
+ ExceptionsHelper.status(e), e);
+ asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception);
+ }
+ return asyncSearchResponse;
}
// checks if the search task should be cancelled
@@ -405,12 +425,10 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
@Override
public void onFailure(Exception exc) {
- if (searchResponse.get() == null) {
- // if the failure occurred before calling onListShards
- searchResponse.compareAndSet(null,
- new MutableSearchResponse(-1, -1, null, threadPool.getThreadContext()));
- }
- searchResponse.get().updateWithFailure(exc);
+ // if the failure occurred before calling onListShards
+ searchResponse.compareAndSet(null, new MutableSearchResponse(-1, -1, null, threadPool.getThreadContext()));
+ searchResponse.get().updateWithFailure(new ElasticsearchStatusException("error while executing search",
+ ExceptionsHelper.status(exc), exc));
executeInitListeners();
executeCompletionListeners();
}
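The reformatted addCompletionListener above keeps the existing exactly-once contract between the waitForCompletion timeout and task completion: whichever of the two fires first wins the compareAndSet, and the loser becomes a no-op. A self-contained sketch of that pattern using a plain ScheduledExecutorService and invented names (not the plugin's API):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ExactlyOnceTimeoutSketch {
    // Fires the consumer exactly once: with "partial" on timeout, or with the final value if
    // complete(...) wins the race. Mirrors the hasRun/cancellable dance in AsyncSearchTask.
    static final class OnceWithTimeout {
        private final AtomicBoolean hasRun = new AtomicBoolean(false);
        private final ScheduledFuture<?> timeout;
        private final Consumer<String> listener;

        OnceWithTimeout(ScheduledExecutorService scheduler, Consumer<String> listener, long millis) {
            this.listener = listener;
            this.timeout = scheduler.schedule(() -> {
                if (hasRun.compareAndSet(false, true)) {
                    listener.accept("partial"); // timeout occurred before completion
                }
            }, millis, TimeUnit.MILLISECONDS);
        }

        void complete(String finalValue) {
            if (hasRun.compareAndSet(false, true)) {
                timeout.cancel(false);          // completion occurred before timeout
                listener.accept(finalValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        OnceWithTimeout once = new OnceWithTimeout(scheduler, System.out::println, 100);
        once.complete("final");   // prints "final"; the later timeout callback becomes a no-op
        Thread.sleep(200);
        scheduler.shutdown();
    }
}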
diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java
index f6a2837fd89..2b499e97eba 100644
--- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java
+++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java
@@ -52,7 +52,6 @@ class MutableSearchResponse {
* The response produced by the search API. Once we receive it we stop
* building our own {@linkplain SearchResponse}s when get async search
* is called, and instead return this.
- * @see #findOrBuildResponse(AsyncSearchTask)
*/
private SearchResponse finalResponse;
private ElasticsearchException failure;
@@ -123,24 +122,14 @@ class MutableSearchResponse {
* Updates the response with a fatal failure. This method preserves the partial response
* received from previous updates
*/
- synchronized void updateWithFailure(Exception exc) {
+ synchronized void updateWithFailure(ElasticsearchException exc) {
failIfFrozen();
// copy the response headers from the current context
this.responseHeaders = threadContext.getResponseHeaders();
//note that when search fails, we may have gotten partial results before the failure. In that case async
// search will return an error plus the last partial results that were collected.
this.isPartial = true;
- ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(exc);
- if (rootCauses == null || rootCauses.length == 0) {
- this.failure = new ElasticsearchException(exc.getMessage(), exc) {
- @Override
- protected String getExceptionName() {
- return getExceptionName(getCause());
- }
- };
- } else {
- this.failure = rootCauses[0];
- }
+ this.failure = exc;
this.frozen = true;
}
@@ -154,51 +143,57 @@ class MutableSearchResponse {
shardFailures.set(shardIndex, failure);
}
+ private SearchResponse buildResponse(long taskStartTimeNanos, InternalAggregations reducedAggs) {
+ InternalSearchResponse internal = new InternalSearchResponse(
+ new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase);
+ long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - taskStartTimeNanos).getMillis();
+ return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards,
+ tookInMillis, buildShardFailures(), clusters);
+ }
+
/**
* Creates an {@link AsyncSearchResponse} based on the current state of the mutable response.
* The final reduce of the aggregations is executed if needed (partial response).
* This method is synchronized to ensure that we don't perform final reduces concurrently.
+ * This method also restores the response headers in the current thread context when requested, if the final response is available.
*/
- synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, long expirationTime) {
- return new AsyncSearchResponse(task.getExecutionId().getEncoded(), findOrBuildResponse(task),
- failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
- }
-
- private SearchResponse findOrBuildResponse(AsyncSearchTask task) {
- if (finalResponse != null) {
- // We have a final response, use it.
- return finalResponse;
- }
- if (clusters == null) {
- // An error occurred before we got the shard list
- return null;
- }
- /*
- * Build the response, reducing aggs if we haven't already and
- * storing the result of the reduction so we won't have to reduce
- * the same aggregation results a second time if nothing has changed.
- * This does cost memory because we have a reference to the finally
- * reduced aggs sitting around which can't be GCed until we get an update.
- */
- InternalAggregations reducedAggs = reducedAggsSource.get();
- reducedAggsSource = () -> reducedAggs;
- InternalSearchResponse internal = new InternalSearchResponse(
- new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase);
- long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - task.getStartTimeNanos()).getMillis();
- return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards,
- tookInMillis, buildShardFailures(), clusters);
- }
-
- /**
- * Creates an {@link AsyncSearchResponse} based on the current state of the mutable response.
- * This method also restores the response headers in the current thread context if the final response is available.
- */
- synchronized AsyncSearchResponse toAsyncSearchResponseWithHeaders(AsyncSearchTask task, long expirationTime) {
- AsyncSearchResponse resp = toAsyncSearchResponse(task, expirationTime);
- if (responseHeaders != null) {
+ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
+ long expirationTime,
+ boolean restoreResponseHeaders) {
+ if (restoreResponseHeaders && responseHeaders != null) {
restoreResponseHeadersContext(threadContext, responseHeaders);
}
- return resp;
+ SearchResponse searchResponse;
+ if (finalResponse != null) {
+ // We have a final response, use it.
+ searchResponse = finalResponse;
+ } else if (clusters == null) {
+ // An error occurred before we got the shard list
+ searchResponse = null;
+ } else {
+ /*
+ * Build the response, reducing aggs if we haven't already and
+ * storing the result of the reduction so we won't have to reduce
+ * the same aggregation results a second time if nothing has changed.
+ * This does cost memory because we have a reference to the finally
+ * reduced aggs sitting around which can't be GCed until we get an update.
+ */
+ InternalAggregations reducedAggs = reducedAggsSource.get();
+ reducedAggsSource = () -> reducedAggs;
+ searchResponse = buildResponse(task.getStartTimeNanos(), reducedAggs);
+ }
+ return new AsyncSearchResponse(task.getExecutionId().getEncoded(), searchResponse,
+ failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
+ }
+
+ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
+ long expirationTime,
+ ElasticsearchException reduceException) {
+ if (this.failure != null) {
+ reduceException.addSuppressed(this.failure);
+ }
+ return new AsyncSearchResponse(task.getExecutionId().getEncoded(), buildResponse(task.getStartTimeNanos(), null),
+ reduceException, isPartial, frozen == false, task.getStartTime(), expirationTime);
}
private void failIfFrozen() {
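These two toAsyncSearchResponse variants are selected by AsyncSearchTask#getResponse(boolean) earlier in this patch: the normal conversion is attempted first, and only when the partial reduce throws is the failure-carrying variant used, with any previously recorded search failure attached as suppressed. A stripped-down sketch of that control flow with invented helper names (not the plugin's API):

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;

public class ReduceFallbackSketch {
    // Invented stand-in for the MutableSearchResponse conversion; illustrative only.
    interface Conversion<T> {
        T convert() throws Exception;
    }

    static <T> T convertWithFallback(Conversion<T> normal,
                                     ElasticsearchException storedFailure,
                                     java.util.function.Function<ElasticsearchException, T> onReduceError) {
        try {
            return normal.convert();
        } catch (Exception e) {
            // Same wrapping as getResponse(boolean): keep the HTTP status of the reduce error...
            ElasticsearchStatusException reduceException = new ElasticsearchStatusException(
                "Async search: error while reducing partial results", ExceptionsHelper.status(e), e);
            // ...and keep a failure recorded before the reduce attempt as suppressed,
            // which is what the new AsyncSearchTaskTests assert on getSuppressed().
            if (storedFailure != null) {
                reduceException.addSuppressed(storedFailure);
            }
            return onReduceError.apply(reduceException);
        }
    }
}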
diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java
index 9d958b20df2..b31af656e2a 100644
--- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java
+++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java
@@ -96,9 +96,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
- searchTask.addCompletionListener(finalResponse ->
- onFinalResponse(searchTask, finalResponse, () -> {
- }));
+ searchTask.addCompletionListener(
+ finalResponse -> onFinalResponse(searchTask, finalResponse, () -> {}));
} finally {
submitListener.onResponse(searchResponse);
}
@@ -110,11 +109,12 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
- ActionListener<AsyncSearchResponse> listener) {
+ ActionListener<AsyncSearchResponse> listener){
if (shouldCancel && task.isCancelled() == false) {
task.cancelTask(() -> {
try {
@@ -157,7 +160,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
() -> taskManager.unregister(task));
@@ -175,29 +178,24 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
- logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getExecutionId()), exc);
+ logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]",
+ searchTask.getExecutionId().getEncoded()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
return;
- }
+ }
- try {
- store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
- ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
- exc -> {
- Throwable cause = ExceptionsHelper.unwrapCause(exc);
- if (cause instanceof DocumentMissingException == false &&
- cause instanceof VersionConflictEngineException == false) {
- logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
- searchTask.getExecutionId().getEncoded()), exc);
- }
- unregisterTaskAndMoveOn(searchTask, nextAction);
- }));
- } catch (Exception exc) {
- logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()),
- exc);
- unregisterTaskAndMoveOn(searchTask, nextAction);
- }
+ store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
+ ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
+ exc -> {
+ Throwable cause = ExceptionsHelper.unwrapCause(exc);
+ if (cause instanceof DocumentMissingException == false &&
+ cause instanceof VersionConflictEngineException == false) {
+ logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
+ searchTask.getExecutionId().getEncoded()), exc);
+ }
+ unregisterTaskAndMoveOn(searchTask, nextAction);
+ }));
}
private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) {
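The removal of the try/catch around store.updateResponse above relies on the AsyncTaskIndexService change later in this patch: the store now reports synchronous failures through the listener. The remaining error handler only logs unexpected failures, while a document that was already deleted or concurrently updated is treated as a normal outcome. A condensed sketch of that filter with invented method and logger names:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;

public class CleanupFailureFilterSketch {
    private static final Logger logger = LogManager.getLogger(CleanupFailureFilterSketch.class);

    // Mirrors the exc -> { ... } handler above: expected "nothing left to update" situations are
    // silently ignored, everything else is logged, and cleanup continues either way.
    static void onStoreFailure(String searchId, Exception exc, Runnable continueCleanup) {
        Throwable cause = ExceptionsHelper.unwrapCause(exc);
        if (cause instanceof DocumentMissingException == false
                && cause instanceof VersionConflictEngineException == false) {
            logger.error("failed to store async-search [" + searchId + "]", exc);
        }
        continueCleanup.run();
    }
}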
diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java
index 624a697e764..3ac4dfb7624 100644
--- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java
+++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java
@@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -29,6 +28,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
@@ -67,11 +67,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
@Override
public List<QuerySpec<?>> getQueries() {
return Arrays.asList(
- new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in),
+ new QuerySpec<>(BlockingQueryBuilder.NAME, BlockingQueryBuilder::new,
p -> {
throw new IllegalStateException("not implemented");
}),
- new QuerySpec<>(ThrowingQueryBuilder.NAME, in -> new ThrowingQueryBuilder(in),
+ new QuerySpec<>(ThrowingQueryBuilder.NAME, ThrowingQueryBuilder::new,
p -> {
throw new IllegalStateException("not implemented");
}));
diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java
index bc58b470b7d..2bec5198826 100644
--- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java
+++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java
@@ -6,17 +6,26 @@
package org.elasticsearch.xpack.search;
import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
@@ -34,16 +43,26 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class AsyncSearchTaskTests extends ESTestCase {
private ThreadPool threadPool;
+ private boolean throwOnSchedule = false;
@Before
public void beforeTest() {
- threadPool = new TestThreadPool(getTestName());
+ threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
+ if (throwOnSchedule) {
+ throw new RuntimeException();
+ }
+ return super.schedule(command, delay, executor);
+ }
+ };
}
@After
@@ -123,6 +142,90 @@ public class AsyncSearchTaskTests extends ESTestCase {
latch.await();
}
+ public void testGetResponseFailureDuringReduction() throws InterruptedException {
+ AsyncSearchTask task = createAsyncSearchTask();
+ task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
+ SearchResponse.Clusters.EMPTY, false);
+ InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
+ Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
+ //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
+ //causing an exception when executing getResponse as part of the completion listener callback
+ DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
+ .asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList()));
+ task.getSearchProgressActionListener().onPartialReduce(Collections.emptyList(), new TotalHits(0, TotalHits.Relation.EQUAL_TO),
+ serializedAggs, 1);
+ AtomicReference<AsyncSearchResponse> response = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ task.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
+ @Override
+ public void onResponse(AsyncSearchResponse asyncSearchResponse) {
+ assertTrue(response.compareAndSet(null, asyncSearchResponse));
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ throw new AssertionError("onFailure should not be called");
+ }
+ }, TimeValue.timeValueMillis(10L));
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertNotNull(response.get().getSearchResponse());
+ assertEquals(0, response.get().getSearchResponse().getTotalShards());
+ assertEquals(0, response.get().getSearchResponse().getSuccessfulShards());
+ assertEquals(0, response.get().getSearchResponse().getFailedShards());
+ assertThat(response.get().getFailure(), instanceOf(ElasticsearchException.class));
+ assertEquals("Async search: error while reducing partial results", response.get().getFailure().getMessage());
+ assertThat(response.get().getFailure().getCause(), instanceOf(IllegalArgumentException.class));
+ assertEquals("Unknown NamedWriteable category [" + InternalAggregation.class.getName() + "]",
+ response.get().getFailure().getCause().getMessage());
+ }
+
+ public void testWithFailureAndGetResponseFailureDuringReduction() throws InterruptedException {
+ AsyncSearchTask task = createAsyncSearchTask();
+ task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
+ SearchResponse.Clusters.EMPTY, false);
+ InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
+ Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
+ //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
+ //causing an exception when executing getResponse as part of the completion listener callback
+ DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
+ .asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList()));
+ task.getSearchProgressActionListener().onPartialReduce(Collections.emptyList(), new TotalHits(0, TotalHits.Relation.EQUAL_TO),
+ serializedAggs, 1);
+ task.getSearchProgressActionListener().onFailure(new CircuitBreakingException("boom", CircuitBreaker.Durability.TRANSIENT));
+ AtomicReference<AsyncSearchResponse> response = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ task.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
+ @Override
+ public void onResponse(AsyncSearchResponse asyncSearchResponse) {
+ assertTrue(response.compareAndSet(null, asyncSearchResponse));
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ throw new AssertionError("onFailure should not be called");
+ }
+ }, TimeValue.timeValueMillis(10L));
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ AsyncSearchResponse asyncSearchResponse = response.get();
+ assertNotNull(response.get().getSearchResponse());
+ assertEquals(0, response.get().getSearchResponse().getTotalShards());
+ assertEquals(0, response.get().getSearchResponse().getSuccessfulShards());
+ assertEquals(0, response.get().getSearchResponse().getFailedShards());
+ Exception failure = asyncSearchResponse.getFailure();
+ assertThat(failure, instanceOf(ElasticsearchException.class));
+ assertEquals("Async search: error while reducing partial results", failure.getMessage());
+ assertThat(failure.getCause(), instanceOf(IllegalArgumentException.class));
+ assertEquals("Unknown NamedWriteable category [" + InternalAggregation.class.getName() +
+ "]", failure.getCause().getMessage());
+ assertEquals(1, failure.getSuppressed().length);
+ assertThat(failure.getSuppressed()[0], instanceOf(ElasticsearchException.class));
+ assertEquals("error while executing search", failure.getSuppressed()[0].getMessage());
+ assertThat(failure.getSuppressed()[0].getCause(), instanceOf(CircuitBreakingException.class));
+ assertEquals("boom", failure.getSuppressed()[0].getCause().getMessage());
+ }
+
public void testWaitForCompletion() throws InterruptedException {
AsyncSearchTask task = createAsyncSearchTask();
int numShards = randomIntBetween(0, 10);
@@ -237,6 +340,55 @@ public class AsyncSearchTaskTests extends ESTestCase {
assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true);
}
+ public void testAddCompletionListenerScheduleErrorWaitForInitListener() throws InterruptedException {
+ throwOnSchedule = true;
+ AsyncSearchTask asyncSearchTask = createAsyncSearchTask();
+ AtomicReference<Exception> failure = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ //onListShards has not been executed, then addCompletionListener has to wait for the
+ // onListShards call and is executed as init listener
+ asyncSearchTask.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
+ @Override
+ public void onResponse(AsyncSearchResponse asyncSearchResponse) {
+ throw new AssertionError("onResponse should not be called");
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ assertTrue(failure.compareAndSet(null, e));
+ latch.countDown();
+ }
+ }, TimeValue.timeValueMillis(500L));
+ asyncSearchTask.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
+ SearchResponse.Clusters.EMPTY, false);
+ assertTrue(latch.await(1000, TimeUnit.SECONDS));
+ assertThat(failure.get(), instanceOf(RuntimeException.class));
+ }
+
+ public void testAddCompletionListenerScheduleErrorInitListenerExecutedImmediately() throws InterruptedException {
+ throwOnSchedule = true;
+ AsyncSearchTask asyncSearchTask = createAsyncSearchTask();
+ asyncSearchTask.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
+ SearchResponse.Clusters.EMPTY, false);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Exception> failure = new AtomicReference<>();
+ //onListShards has already been executed, then addCompletionListener is executed immediately
+ asyncSearchTask.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
+ @Override
+ public void onResponse(AsyncSearchResponse asyncSearchResponse) {
+ throw new AssertionError("onResponse should not be called");
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ assertTrue(failure.compareAndSet(null, e));
+ latch.countDown();
+ }
+ }, TimeValue.timeValueMillis(500L));
+ assertTrue(latch.await(1000, TimeUnit.SECONDS));
+ assertThat(failure.get(), instanceOf(RuntimeException.class));
+ }
+
private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards,
ShardSearchFailure... failures) {
InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(),
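The two ...FailureDuringReduction tests above force the delayed reduce to fail by serializing the aggregations and handing the serialized form an empty NamedWriteableRegistry. A rough sketch of why that works, assuming the DelayableWriteable API used in this diff (the aggs argument would come from a real partial reduce):

import java.util.Collections;

import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.search.aggregations.InternalAggregations;

public class EmptyRegistrySketch {
    static void demo(InternalAggregations aggs) {
        DelayableWriteable.Serialized<InternalAggregations> serialized =
            DelayableWriteable.referencing(aggs)
                .asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList()));
        try {
            // AsyncSearchTask keeps this object as its reduced-aggs supplier; the delayed reduce
            // calls get(), which re-reads the aggs and cannot resolve them against an empty registry.
            serialized.get();
        } catch (RuntimeException expected) {
            // surfaces in the tests as IllegalArgumentException("Unknown NamedWriteable category [...]")
        }
    }
}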
diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java
index d81edbefca9..f51298564b5 100644
--- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java
+++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/BlockingQueryBuilder.java
@@ -129,11 +129,11 @@ class BlockingQueryBuilder extends AbstractQueryBuilder<BlockingQueryBuilder> {
* is called.
*/
static class QueryLatch implements Closeable {
- private volatile CountDownLatch countDownLatch;
private final Set failedShards = new HashSet<>();
+ private volatile CountDownLatch countDownLatch;
private int numShardFailures;
- QueryLatch(int numShardFailures) {
+ private QueryLatch(int numShardFailures) {
this.countDownLatch = new CountDownLatch(1);
this.numShardFailures = numShardFailures;
}
diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java
index 3606f32ff81..174dab5e4c0 100644
--- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java
+++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/CancellingAggregationBuilder.java
@@ -5,12 +5,10 @@
*/
package org.elasticsearch.xpack.search;
-import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
@@ -72,16 +70,7 @@ public class CancellingAggregationBuilder extends AbstractAggregationBuilder<CancellingAggregationBuilder> {
static final ConstructingObjectParser<CancellingAggregationBuilder, String> PARSER =
new ConstructingObjectParser<>(NAME, false, (args, name) -> new CancellingAggregationBuilder(name, 0L));
- static CancellingAggregationBuilder fromXContent(String aggName, XContentParser parser) {
- try {
- return PARSER.apply(parser, aggName);
- } catch (IllegalArgumentException e) {
- throw new ParsingException(parser.getTokenLocation(), e.getMessage(), e);
- }
- }
-
@Override
- @SuppressWarnings("unchecked")
protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent,
AggregatorFactories.Builder subfactoriesBuilder) throws IOException {
final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery());
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java
index 69cbb300cdc..2819a62ee0e 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java
@@ -183,17 +183,21 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
* Stores the final response if the place-holder document is still present (update).
*/
public void updateResponse(String docId,
- Map<String, List<String>> responseHeaders,
- R response,
- ActionListener<UpdateResponse> listener) throws IOException {
- Map<String, Object> source = new HashMap<>();
- source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
- source.put(RESULT_FIELD, encodeResponse(response));
- UpdateRequest request = new UpdateRequest()
- .index(index)
- .id(docId)
- .doc(source, XContentType.JSON);
- client.update(request, listener);
+ Map<String, List<String>> responseHeaders,
+ R response,
+ ActionListener<UpdateResponse> listener) {
+ try {
+ Map<String, Object> source = new HashMap<>();
+ source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
+ source.put(RESULT_FIELD, encodeResponse(response));
+ UpdateRequest request = new UpdateRequest()
+ .index(index)
+ .id(docId)
+ .doc(source, XContentType.JSON);
+ client.update(request, listener);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
}
/**
@@ -215,8 +219,12 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
*/
public void deleteResponse(AsyncExecutionId asyncExecutionId,
ActionListener<DeleteResponse> listener) {
- DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
- client.delete(request, listener);
+ try {
+ DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
+ client.delete(request, listener);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
}
/**
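The common thread of the updateResponse and deleteResponse changes above is that a callback-style store method should not throw synchronously: anything that fails before the client call is handed off is reported through the listener. A generic sketch of that convention (the helper interface is invented, not an Elasticsearch type):

import org.elasticsearch.action.ActionListener;

public class SafeListenerSketch {
    // Invented helper; mirrors the try/catch added around updateResponse and deleteResponse.
    interface CheckedRunnable {
        void run() throws Exception;
    }

    static <T> void notifyOnFailure(CheckedRunnable body, ActionListener<T> listener) {
        try {
            body.run();            // e.g. build the request and hand it to the client
        } catch (Exception e) {
            listener.onFailure(e); // callers no longer need their own try/catch, as seen in
                                   // TransportSubmitAsyncSearchAction#onFinalResponse above
        }
    }
}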