Improve error handling in async search code (#57925)

- The exception that we caught when failing to schedule a thread was incorrect.
- We may have failures when reducing the response before returning it, which were not handled correctly and may have caused get or submit async search task to not be properly unregistered from the task manager
- when the completion listener onFailure method is invoked, the search task has to be unregistered. Not doing so may cause the search task to be stuck in the task manager although it has completed.

Closes #58995
This commit is contained in:
Luca Cavanna 2020-07-03 14:58:46 +02:00
parent ca3da7af85
commit e3fc1638d8
9 changed files with 306 additions and 145 deletions

View File

@ -259,7 +259,8 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
assertNotNull(response.getFailure()); assertNotNull(response.getFailure());
assertFalse(response.isRunning()); assertFalse(response.isRunning());
Exception exc = response.getFailure(); Exception exc = response.getFailure();
assertThat(exc.getMessage(), containsString("no such index")); assertThat(exc.getMessage(), containsString("error while executing search"));
assertThat(exc.getCause().getMessage(), containsString("no such index"));
} }
public void testCancellation() throws Exception { public void testCancellation() throws Exception {
@ -410,7 +411,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
AsyncSearchResponse response = submitAsyncSearch(request); AsyncSearchResponse response = submitAsyncSearch(request);
assertFalse(response.isRunning()); assertFalse(response.isRunning());
assertTrue(response.isPartial()); assertTrue(response.isPartial());
assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertNotNull(response.getFailure()); assertNotNull(response.getFailure());
ensureTaskNotRunning(response.getId()); ensureTaskNotRunning(response.getId());
} }

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.search; package org.elasticsearch.xpack.search;
import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
@ -19,7 +22,6 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
@ -164,8 +166,8 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
} }
/** /**
* Creates a listener that listens for an {@link AsyncSearchResponse} and executes the * Creates a listener that listens for an {@link AsyncSearchResponse} and notifies the
* consumer when the task is finished or when the provided <code>waitForCompletion</code> * listener when the task is finished or when the provided <code>waitForCompletion</code>
* timeout occurs. In such case the consumed {@link AsyncSearchResponse} will contain partial results. * timeout occurs. In such case the consumed {@link AsyncSearchResponse} will contain partial results.
*/ */
public void addCompletionListener(ActionListener<AsyncSearchResponse> listener, TimeValue waitForCompletion) { public void addCompletionListener(ActionListener<AsyncSearchResponse> listener, TimeValue waitForCompletion) {
@ -203,7 +205,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
if (hasCompleted) { if (hasCompleted) {
executeImmediately = true; executeImmediately = true;
} else { } else {
completionListeners.put(completionId++, listener); this.completionListeners.put(completionId++, listener);
} }
} }
if (executeImmediately) { if (executeImmediately) {
@ -220,21 +222,25 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
// ensure that we consumes the listener only once // ensure that we consumes the listener only once
AtomicBoolean hasRun = new AtomicBoolean(false); AtomicBoolean hasRun = new AtomicBoolean(false);
long id = completionId++; long id = completionId++;
final Cancellable cancellable; final Cancellable cancellable;
try { try {
cancellable = threadPool.schedule(() -> { cancellable = threadPool.schedule(
() -> {
if (hasRun.compareAndSet(false, true)) { if (hasRun.compareAndSet(false, true)) {
// timeout occurred before completion // timeout occurred before completion
removeCompletionListener(id); removeCompletionListener(id);
listener.onResponse(getResponseWithHeaders()); listener.onResponse(getResponseWithHeaders());
} }
}, waitForCompletion, "generic"); },
} catch (EsRejectedExecutionException exc) { waitForCompletion,
"generic");
} catch(Exception exc) {
listener.onFailure(exc); listener.onFailure(exc);
return; return;
} }
completionListeners.put(id, resp -> { completionListeners.put(
id,
resp -> {
if (hasRun.compareAndSet(false, true)) { if (hasRun.compareAndSet(false, true)) {
// completion occurred before timeout // completion occurred before timeout
cancellable.cancel(); cancellable.cancel();
@ -284,28 +290,29 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
} }
private void executeCompletionListeners() { private void executeCompletionListeners() {
Map<Long, Consumer<AsyncSearchResponse>> completionsListenersCopy;
synchronized (this) { synchronized (this) {
if (hasCompleted) { if (hasCompleted) {
return; return;
} }
hasCompleted = true; hasCompleted = true;
completionsListenersCopy = new HashMap<>(this.completionListeners);
this.completionListeners.clear();
} }
// we don't need to restore the response headers, they should be included in the current // we don't need to restore the response headers, they should be included in the current
// context since we are called by the search action listener. // context since we are called by the search action listener.
AsyncSearchResponse finalResponse = getResponse(); AsyncSearchResponse finalResponse = getResponse();
for (Consumer<AsyncSearchResponse> listener : completionListeners.values()) { for (Consumer<AsyncSearchResponse> consumer : completionsListenersCopy.values()) {
listener.accept(finalResponse); consumer.accept(finalResponse);
} }
completionListeners.clear();
} }
/** /**
* Returns the current {@link AsyncSearchResponse}. * Returns the current {@link AsyncSearchResponse}.
*/ */
private AsyncSearchResponse getResponse() { private AsyncSearchResponse getResponse() {
assert searchResponse.get() != null; return getResponse(false);
checkCancellation();
return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
} }
/** /**
@ -313,9 +320,22 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
* in the local thread context. * in the local thread context.
*/ */
private AsyncSearchResponse getResponseWithHeaders() { private AsyncSearchResponse getResponseWithHeaders() {
assert searchResponse.get() != null; return getResponse(true);
}
private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) {
MutableSearchResponse mutableSearchResponse = searchResponse.get();
assert mutableSearchResponse != null;
checkCancellation(); checkCancellation();
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis); AsyncSearchResponse asyncSearchResponse;
try {
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders);
} catch(Exception e) {
ElasticsearchException exception = new ElasticsearchStatusException("Async search: error while reducing partial results",
ExceptionsHelper.status(e), e);
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception);
}
return asyncSearchResponse;
} }
// checks if the search task should be cancelled // checks if the search task should be cancelled
@ -405,12 +425,10 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
@Override @Override
public void onFailure(Exception exc) { public void onFailure(Exception exc) {
if (searchResponse.get() == null) {
// if the failure occurred before calling onListShards // if the failure occurred before calling onListShards
searchResponse.compareAndSet(null, searchResponse.compareAndSet(null, new MutableSearchResponse(-1, -1, null, threadPool.getThreadContext()));
new MutableSearchResponse(-1, -1, null, threadPool.getThreadContext())); searchResponse.get().updateWithFailure(new ElasticsearchStatusException("error while executing search",
} ExceptionsHelper.status(exc), exc));
searchResponse.get().updateWithFailure(exc);
executeInitListeners(); executeInitListeners();
executeCompletionListeners(); executeCompletionListeners();
} }

View File

@ -52,7 +52,6 @@ class MutableSearchResponse {
* The response produced by the search API. Once we receive it we stop * The response produced by the search API. Once we receive it we stop
* building our own {@linkplain SearchResponse}s when get async search * building our own {@linkplain SearchResponse}s when get async search
* is called, and instead return this. * is called, and instead return this.
* @see #findOrBuildResponse(AsyncSearchTask)
*/ */
private SearchResponse finalResponse; private SearchResponse finalResponse;
private ElasticsearchException failure; private ElasticsearchException failure;
@ -123,24 +122,14 @@ class MutableSearchResponse {
* Updates the response with a fatal failure. This method preserves the partial response * Updates the response with a fatal failure. This method preserves the partial response
* received from previous updates * received from previous updates
*/ */
synchronized void updateWithFailure(Exception exc) { synchronized void updateWithFailure(ElasticsearchException exc) {
failIfFrozen(); failIfFrozen();
// copy the response headers from the current context // copy the response headers from the current context
this.responseHeaders = threadContext.getResponseHeaders(); this.responseHeaders = threadContext.getResponseHeaders();
//note that when search fails, we may have gotten partial results before the failure. In that case async //note that when search fails, we may have gotten partial results before the failure. In that case async
// search will return an error plus the last partial results that were collected. // search will return an error plus the last partial results that were collected.
this.isPartial = true; this.isPartial = true;
ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(exc); this.failure = exc;
if (rootCauses == null || rootCauses.length == 0) {
this.failure = new ElasticsearchException(exc.getMessage(), exc) {
@Override
protected String getExceptionName() {
return getExceptionName(getCause());
}
};
} else {
this.failure = rootCauses[0];
}
this.frozen = true; this.frozen = true;
} }
@ -154,25 +143,34 @@ class MutableSearchResponse {
shardFailures.set(shardIndex, failure); shardFailures.set(shardIndex, failure);
} }
private SearchResponse buildResponse(long taskStartTimeNanos, InternalAggregations reducedAggs) {
InternalSearchResponse internal = new InternalSearchResponse(
new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase);
long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - taskStartTimeNanos).getMillis();
return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards,
tookInMillis, buildShardFailures(), clusters);
}
/** /**
* Creates an {@link AsyncSearchResponse} based on the current state of the mutable response. * Creates an {@link AsyncSearchResponse} based on the current state of the mutable response.
* The final reduce of the aggregations is executed if needed (partial response). * The final reduce of the aggregations is executed if needed (partial response).
* This method is synchronized to ensure that we don't perform final reduces concurrently. * This method is synchronized to ensure that we don't perform final reduces concurrently.
* This method also restores the response headers in the current thread context when requested, if the final response is available.
*/ */
synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, long expirationTime) { synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
return new AsyncSearchResponse(task.getExecutionId().getEncoded(), findOrBuildResponse(task), long expirationTime,
failure, isPartial, frozen == false, task.getStartTime(), expirationTime); boolean restoreResponseHeaders) {
if (restoreResponseHeaders && responseHeaders != null) {
restoreResponseHeadersContext(threadContext, responseHeaders);
} }
SearchResponse searchResponse;
private SearchResponse findOrBuildResponse(AsyncSearchTask task) {
if (finalResponse != null) { if (finalResponse != null) {
// We have a final response, use it. // We have a final response, use it.
return finalResponse; searchResponse = finalResponse;
} } else if (clusters == null) {
if (clusters == null) {
// An error occurred before we got the shard list // An error occurred before we got the shard list
return null; searchResponse = null;
} } else {
/* /*
* Build the response, reducing aggs if we haven't already and * Build the response, reducing aggs if we haven't already and
* storing the result of the reduction so we won't have to reduce * storing the result of the reduction so we won't have to reduce
@ -182,23 +180,20 @@ class MutableSearchResponse {
*/ */
InternalAggregations reducedAggs = reducedAggsSource.get(); InternalAggregations reducedAggs = reducedAggsSource.get();
reducedAggsSource = () -> reducedAggs; reducedAggsSource = () -> reducedAggs;
InternalSearchResponse internal = new InternalSearchResponse( searchResponse = buildResponse(task.getStartTimeNanos(), reducedAggs);
new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase); }
long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - task.getStartTimeNanos()).getMillis(); return new AsyncSearchResponse(task.getExecutionId().getEncoded(), searchResponse,
return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards, failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
tookInMillis, buildShardFailures(), clusters);
} }
/** synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
* Creates an {@link AsyncSearchResponse} based on the current state of the mutable response. long expirationTime,
* This method also restores the response headers in the current thread context if the final response is available. ElasticsearchException reduceException) {
*/ if (this.failure != null) {
synchronized AsyncSearchResponse toAsyncSearchResponseWithHeaders(AsyncSearchTask task, long expirationTime) { reduceException.addSuppressed(this.failure);
AsyncSearchResponse resp = toAsyncSearchResponse(task, expirationTime);
if (responseHeaders != null) {
restoreResponseHeadersContext(threadContext, responseHeaders);
} }
return resp; return new AsyncSearchResponse(task.getExecutionId().getEncoded(), buildResponse(task.getStartTimeNanos(), null),
reduceException, isPartial, frozen == false, task.getStartTime(), expirationTime);
} }
private void failIfFrozen() { private void failIfFrozen() {

View File

@ -96,9 +96,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
if (searchResponse.isRunning()) { if (searchResponse.isRunning()) {
try { try {
// store the final response on completion unless the submit is cancelled // store the final response on completion unless the submit is cancelled
searchTask.addCompletionListener(finalResponse -> searchTask.addCompletionListener(
onFinalResponse(searchTask, finalResponse, () -> { finalResponse -> onFinalResponse(searchTask, finalResponse, () -> {}));
}));
} finally { } finally {
submitListener.onResponse(searchResponse); submitListener.onResponse(searchResponse);
} }
@ -110,11 +109,12 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
@Override @Override
public void onFailure(Exception exc) { public void onFailure(Exception exc) {
onFatalFailure(searchTask, exc, searchResponse.isRunning(), onFatalFailure(searchTask, exc, searchResponse.isRunning(),
"unable to store initial response", submitListener); "fatal failure: unable to store initial response", submitListener);
} }
}); });
} catch (Exception exc) { } catch (Exception exc) {
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener); onFatalFailure(searchTask, exc, searchResponse.isRunning(),
"fatal failure: generic error", submitListener);
} }
} else { } else {
// the task completed within the timeout so the response is sent back to the user // the task completed within the timeout so the response is sent back to the user
@ -126,7 +126,10 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
@Override @Override
public void onFailure(Exception exc) { public void onFailure(Exception exc) {
submitListener.onFailure(exc); //this will only ever be called if there is an issue scheduling the thread that executes
//the completion listener once the wait for completion timeout expires.
onFatalFailure(searchTask, exc, true,
"fatal failure: addCompletionListener", submitListener);
} }
}, request.getWaitForCompletionTimeout()); }, request.getWaitForCompletionTimeout());
} }
@ -149,7 +152,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
} }
private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shouldCancel, String cancelReason, private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shouldCancel, String cancelReason,
ActionListener<AsyncSearchResponse> listener) { ActionListener<AsyncSearchResponse> listener){
if (shouldCancel && task.isCancelled() == false) { if (shouldCancel && task.isCancelled() == false) {
task.cancelTask(() -> { task.cancelTask(() -> {
try { try {
@ -157,7 +160,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
} finally { } finally {
listener.onFailure(error); listener.onFailure(error);
} }
}, "fatal failure: " + cancelReason); }, cancelReason);
} else { } else {
try { try {
task.addCompletionListener(finalResponse -> taskManager.unregister(task)); task.addCompletionListener(finalResponse -> taskManager.unregister(task));
@ -175,13 +178,13 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap( store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction), resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> { exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getExecutionId()), exc); logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]",
searchTask.getExecutionId().getEncoded()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction); unregisterTaskAndMoveOn(searchTask, nextAction);
})); }));
return; return;
} }
try {
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> { exc -> {
@ -193,11 +196,6 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
} }
unregisterTaskAndMoveOn(searchTask, nextAction); unregisterTaskAndMoveOn(searchTask, nextAction);
})); }));
} catch (Exception exc) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()),
exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}
} }
private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) { private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) {

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -29,6 +28,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.async.AsyncResultsIndexPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
@ -67,11 +67,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
@Override @Override
public List<QuerySpec<?>> getQueries() { public List<QuerySpec<?>> getQueries() {
return Arrays.asList( return Arrays.asList(
new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in), new QuerySpec<>(BlockingQueryBuilder.NAME, BlockingQueryBuilder::new,
p -> { p -> {
throw new IllegalStateException("not implemented"); throw new IllegalStateException("not implemented");
}), }),
new QuerySpec<>(ThrowingQueryBuilder.NAME, in -> new ThrowingQueryBuilder(in), new QuerySpec<>(ThrowingQueryBuilder.NAME, ThrowingQueryBuilder::new,
p -> { p -> {
throw new IllegalStateException("not implemented"); throw new IllegalStateException("not implemented");
})); }));

View File

@ -6,17 +6,26 @@
package org.elasticsearch.xpack.search; package org.elasticsearch.xpack.search;
import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShard; import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -34,16 +43,26 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
public class AsyncSearchTaskTests extends ESTestCase { public class AsyncSearchTaskTests extends ESTestCase {
private ThreadPool threadPool; private ThreadPool threadPool;
private boolean throwOnSchedule = false;
@Before @Before
public void beforeTest() { public void beforeTest() {
threadPool = new TestThreadPool(getTestName()); threadPool = new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
if (throwOnSchedule) {
throw new RuntimeException();
}
return super.schedule(command, delay, executor);
}
};
} }
@After @After
@ -123,6 +142,90 @@ public class AsyncSearchTaskTests extends ESTestCase {
latch.await(); latch.await();
} }
public void testGetResponseFailureDuringReduction() throws InterruptedException {
AsyncSearchTask task = createAsyncSearchTask();
task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
SearchResponse.Clusters.EMPTY, false);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
//providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
//causing an exception when executing getResponse as part of the completion listener callback
DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
.asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList()));
task.getSearchProgressActionListener().onPartialReduce(Collections.emptyList(), new TotalHits(0, TotalHits.Relation.EQUAL_TO),
serializedAggs, 1);
AtomicReference<AsyncSearchResponse> response = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
task.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
@Override
public void onResponse(AsyncSearchResponse asyncSearchResponse) {
assertTrue(response.compareAndSet(null, asyncSearchResponse));
latch.countDown();
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("onFailure should not be called");
}
}, TimeValue.timeValueMillis(10L));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNotNull(response.get().getSearchResponse());
assertEquals(0, response.get().getSearchResponse().getTotalShards());
assertEquals(0, response.get().getSearchResponse().getSuccessfulShards());
assertEquals(0, response.get().getSearchResponse().getFailedShards());
assertThat(response.get().getFailure(), instanceOf(ElasticsearchException.class));
assertEquals("Async search: error while reducing partial results", response.get().getFailure().getMessage());
assertThat(response.get().getFailure().getCause(), instanceOf(IllegalArgumentException.class));
assertEquals("Unknown NamedWriteable category [" + InternalAggregation.class.getName() + "]",
response.get().getFailure().getCause().getMessage());
}
public void testWithFailureAndGetResponseFailureDuringReduction() throws InterruptedException {
AsyncSearchTask task = createAsyncSearchTask();
task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
SearchResponse.Clusters.EMPTY, false);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
//providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
//causing an exception when executing getResponse as part of the completion listener callback
DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
.asSerialized(InternalAggregations::new, new NamedWriteableRegistry(Collections.emptyList()));
task.getSearchProgressActionListener().onPartialReduce(Collections.emptyList(), new TotalHits(0, TotalHits.Relation.EQUAL_TO),
serializedAggs, 1);
task.getSearchProgressActionListener().onFailure(new CircuitBreakingException("boom", CircuitBreaker.Durability.TRANSIENT));
AtomicReference<AsyncSearchResponse> response = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
task.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
@Override
public void onResponse(AsyncSearchResponse asyncSearchResponse) {
assertTrue(response.compareAndSet(null, asyncSearchResponse));
latch.countDown();
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("onFailure should not be called");
}
}, TimeValue.timeValueMillis(10L));
assertTrue(latch.await(1, TimeUnit.SECONDS));
AsyncSearchResponse asyncSearchResponse = response.get();
assertNotNull(response.get().getSearchResponse());
assertEquals(0, response.get().getSearchResponse().getTotalShards());
assertEquals(0, response.get().getSearchResponse().getSuccessfulShards());
assertEquals(0, response.get().getSearchResponse().getFailedShards());
Exception failure = asyncSearchResponse.getFailure();
assertThat(failure, instanceOf(ElasticsearchException.class));
assertEquals("Async search: error while reducing partial results", failure.getMessage());
assertThat(failure.getCause(), instanceOf(IllegalArgumentException.class));
assertEquals("Unknown NamedWriteable category [" + InternalAggregation.class.getName() +
"]", failure.getCause().getMessage());
assertEquals(1, failure.getSuppressed().length);
assertThat(failure.getSuppressed()[0], instanceOf(ElasticsearchException.class));
assertEquals("error while executing search", failure.getSuppressed()[0].getMessage());
assertThat(failure.getSuppressed()[0].getCause(), instanceOf(CircuitBreakingException.class));
assertEquals("boom", failure.getSuppressed()[0].getCause().getMessage());
}
public void testWaitForCompletion() throws InterruptedException { public void testWaitForCompletion() throws InterruptedException {
AsyncSearchTask task = createAsyncSearchTask(); AsyncSearchTask task = createAsyncSearchTask();
int numShards = randomIntBetween(0, 10); int numShards = randomIntBetween(0, 10);
@ -237,6 +340,55 @@ public class AsyncSearchTaskTests extends ESTestCase {
assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true); assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true);
} }
public void testAddCompletionListenerScheduleErrorWaitForInitListener() throws InterruptedException {
throwOnSchedule = true;
AsyncSearchTask asyncSearchTask = createAsyncSearchTask();
AtomicReference<Exception> failure = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
//onListShards has not been executed, then addCompletionListener has to wait for the
// onListShards call and is executed as init listener
asyncSearchTask.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
@Override
public void onResponse(AsyncSearchResponse asyncSearchResponse) {
throw new AssertionError("onResponse should not be called");
}
@Override
public void onFailure(Exception e) {
assertTrue(failure.compareAndSet(null, e));
latch.countDown();
}
}, TimeValue.timeValueMillis(500L));
asyncSearchTask.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
SearchResponse.Clusters.EMPTY, false);
assertTrue(latch.await(1000, TimeUnit.SECONDS));
assertThat(failure.get(), instanceOf(RuntimeException.class));
}
public void testAddCompletionListenerScheduleErrorInitListenerExecutedImmediately() throws InterruptedException {
throwOnSchedule = true;
AsyncSearchTask asyncSearchTask = createAsyncSearchTask();
asyncSearchTask.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
SearchResponse.Clusters.EMPTY, false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> failure = new AtomicReference<>();
//onListShards has already been executed, then addCompletionListener is executed immediately
asyncSearchTask.addCompletionListener(new ActionListener<AsyncSearchResponse>() {
@Override
public void onResponse(AsyncSearchResponse asyncSearchResponse) {
throw new AssertionError("onResponse should not be called");
}
@Override
public void onFailure(Exception e) {
assertTrue(failure.compareAndSet(null, e));
latch.countDown();
}
}, TimeValue.timeValueMillis(500L));
assertTrue(latch.await(1000, TimeUnit.SECONDS));
assertThat(failure.get(), instanceOf(RuntimeException.class));
}
private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards, private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards,
ShardSearchFailure... failures) { ShardSearchFailure... failures) {
InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(), InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(),

View File

@ -129,11 +129,11 @@ class BlockingQueryBuilder extends AbstractQueryBuilder<BlockingQueryBuilder> {
* is called. * is called.
*/ */
static class QueryLatch implements Closeable { static class QueryLatch implements Closeable {
private volatile CountDownLatch countDownLatch;
private final Set<Integer> failedShards = new HashSet<>(); private final Set<Integer> failedShards = new HashSet<>();
private volatile CountDownLatch countDownLatch;
private int numShardFailures; private int numShardFailures;
QueryLatch(int numShardFailures) { private QueryLatch(int numShardFailures) {
this.countDownLatch = new CountDownLatch(1); this.countDownLatch = new CountDownLatch(1);
this.numShardFailures = numShardFailures; this.numShardFailures = numShardFailures;
} }

View File

@ -5,12 +5,10 @@
*/ */
package org.elasticsearch.xpack.search; package org.elasticsearch.xpack.search;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
@ -72,16 +70,7 @@ public class CancellingAggregationBuilder extends AbstractAggregationBuilder<Can
new ConstructingObjectParser<>(NAME, false, (args, name) -> new CancellingAggregationBuilder(name, 0L)); new ConstructingObjectParser<>(NAME, false, (args, name) -> new CancellingAggregationBuilder(name, 0L));
static CancellingAggregationBuilder fromXContent(String aggName, XContentParser parser) {
try {
return PARSER.apply(parser, aggName);
} catch (IllegalArgumentException e) {
throw new ParsingException(parser.getTokenLocation(), e.getMessage(), e);
}
}
@Override @Override
@SuppressWarnings("unchecked")
protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent, protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent,
AggregatorFactories.Builder subfactoriesBuilder) throws IOException { AggregatorFactories.Builder subfactoriesBuilder) throws IOException {
final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery()); final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery());

View File

@ -185,7 +185,8 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
public void updateResponse(String docId, public void updateResponse(String docId,
Map<String, List<String>> responseHeaders, Map<String, List<String>> responseHeaders,
R response, R response,
ActionListener<UpdateResponse> listener) throws IOException { ActionListener<UpdateResponse> listener) {
try {
Map<String, Object> source = new HashMap<>(); Map<String, Object> source = new HashMap<>();
source.put(RESPONSE_HEADERS_FIELD, responseHeaders); source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
source.put(RESULT_FIELD, encodeResponse(response)); source.put(RESULT_FIELD, encodeResponse(response));
@ -194,6 +195,9 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
.id(docId) .id(docId)
.doc(source, XContentType.JSON); .doc(source, XContentType.JSON);
client.update(request, listener); client.update(request, listener);
} catch(Exception e) {
listener.onFailure(e);
}
} }
/** /**
@ -215,8 +219,12 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
*/ */
public void deleteResponse(AsyncExecutionId asyncExecutionId, public void deleteResponse(AsyncExecutionId asyncExecutionId,
ActionListener<DeleteResponse> listener) { ActionListener<DeleteResponse> listener) {
try {
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId()); DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
client.delete(request, listener); client.delete(request, listener);
} catch(Exception e) {
listener.onFailure(e);
}
} }
/** /**