Async search: don't track fetch failures (#62111)
Fetch failures are currently tracked by AsyncSearchTask like ordinary shard failures. However, they should be treated differently, otherwise they cause confusing scenarios such as total=num_shards and successful=num_shards (because the query phase ran fine) while the failed count reflects the number of shards where the fetch phase failed. Given that partial results only include aggs for now, and are complete even if fetch fails, we can ignore fetch failures in async search: they are included in the response anyway, either as a failure when all shards fail during fetch, or as part of the final response when only some shards fail during fetch.
commit b680d3fb29 (parent fbf0967e20)
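As an illustration of the behaviour described in the commit message, here is a minimal sketch (not part of this commit; the class and helper method names are hypothetical) of where a client of async search now finds fetch failures. It uses only accessors exercised by the new AsyncSearchSingleNodeTests below: a fetch failure on every shard surfaces through AsyncSearchResponse#getFailure(), while partial fetch failures show up as shard failures on the final SearchResponse.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

final class FetchFailureInspection {

    // Hypothetical helper, for illustration only: shows where fetch failures end up
    // after this change, using the same accessors exercised by the new test below.
    static void inspectFetchFailures(AsyncSearchResponse asyncSearchResponse) {
        if (asyncSearchResponse.getFailure() != null) {
            // all shards failed during fetch: async search returns the error
            // alongside the partial response obtained after the final reduction
            System.out.println("fetch failed on every shard: " + asyncSearchResponse.getFailure().getMessage());
        }
        SearchResponse searchResponse = asyncSearchResponse.getSearchResponse();
        if (searchResponse != null) {
            // only some shards failed during fetch: the final response already carries
            // the per-shard fetch failures, and the successful/failed counts stay consistent
            // with the query phase instead of double counting fetch failures
            for (ShardSearchFailure shardFailure : searchResponse.getShardFailures()) {
                System.out.println("shard fetch failure: " + shardFailure.getCause().getMessage());
            }
        }
    }
}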
AsyncSearchTask.java
@@ -363,17 +363,21 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
     protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
         // best effort to cancel expired tasks
         checkCancellation();
-        searchResponse.get().addShardFailure(shardIndex,
+        searchResponse.get().addQueryFailure(shardIndex,
             // the nodeId is null if all replicas of this shard failed
             new ShardSearchFailure(exc, shardTarget.getNodeId() != null ? shardTarget : null));
     }

     @Override
     protected void onFetchFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
         // best effort to cancel expired tasks
         checkCancellation();
-        searchResponse.get().addShardFailure(shardIndex,
-            // the nodeId is null if all replicas of this shard failed
-            new ShardSearchFailure(exc, shardTarget.getNodeId() != null ? shardTarget : null));
+        //ignore fetch failures: they make the shards count confusing if we count them as shard failures because the query
+        // phase ran fine and we don't want to end up with e.g. total: 5 successful: 5 failed: 5.
+        //Given that partial results include only aggs they are not affected by fetch failures. Async search receives the fetch
+        //failures either as an exception (when all shards failed during fetch, in which case async search will return the error
+        //as well as the response obtained after the final reduction) or as part of the final response (if only some shards failed,
+        //in which case the final response already includes results as well as shard fetch failures)
     }

     @Override
MutableSearchResponse.java
@@ -35,7 +35,7 @@ class MutableSearchResponse {
     private final int totalShards;
     private final int skippedShards;
     private final Clusters clusters;
-    private final AtomicArray<ShardSearchFailure> shardFailures;
+    private final AtomicArray<ShardSearchFailure> queryFailures;
     private final ThreadContext threadContext;

     private boolean isPartial;
@@ -74,7 +74,7 @@
         this.totalShards = totalShards;
         this.skippedShards = skippedShards;
         this.clusters = clusters;
-        this.shardFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
+        this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
         this.isPartial = true;
         this.threadContext = threadContext;
         this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
@@ -110,8 +110,6 @@
             "notified through onListShards";
         assert response.getSkippedShards() == skippedShards : "received number of skipped shards differs from the one " +
             "notified through onListShards";
-        assert response.getFailedShards() == buildShardFailures().length : "number of tracked failures differs from failed shards";
         // copy the response headers from the current context
         this.responseHeaders = threadContext.getResponseHeaders();
         this.finalResponse = response;
         this.isPartial = false;
@@ -136,11 +134,11 @@
     /**
      * Adds a shard failure concurrently (non-blocking).
      */
-    void addShardFailure(int shardIndex, ShardSearchFailure failure) {
+    void addQueryFailure(int shardIndex, ShardSearchFailure failure) {
         synchronized (this) {
             failIfFrozen();
         }
-        shardFailures.set(shardIndex, failure);
+        queryFailures.set(shardIndex, failure);
     }

     private SearchResponse buildResponse(long taskStartTimeNanos, InternalAggregations reducedAggs) {
@@ -148,7 +146,7 @@
             new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase);
         long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - taskStartTimeNanos).getMillis();
         return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards,
-            tookInMillis, buildShardFailures(), clusters);
+            tookInMillis, buildQueryFailures(), clusters);
     }

     /**
@@ -202,13 +200,13 @@
         }
     }

-    private ShardSearchFailure[] buildShardFailures() {
-        if (shardFailures == null) {
+    private ShardSearchFailure[] buildQueryFailures() {
+        if (queryFailures == null) {
             return ShardSearchFailure.EMPTY_ARRAY;
         }
         List<ShardSearchFailure> failures = new ArrayList<>();
-        for (int i = 0; i < shardFailures.length(); i++) {
-            ShardSearchFailure failure = shardFailures.get(i);
+        for (int i = 0; i < queryFailures.length(); i++) {
+            ShardSearchFailure failure = queryFailures.get(i);
             if (failure != null) {
                 failures.add(failure);
             }
AsyncSearchSingleNodeTests.java (new file)
@@ -0,0 +1,135 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.search;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import org.hamcrest.CoreMatchers;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class AsyncSearchSingleNodeTests extends ESSingleNodeTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> getPlugins() {
        return Arrays.asList(AsyncSearch.class, SubFetchPhasePlugin.class);
    }

    public void testFetchFailuresAllShards() throws Exception {
        for (int i = 0; i < 10; i++) {
            IndexResponse indexResponse = client().index(new IndexRequest("boom" + i).id("boom" + i).source("text", "value")).get();
            assertEquals(RestStatus.CREATED, indexResponse.status());
        }
        client().admin().indices().refresh(new RefreshRequest()).get();

        TermsAggregationBuilder agg = new TermsAggregationBuilder("text").field("text.keyword");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().aggregation(agg);
        SubmitAsyncSearchRequest submitAsyncSearchRequest = new SubmitAsyncSearchRequest(sourceBuilder);
        submitAsyncSearchRequest.setWaitForCompletionTimeout(TimeValue.timeValueSeconds(10));
        AsyncSearchResponse asyncSearchResponse = client().execute(SubmitAsyncSearchAction.INSTANCE, submitAsyncSearchRequest).actionGet();

        assertFalse(asyncSearchResponse.isRunning());
        assertTrue(asyncSearchResponse.isPartial());
        SearchResponse searchResponse = asyncSearchResponse.getSearchResponse();
        assertEquals(10, searchResponse.getTotalShards());
        assertEquals(10, searchResponse.getSuccessfulShards());
        assertEquals(0, searchResponse.getFailedShards());
        assertEquals(0, searchResponse.getShardFailures().length);
        assertEquals(10, searchResponse.getHits().getTotalHits().value);
        assertEquals(0, searchResponse.getHits().getHits().length);
        StringTerms terms = searchResponse.getAggregations().get("text");
        assertEquals(1, terms.getBuckets().size());
        assertEquals(10, terms.getBucketByKey("value").getDocCount());
        assertNotNull(asyncSearchResponse.getFailure());
        assertThat(asyncSearchResponse.getFailure(), CoreMatchers.instanceOf(ElasticsearchStatusException.class));
        ElasticsearchStatusException statusException = (ElasticsearchStatusException) asyncSearchResponse.getFailure();
        assertEquals(RestStatus.INTERNAL_SERVER_ERROR, statusException.status());
        assertThat(asyncSearchResponse.getFailure().getCause(), CoreMatchers.instanceOf(SearchPhaseExecutionException.class));
        SearchPhaseExecutionException phaseExecutionException = (SearchPhaseExecutionException) asyncSearchResponse.getFailure().getCause();
        assertEquals("fetch", phaseExecutionException.getPhaseName());
        assertEquals("boom", phaseExecutionException.getCause().getMessage());
        assertEquals(10, phaseExecutionException.shardFailures().length);
        for (ShardSearchFailure shardSearchFailure : phaseExecutionException.shardFailures()) {
            assertEquals("boom", shardSearchFailure.getCause().getMessage());
        }
    }

    public void testFetchFailuresOnlySomeShards() throws Exception {
        for (int i = 0; i < 5; i++) {
            IndexResponse indexResponse = client().index(new IndexRequest("boom" + i).id("boom" + i).source("text", "value")).get();
            assertEquals(RestStatus.CREATED, indexResponse.status());
        }
        for (int i = 0; i < 5; i++) {
            IndexResponse indexResponse = client().index(new IndexRequest("index" + i).id("index" + i).source("text", "value")).get();
            assertEquals(RestStatus.CREATED, indexResponse.status());
        }
        client().admin().indices().refresh(new RefreshRequest()).get();

        TermsAggregationBuilder agg = new TermsAggregationBuilder("text").field("text.keyword");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().aggregation(agg);
        SubmitAsyncSearchRequest submitAsyncSearchRequest = new SubmitAsyncSearchRequest(sourceBuilder);
        submitAsyncSearchRequest.setWaitForCompletionTimeout(TimeValue.timeValueSeconds(10));
        AsyncSearchResponse asyncSearchResponse = client().execute(SubmitAsyncSearchAction.INSTANCE, submitAsyncSearchRequest).actionGet();

        assertFalse(asyncSearchResponse.isRunning());
        assertFalse(asyncSearchResponse.isPartial());
        assertNull(asyncSearchResponse.getFailure());
        SearchResponse searchResponse = asyncSearchResponse.getSearchResponse();
        assertEquals(10, searchResponse.getTotalShards());
        assertEquals(5, searchResponse.getSuccessfulShards());
        assertEquals(5, searchResponse.getFailedShards());
        assertEquals(10, searchResponse.getHits().getTotalHits().value);
        assertEquals(5, searchResponse.getHits().getHits().length);
        StringTerms terms = searchResponse.getAggregations().get("text");
        assertEquals(1, terms.getBuckets().size());
        assertEquals(10, terms.getBucketByKey("value").getDocCount());
        assertEquals(5, searchResponse.getShardFailures().length);
        for (ShardSearchFailure shardFailure : searchResponse.getShardFailures()) {
            assertEquals("boom", shardFailure.getCause().getMessage());
        }
    }

    public static final class SubFetchPhasePlugin extends Plugin implements SearchPlugin {
        @Override
        public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
            return Collections.singletonList(searchContext -> new FetchSubPhaseProcessor() {
                @Override
                public void setNextReader(LeafReaderContext readerContext) {}

                @Override
                public void process(FetchSubPhase.HitContext hitContext) {
                    if (hitContext.hit().getId().startsWith("boom")) {
                        throw new RuntimeException("boom");
                    }
                }
            });
        }
    }
}
AsyncSearchTaskTests.java
@@ -256,14 +256,14 @@ public class AsyncSearchTaskTests extends ESTestCase {
         for (int i = 0; i < numShards; i++) {
             task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1),
                 new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
-            assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true);
+            assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true, false);
         }
         task.getSearchProgressActionListener().onFinalReduce(shards,
             new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
-        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true);
+        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, false);
         ((AsyncSearchTask.Listener)task.getProgressListener()).onResponse(
             newSearchResponse(totalShards, totalShards, numSkippedShards));
-        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, false);
+        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, false, false);
     }

     public void testWithFetchFailures() throws InterruptedException {
@@ -283,7 +283,7 @@
         for (int i = 0; i < numShards; i++) {
             task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1),
                 new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
-            assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true);
+            assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true, false);
         }
         task.getSearchProgressActionListener().onFinalReduce(shards,
             new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
@@ -291,15 +291,16 @@
         ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFetchFailures];
         for (int i = 0; i < numFetchFailures; i++) {
             IOException failure = new IOException("boum");
+            //fetch failures are currently ignored, they come back with onFailure or onResponse anyways
             task.getSearchProgressActionListener().onFetchFailure(i,
                 new SearchShardTarget("0", new ShardId("0", "0", 1), null, OriginalIndices.NONE),
                 failure);
             shardSearchFailures[i] = new ShardSearchFailure(failure);
         }
-        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numFetchFailures, true);
+        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, false);
         ((AsyncSearchTask.Listener)task.getProgressListener()).onResponse(
             newSearchResponse(totalShards, totalShards - numFetchFailures, numSkippedShards, shardSearchFailures));
-        assertCompletionListeners(task, totalShards, totalShards - numFetchFailures, numSkippedShards, numFetchFailures, false);
+        assertCompletionListeners(task, totalShards, totalShards - numFetchFailures, numSkippedShards, numFetchFailures, false, false);
     }

     public void testFatalFailureDuringFetch() throws InterruptedException {
@@ -319,18 +320,19 @@
         for (int i = 0; i < numShards; i++) {
             task.getSearchProgressActionListener().onPartialReduce(shards.subList(0, i+1),
                 new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
-            assertCompletionListeners(task, totalShards, i + 1 + numSkippedShards, numSkippedShards, 0, true);
+            assertCompletionListeners(task, totalShards, i + 1 + numSkippedShards, numSkippedShards, 0, true, false);
         }
         task.getSearchProgressActionListener().onFinalReduce(shards,
             new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
         for (int i = 0; i < numShards; i++) {
+            //fetch failures are currently ignored, they come back with onFailure or onResponse anyways
             task.getSearchProgressActionListener().onFetchFailure(i,
                 new SearchShardTarget("0", new ShardId("0", "0", 1), null, OriginalIndices.NONE),
                 new IOException("boum"));
         }
-        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true);
+        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, false);
         ((AsyncSearchTask.Listener)task.getProgressListener()).onFailure(new IOException("boum"));
-        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true);
+        assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, true);
     }

     public void testFatalFailureWithNoCause() throws InterruptedException {
@@ -350,7 +352,7 @@
         task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);

         listener.onFailure(new SearchPhaseExecutionException("fetch", "boum", ShardSearchFailure.EMPTY_ARRAY));
-        assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true);
+        assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true, true);
     }

     public void testAddCompletionListenerScheduleErrorWaitForInitListener() throws InterruptedException {
@@ -415,7 +417,8 @@
                                            int expectedSuccessfulShards,
                                            int expectedSkippedShards,
                                            int expectedShardFailures,
-                                           boolean isPartial) throws InterruptedException {
+                                           boolean isPartial,
+                                           boolean totalFailureExpected) throws InterruptedException {
         int numThreads = randomIntBetween(1, 10);
         CountDownLatch latch = new CountDownLatch(numThreads);
         for (int i = 0; i < numThreads; i++) {
@@ -434,6 +437,11 @@
                         assertThat(failure.getCause().getMessage(), equalTo("boum"));
                     }
                 }
+                if (totalFailureExpected) {
+                    assertNotNull(resp.getFailure());
+                } else {
+                    assertNull(resp.getFailure());
+                }
                 latch.countDown();
             }