Handle failures with no explicit cause in async search (#58319)

This commit fixes an ArrayIndexOutOfBoundsException in the handling of fatal
failures in _async_search. If no underlying root cause can be found,
this change uses the root failure (the original exception) instead.

Closes #58311
This commit is contained in:
Jim Ferenczi 2020-06-18 17:58:23 +02:00 committed by jimczi
parent 9dd3d5aa48
commit 1c1a6d4ec8
5 changed files with 157 additions and 2 deletions

View File

@ -6,10 +6,12 @@
package org.elasticsearch.xpack.search;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
@ -395,4 +397,20 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
ensureTaskNotRunning(newResp.getId());
ensureTaskRemoval(newResp.getId());
}
/**
 * Submits an async search (partial results disallowed) whose query is a
 * {@link ThrowingQueryBuilder} configured with an {@link AlreadyClosedException} for shard 0,
 * then checks that the response is finished, partial, reports a 500 status and
 * exposes a non-null failure.
 */
public void testSearchPhaseFailureNoCause() throws Exception {
    final SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(indexName);
    submitRequest.setKeepOnCompletion(true);
    submitRequest.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
    submitRequest.getSearchRequest().allowPartialSearchResults(false);

    // AlreadyClosedException are ignored by the coordinating node
    final ThrowingQueryBuilder throwingQuery =
        new ThrowingQueryBuilder(randomLong(), new AlreadyClosedException("boom"), 0);
    final SearchSourceBuilder searchSource = new SearchSourceBuilder().query(throwingQuery);
    submitRequest.getSearchRequest().source(searchSource);

    final AsyncSearchResponse asyncResponse = submitAsyncSearch(submitRequest);

    // The search must have terminated with a failure attached to a partial response.
    assertFalse(asyncResponse.isRunning());
    assertTrue(asyncResponse.isPartial());
    assertThat(asyncResponse.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
    assertNotNull(asyncResponse.getFailure());

    ensureTaskNotRunning(asyncResponse.getId());
}
}

View File

@ -130,7 +130,17 @@ class MutableSearchResponse {
//note that when search fails, we may have gotten partial results before the failure. In that case async
// search will return an error plus the last partial results that were collected.
this.isPartial = true;
this.failure = ElasticsearchException.guessRootCauses(exc)[0];
ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(exc);
if (rootCauses == null || rootCauses.length == 0) {
this.failure = new ElasticsearchException(exc.getMessage(), exc) {
@Override
protected String getExceptionName() {
return getExceptionName(getCause());
}
};
} else {
this.failure = rootCauses[0];
}
this.frozen = true;
}

View File

@ -62,7 +62,12 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
@Override
public List<QuerySpec<?>> getQueries() {
return Collections.singletonList(new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in),
return Arrays.asList(
new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in),
p -> {
throw new IllegalStateException("not implemented");
}),
new QuerySpec<>(ThrowingQueryBuilder.NAME, in -> new ThrowingQueryBuilder(in),
p -> {
throw new IllegalStateException("not implemented");
}));

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.search;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -216,6 +217,26 @@ public class AsyncSearchTaskTests extends ESTestCase {
assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true);
}
/**
 * Reports a random number of searched and skipped shards to the task's progress listener,
 * then fails the search with a {@link SearchPhaseExecutionException} built from
 * {@link ShardSearchFailure#EMPTY_ARRAY}, and verifies the completion listeners.
 */
public void testFatalFailureWithNoCause() throws InterruptedException {
    final AsyncSearchTask searchTask = createAsyncSearchTask();
    final AsyncSearchTask.Listener progressListener = searchTask.getSearchProgressActionListener();

    // Shards that take part in the search.
    final int shardCount = randomIntBetween(0, 10);
    final List<SearchShard> searchedShards = new ArrayList<>();
    while (searchedShards.size() < shardCount) {
        searchedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
    }

    // Shards that are reported as skipped.
    final List<SearchShard> ignoredShards = new ArrayList<>();
    final int skippedCount = randomIntBetween(0, 10);
    while (ignoredShards.size() < skippedCount) {
        ignoredShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
    }

    final int expectedTotal = shardCount + skippedCount;
    searchTask.getSearchProgressActionListener()
        .onListShards(searchedShards, ignoredShards, SearchResponse.Clusters.EMPTY, false);

    final SearchPhaseExecutionException fatal =
        new SearchPhaseExecutionException("fetch", "boum", ShardSearchFailure.EMPTY_ARRAY);
    progressListener.onFailure(fatal);

    assertCompletionListeners(searchTask, expectedTotal, 0, skippedCount, 0, true);
}
private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards,
ShardSearchFailure... failures) {
InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(),

View File

@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import java.io.IOException;
/**
 * Query builder that produces a match-all query which throws a configured
 * {@link RuntimeException} when its weight is created on one specific shard.
 */
class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
    public static final String NAME = "throw";

    // Written to and read from the stream; not otherwise used by this class.
    private final long randomUID;
    // Exception thrown from createWeight on the targeted shard.
    private final RuntimeException failure;
    // Shard id (as compared against QueryShardContext#getShardId) on which to throw.
    private final int shardId;

    /**
     * Creates a {@link ThrowingQueryBuilder}.
     *
     * @param randomUID identifier carried along with the builder
     * @param failure   the exception to throw on the targeted shard
     * @param shardId   the id of the shard on which {@code failure} is thrown
     */
    ThrowingQueryBuilder(long randomUID, RuntimeException failure, int shardId) {
        this.randomUID = randomUID;
        this.failure = failure;
        this.shardId = shardId;
    }

    /**
     * Reads a builder from the stream, in the field order used by {@link #doWriteTo}.
     */
    ThrowingQueryBuilder(StreamInput in) throws IOException {
        super(in);
        this.randomUID = in.readLong();
        this.failure = in.readException();
        this.shardId = in.readVInt();
    }

    /** Writes the uid, the exception and the shard id, in that order. */
    @Override
    protected void doWriteTo(StreamOutput out) throws IOException {
        out.writeLong(randomUID);
        out.writeException(failure);
        out.writeVInt(shardId);
    }

    /** Renders this builder as an empty object named {@link #NAME}. */
    @Override
    protected void doXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(NAME).endObject();
    }

    /**
     * Builds a query that behaves like a match-all query, except that creating its weight
     * throws the configured failure when the context's shard id equals {@code shardId}.
     */
    @Override
    protected Query doToQuery(QueryShardContext context) {
        final Query matchAll = Queries.newMatchAllQuery();
        return new Query() {
            @Override
            public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
                // The shard id is checked lazily, at weight-creation time.
                if (context.getShardId() != shardId) {
                    return matchAll.createWeight(searcher, scoreMode, boost);
                }
                throw failure;
            }

            @Override
            public String toString(String field) {
                return matchAll.toString(field);
            }

            // NOTE(review): never equal to anything, itself included; presumably this keeps
            // the query from being cached or deduplicated. Confirm.
            @Override
            public boolean equals(Object obj) {
                return false;
            }

            @Override
            public int hashCode() {
                return 0;
            }
        };
    }

    /** Always {@code false}: two builders are never considered equal. */
    @Override
    protected boolean doEquals(ThrowingQueryBuilder other) {
        return false;
    }

    /** Constant hash code. */
    @Override
    protected int doHashCode() {
        return 0;
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }
}