Ensure that we don't call listener twice when detecting a partial failure in _search (#47694)

This change fixes a bug that can occur when a shard failure is detected while we build
the search response and accept partial failures in set to false. In this case we currently
call onFailure on the provided listener but also continue the search as if the failure didn't
occur. This can lead to a listener called twice, once with onFailure and once with onSuccess
which is forbidden by design.
This commit is contained in:
Jim Ferenczi 2019-10-10 09:59:19 +02:00 committed by jimczi
parent 5825d2df83
commit 3d334a262b
3 changed files with 17 additions and 14 deletions

View File

@ -352,7 +352,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
}
}
private ShardSearchFailure[] buildShardFailures() {
ShardSearchFailure[] buildShardFailures() {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
@ -510,20 +510,23 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
return request;
}
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse,
String scrollId,
ShardSearchFailure[] failures) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId));
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));
}
}
@Override

View File

@ -163,7 +163,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
new ArraySearchPhaseResults<>(10), null, false, new AtomicLong());
String scrollId = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId);
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId, action.buildShardFailures());
assertEquals(scrollId, searchResponse.getScrollId());
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
@ -179,7 +179,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
new IllegalArgumentException());
String scrollId = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId);
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId, action.buildShardFailures());
assertEquals(scrollId, searchResponse.getScrollId());
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
@ -187,7 +187,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
assertSame(searchResponse.getHits(), internalSearchResponse.hits());
}
public void testBuildSearchResponseDisallowPartialFailures() {
public void testSendSearchResponseDisallowPartialFailures() {
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
@ -203,7 +203,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
action.onShardFailure(i, new SearchShardTarget(failureNodeId, failureShardId, failureClusterAlias, OriginalIndices.NONE),
new IllegalArgumentException());
}
action.buildSearchResponse(InternalSearchResponse.empty(), randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10));
action.sendSearchResponse(InternalSearchResponse.empty(), randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10));
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException)exception.get();
assertEquals(0, searchPhaseExecutionException.getSuppressed().length);

View File

@ -148,7 +148,7 @@ public class SearchAsyncActionTests extends ESTestCase {
asyncAction.start();
latch.await();
assertTrue(searchPhaseDidRun.get());
SearchResponse searchResponse = asyncAction.buildSearchResponse(null, null);
SearchResponse searchResponse = asyncAction.buildSearchResponse(null, null, asyncAction.buildShardFailures());
assertEquals(shardsIter.size() - numSkipped, numRequests.get());
assertEquals(0, searchResponse.getFailedShards());
assertEquals(numSkipped, searchResponse.getSkippedShards());