diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 8c1ecdd846a..6ae6655ef6b 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -363,17 +363,21 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) { // best effort to cancel expired tasks checkCancellation(); - searchResponse.get().addShardFailure(shardIndex, + searchResponse.get().addQueryFailure(shardIndex, // the nodeId is null if all replicas of this shard failed new ShardSearchFailure(exc, shardTarget.getNodeId() != null ? shardTarget : null)); } @Override protected void onFetchFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) { + // best effort to cancel expired tasks checkCancellation(); - searchResponse.get().addShardFailure(shardIndex, - // the nodeId is null if all replicas of this shard failed - new ShardSearchFailure(exc, shardTarget.getNodeId() != null ? shardTarget : null)); + //ignore fetch failures: they make the shards count confusing if we count them as shard failures because the query + // phase ran fine and we don't want to end up with e.g. total: 5 successful: 5 failed: 5. + //Given that partial results include only aggs they are not affected by fetch failures. 
Async search receives the fetch + //failures either as an exception (when all shards failed during fetch, in which case async search will return the error + //as well as the response obtained after the final reduction) or as part of the final response (if only some shards failed, + //in which case the final response already includes results as well as shard fetch failures) } @Override diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java index 2b499e97eba..adf43b54736 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java @@ -35,7 +35,7 @@ class MutableSearchResponse { private final int totalShards; private final int skippedShards; private final Clusters clusters; - private final AtomicArray shardFailures; + private final AtomicArray queryFailures; private final ThreadContext threadContext; private boolean isPartial; @@ -74,7 +74,7 @@ class MutableSearchResponse { this.totalShards = totalShards; this.skippedShards = skippedShards; this.clusters = clusters; - this.shardFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards); + this.queryFailures = totalShards == -1 ? 
null : new AtomicArray<>(totalShards-skippedShards); this.isPartial = true; this.threadContext = threadContext; this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO); @@ -110,8 +110,6 @@ class MutableSearchResponse { "notified through onListShards"; assert response.getSkippedShards() == skippedShards : "received number of skipped shards differs from the one " + "notified through onListShards"; - assert response.getFailedShards() == buildShardFailures().length : "number of tracked failures differs from failed shards"; - // copy the response headers from the current context this.responseHeaders = threadContext.getResponseHeaders(); this.finalResponse = response; this.isPartial = false; @@ -136,11 +134,11 @@ class MutableSearchResponse { /** * Adds a shard failure concurrently (non-blocking). */ - void addShardFailure(int shardIndex, ShardSearchFailure failure) { + void addQueryFailure(int shardIndex, ShardSearchFailure failure) { synchronized (this) { failIfFrozen(); } - shardFailures.set(shardIndex, failure); + queryFailures.set(shardIndex, failure); } private SearchResponse buildResponse(long taskStartTimeNanos, InternalAggregations reducedAggs) { @@ -148,7 +146,7 @@ class MutableSearchResponse { new SearchHits(SearchHits.EMPTY, totalHits, Float.NaN), reducedAggs, null, null, false, false, reducePhase); long tookInMillis = TimeValue.timeValueNanos(System.nanoTime() - taskStartTimeNanos).getMillis(); return new SearchResponse(internal, null, totalShards, successfulShards, skippedShards, - tookInMillis, buildShardFailures(), clusters); + tookInMillis, buildQueryFailures(), clusters); } /** @@ -202,13 +200,13 @@ class MutableSearchResponse { } } - private ShardSearchFailure[] buildShardFailures() { - if (shardFailures == null) { + private ShardSearchFailure[] buildQueryFailures() { + if (queryFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } List failures = new ArrayList<>(); - for (int i = 0; i < shardFailures.length(); i++) { - 
ShardSearchFailure failure = shardFailures.get(i); + for (int i = 0; i < queryFailures.length(); i++) { + ShardSearchFailure failure = queryFailures.get(i); if (failure != null) { failures.add(failure); } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSingleNodeTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSingleNodeTests.java new file mode 100644 index 00000000000..ea905535a00 --- /dev/null +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchSingleNodeTests.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.search; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.FetchSubPhase; +import org.elasticsearch.search.fetch.FetchSubPhaseProcessor; +import org.elasticsearch.test.ESSingleNodeTestCase; +import 
org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction; +import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; +import org.hamcrest.CoreMatchers; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class AsyncSearchSingleNodeTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Arrays.asList(AsyncSearch.class, SubFetchPhasePlugin.class); + } + + public void testFetchFailuresAllShards() throws Exception { + for (int i = 0; i < 10; i++) { + IndexResponse indexResponse = client().index(new IndexRequest("boom" + i).id("boom" + i).source("text", "value")).get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + client().admin().indices().refresh(new RefreshRequest()).get(); + + TermsAggregationBuilder agg = new TermsAggregationBuilder("text").field("text.keyword"); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().aggregation(agg); + SubmitAsyncSearchRequest submitAsyncSearchRequest = new SubmitAsyncSearchRequest(sourceBuilder); + submitAsyncSearchRequest.setWaitForCompletionTimeout(TimeValue.timeValueSeconds(10)); + AsyncSearchResponse asyncSearchResponse = client().execute(SubmitAsyncSearchAction.INSTANCE, submitAsyncSearchRequest).actionGet(); + + assertFalse(asyncSearchResponse.isRunning()); + assertTrue(asyncSearchResponse.isPartial()); + SearchResponse searchResponse = asyncSearchResponse.getSearchResponse(); + assertEquals(10, searchResponse.getTotalShards()); + assertEquals(10, searchResponse.getSuccessfulShards()); + assertEquals(0, searchResponse.getFailedShards()); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(10, searchResponse.getHits().getTotalHits().value); + assertEquals(0, searchResponse.getHits().getHits().length); + StringTerms terms = 
searchResponse.getAggregations().get("text"); + assertEquals(1, terms.getBuckets().size()); + assertEquals(10, terms.getBucketByKey("value").getDocCount()); + assertNotNull(asyncSearchResponse.getFailure()); + assertThat(asyncSearchResponse.getFailure(), CoreMatchers.instanceOf(ElasticsearchStatusException.class)); + ElasticsearchStatusException statusException = (ElasticsearchStatusException) asyncSearchResponse.getFailure(); + assertEquals(RestStatus.INTERNAL_SERVER_ERROR, statusException.status()); + assertThat(asyncSearchResponse.getFailure().getCause(), CoreMatchers.instanceOf(SearchPhaseExecutionException.class)); + SearchPhaseExecutionException phaseExecutionException = (SearchPhaseExecutionException) asyncSearchResponse.getFailure().getCause(); + assertEquals("fetch", phaseExecutionException.getPhaseName()); + assertEquals("boom", phaseExecutionException.getCause().getMessage()); + assertEquals(10, phaseExecutionException.shardFailures().length); + for (ShardSearchFailure shardSearchFailure : phaseExecutionException.shardFailures()) { + assertEquals("boom", shardSearchFailure.getCause().getMessage()); + } + } + + public void testFetchFailuresOnlySomeShards() throws Exception { + for (int i = 0; i < 5; i++) { + IndexResponse indexResponse = client().index(new IndexRequest("boom" + i).id("boom" + i).source("text", "value")).get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + for (int i = 0; i < 5; i++) { + IndexResponse indexResponse = client().index(new IndexRequest("index" + i).id("index" + i).source("text", "value")).get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + } + client().admin().indices().refresh(new RefreshRequest()).get(); + + TermsAggregationBuilder agg = new TermsAggregationBuilder("text").field("text.keyword"); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().aggregation(agg); + SubmitAsyncSearchRequest submitAsyncSearchRequest = new SubmitAsyncSearchRequest(sourceBuilder); + 
submitAsyncSearchRequest.setWaitForCompletionTimeout(TimeValue.timeValueSeconds(10)); + AsyncSearchResponse asyncSearchResponse = client().execute(SubmitAsyncSearchAction.INSTANCE, submitAsyncSearchRequest).actionGet(); + + assertFalse(asyncSearchResponse.isRunning()); + assertFalse(asyncSearchResponse.isPartial()); + assertNull(asyncSearchResponse.getFailure()); + SearchResponse searchResponse = asyncSearchResponse.getSearchResponse(); + assertEquals(10, searchResponse.getTotalShards()); + assertEquals(5, searchResponse.getSuccessfulShards()); + assertEquals(5, searchResponse.getFailedShards()); + assertEquals(10, searchResponse.getHits().getTotalHits().value); + assertEquals(5, searchResponse.getHits().getHits().length); + StringTerms terms = searchResponse.getAggregations().get("text"); + assertEquals(1, terms.getBuckets().size()); + assertEquals(10, terms.getBucketByKey("value").getDocCount()); + assertEquals(5, searchResponse.getShardFailures().length); + for (ShardSearchFailure shardFailure : searchResponse.getShardFailures()) { + assertEquals("boom", shardFailure.getCause().getMessage()); + } + } + + public static final class SubFetchPhasePlugin extends Plugin implements SearchPlugin { + @Override + public List getFetchSubPhases(FetchPhaseConstructionContext context) { + return Collections.singletonList(searchContext -> new FetchSubPhaseProcessor() { + @Override + public void setNextReader(LeafReaderContext readerContext) {} + + @Override + public void process(FetchSubPhase.HitContext hitContext) { + if (hitContext.hit().getId().startsWith("boom")) { + throw new RuntimeException("boom"); + } + + } + }); + } + } +} diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index 3cd2c682dd1..01f57a07ee8 100644 --- 
a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -256,14 +256,14 @@ public class AsyncSearchTaskTests extends ESTestCase { for (int i = 0; i < numShards; i++) { task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1), new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0); - assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true); + assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true, false); } task.getSearchProgressActionListener().onFinalReduce(shards, new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0); - assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true); + assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, false); ((AsyncSearchTask.Listener)task.getProgressListener()).onResponse( newSearchResponse(totalShards, totalShards, numSkippedShards)); - assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, false); + assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, false, false); } public void testWithFetchFailures() throws InterruptedException { @@ -283,7 +283,7 @@ public class AsyncSearchTaskTests extends ESTestCase { for (int i = 0; i < numShards; i++) { task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1), new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0); - assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true); + assertCompletionListeners(task, totalShards, 1 + numSkippedShards, numSkippedShards, 0, true, false); } task.getSearchProgressActionListener().onFinalReduce(shards, new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0); @@ -291,15 +291,16 @@ 
public class AsyncSearchTaskTests extends ESTestCase { ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFetchFailures]; for (int i = 0; i < numFetchFailures; i++) { IOException failure = new IOException("boum"); + //fetch failures are currently ignored; they come back through onFailure or onResponse anyway task.getSearchProgressActionListener().onFetchFailure(i, new SearchShardTarget("0", new ShardId("0", "0", 1), null, OriginalIndices.NONE), failure); shardSearchFailures[i] = new ShardSearchFailure(failure); } - assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numFetchFailures, true); + assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, false); ((AsyncSearchTask.Listener)task.getProgressListener()).onResponse( newSearchResponse(totalShards, totalShards - numFetchFailures, numSkippedShards, shardSearchFailures)); - assertCompletionListeners(task, totalShards, totalShards - numFetchFailures, numSkippedShards, numFetchFailures, false); + assertCompletionListeners(task, totalShards, totalShards - numFetchFailures, numSkippedShards, numFetchFailures, false, false); } public void testFatalFailureDuringFetch() throws InterruptedException { @@ -319,18 +320,19 @@ public class AsyncSearchTaskTests extends ESTestCase { for (int i = 0; i < numShards; i++) { task.getSearchProgressActionListener().onPartialReduce(shards.subList(0, i+1), new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0); - assertCompletionListeners(task, totalShards, i + 1 + numSkippedShards, numSkippedShards, 0, true); + assertCompletionListeners(task, totalShards, i + 1 + numSkippedShards, numSkippedShards, 0, true, false); } task.getSearchProgressActionListener().onFinalReduce(shards, new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0); for (int i = 0; i < numShards; i++) { + //fetch failures are currently ignored; they come back through onFailure or onResponse anyway 
task.getSearchProgressActionListener().onFetchFailure(i, new SearchShardTarget("0", new ShardId("0", "0", 1), null, OriginalIndices.NONE), new IOException("boum")); } - assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true); + assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, false); ((AsyncSearchTask.Listener)task.getProgressListener()).onFailure(new IOException("boum")); - assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true); + assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, 0, true, true); } public void testFatalFailureWithNoCause() throws InterruptedException { @@ -350,7 +352,7 @@ public class AsyncSearchTaskTests extends ESTestCase { task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false); listener.onFailure(new SearchPhaseExecutionException("fetch", "boum", ShardSearchFailure.EMPTY_ARRAY)); - assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true); + assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true, true); } public void testAddCompletionListenerScheduleErrorWaitForInitListener() throws InterruptedException { @@ -415,7 +417,8 @@ public class AsyncSearchTaskTests extends ESTestCase { int expectedSuccessfulShards, int expectedSkippedShards, int expectedShardFailures, - boolean isPartial) throws InterruptedException { + boolean isPartial, + boolean totalFailureExpected) throws InterruptedException { int numThreads = randomIntBetween(1, 10); CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; i++) { @@ -434,6 +437,11 @@ public class AsyncSearchTaskTests extends ESTestCase { assertThat(failure.getCause().getMessage(), equalTo("boum")); } } + if (totalFailureExpected) { + assertNotNull(resp.getFailure()); + } else { + assertNull(resp.getFailure()); + } latch.countDown(); }