SearchRequest#allowPartialSearchResults does not handle successful retries (#43095)

When set to false, the allowPartialSearchResults option does not check whether the
shard failures have been reset to null. The atomic array that is used to record
shard failures has its slot reset to null when a request on a shard copy succeeds
after a failure on another replica of the same shard. In this case the atomic array is not empty
but contains only null values, so it should not be treated as a failure: all
shards ultimately succeeded (some replicas failed, but the retries on other replicas succeeded).
This change fixes the bug by checking the content of the atomic array and failing the request only
if allowPartialSearchResults is set to false and at least one shard failure is non-null.
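
To make the failure mode concrete, here is a minimal, self-contained sketch. It is not the actual Elasticsearch bookkeeping (which uses a lazily allocated AtomicArray behind shardFailures); a plain AtomicReferenceArray and the hypothetical helpers failsRequestBuggy/failsRequestFixed stand in to contrast the old check against the new one.

import java.util.concurrent.atomic.AtomicReferenceArray;

// Stand-in for the per-shard failure bookkeeping: slot i holds the failure
// recorded for shard i, or null once a retry on another replica succeeded.
public class PartialResultsCheckSketch {

    // Old check: treats the mere existence of the failure array as a failure,
    // even when every slot has been reset to null by successful retries.
    static boolean failsRequestBuggy(AtomicReferenceArray<Exception> failures) {
        return failures != null;
    }

    // New check: fail the request only if at least one recorded failure is still non-null.
    static boolean failsRequestFixed(AtomicReferenceArray<Exception> failures) {
        if (failures == null) {
            return false;
        }
        for (int i = 0; i < failures.length(); i++) {
            if (failures.get(i) != null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicReferenceArray<Exception> failures = new AtomicReferenceArray<>(3);
        failures.set(1, new RuntimeException("replica of shard 1 failed"));
        failures.set(1, null); // retry on another replica of shard 1 succeeded

        System.out.println(failsRequestBuggy(failures)); // true  -> request wrongly rejected
        System.out.println(failsRequestFixed(failures)); // false -> all shards succeeded
    }
}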

Closes #40743
Jim Ferenczi 2019-06-12 15:37:05 +02:00 committed by jimczi
parent 7f690e8606
commit 79614aeb2d
3 changed files with 120 additions and 18 deletions

@@ -140,24 +140,29 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends
         } else {
             Boolean allowPartialResults = request.allowPartialSearchResults();
             assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
-            if (allowPartialResults == false && shardFailures.get() != null ){
-                if (logger.isDebugEnabled()) {
-                    final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
-                    Throwable cause = shardSearchFailures.length == 0 ? null :
-                        ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
-                    logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
-                        shardSearchFailures.length, getName()), cause);
+            if (allowPartialResults == false && shardFailures.get() != null) {
+                // check if there are actual failures in the atomic array since
+                // successful retries can reset the failures to null
+                ShardOperationFailedException[] shardSearchFailures = buildShardFailures();
+                if (shardSearchFailures.length > 0) {
+                    if (logger.isDebugEnabled()) {
+                        int numShardFailures = shardSearchFailures.length;
+                        shardSearchFailures = ExceptionsHelper.groupBy(shardSearchFailures);
+                        Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
+                        logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
+                            numShardFailures, getName()), cause);
+                    }
+                    onPhaseFailure(currentPhase, "Partial shards failure", null);
+                    return;
                 }
-                onPhaseFailure(currentPhase, "Partial shards failure", null);
-            } else {
-                if (logger.isTraceEnabled()) {
-                    final String resultsFrom = results.getSuccessfulResults()
-                        .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
-                    logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
-                        currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
-                }
-                executePhase(nextPhase);
             }
+            if (logger.isTraceEnabled()) {
+                final String resultsFrom = results.getSuccessfulResults()
+                    .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
+                logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
+                    currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
+            }
+            executePhase(nextPhase);
         }
     }
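
For reference, a small usage sketch of the option this fix affects; the class and method names here are illustrative only, but the request setup mirrors the test added in this commit.

import org.elasticsearch.action.search.SearchRequest;

public class AllowPartialResultsUsage {
    // A caller that prefers a hard failure over partial hits disables partial
    // results on the request. With the fix above, such a request is rejected
    // only when at least one shard failure survives the retries, i.e. some
    // shard has no successful copy at all.
    public static SearchRequest strictSearch(String index) {
        SearchRequest request = new SearchRequest(index);
        request.allowPartialSearchResults(false);
        return request;
    }
}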

@@ -139,13 +139,13 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
         for (int index = 0; index < shardsIts.size(); index++) {
             final SearchShardIterator shardRoutings = shardsIts.get(index);
             if (shardRoutings.size() == 0) {
-                if(missingShards.length() >0 ){
+                if(missingShards.length() > 0){
                     missingShards.append(", ");
                 }
                 missingShards.append(shardRoutings.shardId());
             }
         }
-        if (missingShards.length() >0) {
+        if (missingShards.length() > 0) {
             //Status red - shard is missing all copies and would produce partial results for an index search
             final String msg = "Search rejected due to missing shards ["+ missingShards +
                 "]. Consider using `allow_partial_search_results` setting to bypass this error.";

@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class SearchAsyncActionTests extends ESTestCase {

@@ -371,6 +372,102 @@ public class SearchAsyncActionTests extends ESTestCase {
         executor.shutdown();
     }
 
+    public void testAllowPartialResults() throws InterruptedException {
+        SearchRequest request = new SearchRequest();
+        request.allowPartialSearchResults(false);
+        int numConcurrent = randomIntBetween(1, 5);
+        request.setMaxConcurrentShardRequests(numConcurrent);
+        int numShards = randomIntBetween(5, 10);
+        AtomicBoolean searchPhaseDidRun = new AtomicBoolean(false);
+        ActionListener<SearchResponse> responseListener = ActionListener.wrap(response -> {},
+            (e) -> { throw new AssertionError("unexpected", e); });
+        DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+        // for the sake of this test we place the replica on the same node. ie. this is not a mistake since we limit per node now
+        DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        AtomicInteger contextIdGenerator = new AtomicInteger(0);
+        GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
+            new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
+            numShards, true, primaryNode, replicaNode);
+        int numShardAttempts = 0;
+        for (SearchShardIterator it : shardsIter) {
+            numShardAttempts += it.remaining();
+        }
+        CountDownLatch latch = new CountDownLatch(numShardAttempts);
+
+        SearchTransportService transportService = new SearchTransportService(null, null);
+        Map<String, Transport.Connection> lookup = new HashMap<>();
+        Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
+        lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
+        lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
+        Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
+        AtomicInteger numRequests = new AtomicInteger(0);
+        AtomicInteger numFailReplicas = new AtomicInteger(0);
+        AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction =
+            new AbstractSearchAsyncAction<TestSearchPhaseResult>(
+                "test",
+                logger,
+                transportService,
+                (cluster, node) -> {
+                    assert cluster == null : "cluster was not null: " + cluster;
+                    return lookup.get(node); },
+                aliasFilters,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                null,
+                request,
+                responseListener,
+                shardsIter,
+                new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
+                0,
+                null,
+                new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
+                request.getMaxConcurrentShardRequests(),
+                SearchResponse.Clusters.EMPTY) {
+
+                @Override
+                protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
+                                                   SearchActionListener<TestSearchPhaseResult> listener) {
+                    seenShard.computeIfAbsent(shard.shardId(), (i) -> {
+                        numRequests.incrementAndGet(); // only count this once per shard copy
+                        return Boolean.TRUE;
+                    });
+                    new Thread(() -> {
+                        Transport.Connection connection = getConnection(null, shard.currentNodeId());
+                        TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
+                            connection.getNode());
+                        if (shardIt.remaining() > 0) {
+                            numFailReplicas.incrementAndGet();
+                            listener.onFailure(new RuntimeException());
+                        } else {
+                            listener.onResponse(testSearchPhaseResult);
+                        }
+                    }).start();
+                }
+
+                @Override
+                protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
+                    return new SearchPhase("test") {
+                        @Override
+                        public void run() {
+                            assertTrue(searchPhaseDidRun.compareAndSet(false, true));
+                        }
+                    };
+                }
+
+                @Override
+                protected void executeNext(Runnable runnable, Thread originalThread) {
+                    super.executeNext(runnable, originalThread);
+                    latch.countDown();
+                }
+            };
+        asyncAction.start();
+        latch.await();
+        assertTrue(searchPhaseDidRun.get());
+        assertEquals(numShards, numRequests.get());
+        assertThat(numFailReplicas.get(), greaterThanOrEqualTo(1));
+    }
+
     static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
                                                                   boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) {
         ArrayList<SearchShardIterator> list = new ArrayList<>();