Tie break on cluster alias when merging shard search failures (#38715)
A recent test failure exposed an edge case where failures can come back with the same shard id, yet from different clusters. This commit adapts the failures comparator to take the cluster alias into account when merging failures as part of cross-cluster search (CCS) request execution. The corresponding test has also been split in two: one variant with and one without a search shard target set on the failure.
Closes #38672
parent 6ae7915b9d
commit 90fff54954
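Before the diff itself, a rough standalone illustration of the ordering the change introduces may help: failures are compared by shard id first and, when shard ids are equal, tie-broken on the cluster alias of their shard target, with failures that carry no shard target (and hence no alias) sorting first. This is a minimal sketch only, using hypothetical stand-in types rather than the real ShardSearchFailure and SearchShardTarget classes from the commit.

import java.util.Comparator;

// A minimal, hypothetical sketch of the tie-break order; Target and Failure are
// simplified stand-ins, not the real SearchShardTarget/ShardSearchFailure classes.
final class FailureOrderSketch {

    record Target(String clusterAlias) {}           // stand-in for SearchShardTarget
    record Failure(String shardId, Target shard) {} // stand-in for ShardSearchFailure

    // Compare by shard id first; on ties, fall back to the cluster alias of the
    // shard target, treating a missing target/alias as smaller than any alias.
    static final Comparator<Failure> FAILURE_ORDER = (o1, o2) -> {
        int shardIdCompare = o1.shardId().compareTo(o2.shardId());
        if (shardIdCompare != 0) {
            return shardIdCompare;
        }
        String clusterAlias1 = o1.shard() == null ? null : o1.shard().clusterAlias();
        String clusterAlias2 = o2.shard() == null ? null : o2.shard().clusterAlias();
        if (clusterAlias1 == null && clusterAlias2 == null) {
            return 0;
        }
        if (clusterAlias1 == null) {
            return -1;
        }
        if (clusterAlias2 == null) {
            return 1;
        }
        return clusterAlias1.compareTo(clusterAlias2);
    };
}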
@@ -204,7 +204,25 @@ final class SearchResponseMerger {
             if (shardId2 == null) {
                 return 1;
             }
-            return shardId1.compareTo(shardId2);
+            int shardIdCompare = shardId1.compareTo(shardId2);
+            //we could assume that the same shard id cannot come back from multiple clusters as even with same index name and shard index,
+            //the index uuid does not match. But the same cluster can be registered multiple times with different aliases, in which case
+            //we may get failures from the same index, yet with a different cluster alias in their shard target.
+            if (shardIdCompare != 0) {
+                return shardIdCompare;
+            }
+            String clusterAlias1 = o1.shard() == null ? null : o1.shard().getClusterAlias();
+            String clusterAlias2 = o2.shard() == null ? null : o2.shard().getClusterAlias();
+            if (clusterAlias1 == null && clusterAlias2 == null) {
+                return 0;
+            }
+            if (clusterAlias1 == null) {
+                return -1;
+            }
+            if (clusterAlias2 == null) {
+                return 1;
+            }
+            return clusterAlias1.compareTo(clusterAlias2);
         }

     private ShardId extractShardId(ShardSearchFailure failure) {
@@ -109,7 +109,14 @@ public class SearchResponseMergerTests extends ESTestCase {
         SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0);
         SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE,
             searchTimeProvider, flag -> null);
-        PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
+        PriorityQueue<Tuple<SearchShardTarget, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1,
+            (o1, o2) -> {
+                int compareTo = o1.getShardId().compareTo(o2.getShardId());
+                if (compareTo != 0) {
+                    return compareTo;
+                }
+                return o1.getClusterAlias().compareTo(o2.getClusterAlias());
+            }));
         int numIndices = numResponses * randomIntBetween(1, 3);
         Iterator<Map.Entry<String, Index[]>> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator();
         for (int i = 0; i < numResponses; i++) {
@@ -120,15 +127,46 @@ public class SearchResponseMergerTests extends ESTestCase {
             ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures];
             for (int j = 0; j < numFailures; j++) {
                 ShardId shardId = new ShardId(randomFrom(indices), j);
-                ShardSearchFailure failure;
-                if (randomBoolean()) {
-                    SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null);
-                    failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget);
-                } else {
-                    ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException());
-                    elasticsearchException.setShard(shardId);
-                    failure = new ShardSearchFailure(elasticsearchException);
-                }
+                SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null);
+                ShardSearchFailure failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget);
+                shardSearchFailures[j] = failure;
+                priorityQueue.add(Tuple.tuple(searchShardTarget, failure));
+            }
+            SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null,
+                1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY);
+            addResponse(merger, searchResponse);
+        }
+        awaitResponsesAdded();
+        assertEquals(numResponses, merger.numResponses());
+        SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
+        SearchResponse mergedResponse = merger.getMergedResponse(clusters);
+        assertSame(clusters, mergedResponse.getClusters());
+        assertEquals(numResponses, mergedResponse.getTotalShards());
+        assertEquals(numResponses, mergedResponse.getSuccessfulShards());
+        assertEquals(0, mergedResponse.getSkippedShards());
+        assertEquals(priorityQueue.size(), mergedResponse.getFailedShards());
+        ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures();
+        assertEquals(priorityQueue.size(), shardFailures.length);
+        for (ShardSearchFailure shardFailure : shardFailures) {
+            ShardSearchFailure expected = priorityQueue.poll().v2();
+            assertSame(expected, shardFailure);
+        }
+    }
+
+    public void testMergeShardFailuresNullShardTarget() throws InterruptedException {
+        SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0);
+        SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE,
+            searchTimeProvider, flag -> null);
+        PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
+        for (int i = 0; i < numResponses; i++) {
+            int numFailures = randomIntBetween(1, 10);
+            ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures];
+            for (int j = 0; j < numFailures; j++) {
+                String index = "index-" + i;
+                ShardId shardId = new ShardId(index, index + "-uuid", j);
+                ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException());
+                elasticsearchException.setShard(shardId);
+                ShardSearchFailure failure = new ShardSearchFailure(elasticsearchException);
                 shardSearchFailures[j] = failure;
                 priorityQueue.add(Tuple.tuple(shardId, failure));
             }
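As a closing note on the test's hand-rolled comparator above: the same ordering (shard id first, then cluster alias, with a missing alias sorting first) could also be built from the JDK's Comparator combinators. The snippet below is a self-contained, hypothetical sketch with simplified stand-in types; it is not part of the commit.

import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for a SearchShardTarget-like key: a shard id string plus an
// optional cluster alias. The combinator-built comparator mirrors the merge-time
// ordering: shard id first, then cluster alias, with absent aliases sorting first.
public class CombinatorOrderSketch {

    record Key(String shardId, String clusterAlias) {}

    static final Comparator<Key> ORDER =
        Comparator.comparing(Key::shardId)
            .thenComparing(Key::clusterAlias, Comparator.nullsFirst(Comparator.<String>naturalOrder()));

    public static void main(String[] args) {
        PriorityQueue<Key> queue = new PriorityQueue<>(ORDER);
        queue.add(new Key("index[0]", "cluster_b"));
        queue.add(new Key("index[0]", "cluster_a"));
        queue.add(new Key("index[0]", null)); // e.g. a failure without a shard target

        while (!queue.isEmpty()) {
            Key k = queue.poll();
            System.out.println(k.shardId() + " -> " + (k.clusterAlias() == null ? "<none>" : k.clusterAlias()));
        }
        // Prints: index[0] -> <none>, then cluster_a, then cluster_b
    }
}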