From 90fff54954c0413003f0d3dba3752b3c7b674f55 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 11 Feb 2019 16:00:54 +0100 Subject: [PATCH] Tie break on cluster alias when merging shard search failures (#38715) A recent test failure triggered an edge case scenario where failures may be coming back with the same shard id, yet from different clusters. This commit adapts the failures comparator to take the cluster alias into account when merging failures as part of CCS requests execution. Also the corresponding test has been split in two: with and without search shard target set to the failure. Closes #38672 --- .../action/search/SearchResponseMerger.java | 20 ++++++- .../search/SearchResponseMergerTests.java | 58 +++++++++++++++---- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 3b28ca19477..999ea9b2054 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -204,7 +204,25 @@ final class SearchResponseMerger { if (shardId2 == null) { return 1; } - return shardId1.compareTo(shardId2); + int shardIdCompare = shardId1.compareTo(shardId2); + //we could assume that the same shard id cannot come back from multiple clusters as even with same index name and shard index, + //the index uuid does not match. But the same cluster can be registered multiple times with different aliases, in which case + //we may get failures from the same index, yet with a different cluster alias in their shard target. + if (shardIdCompare != 0) { + return shardIdCompare; + } + String clusterAlias1 = o1.shard() == null ? null : o1.shard().getClusterAlias(); + String clusterAlias2 = o2.shard() == null ? 
null : o2.shard().getClusterAlias(); + if (clusterAlias1 == null && clusterAlias2 == null) { + return 0; + } + if (clusterAlias1 == null) { + return -1; + } + if (clusterAlias2 == null) { + return 1; + } + return clusterAlias1.compareTo(clusterAlias2); } private ShardId extractShardId(ShardSearchFailure failure) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index 712d6a60440..c32ff7b88f8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -109,7 +109,14 @@ public class SearchResponseMergerTests extends ESTestCase { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, searchTimeProvider, flag -> null); - PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); + PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1, + (o1, o2) -> { + int compareTo = o1.getShardId().compareTo(o2.getShardId()); + if (compareTo != 0) { + return compareTo; + } + return o1.getClusterAlias().compareTo(o2.getClusterAlias()); + })); int numIndices = numResponses * randomIntBetween(1, 3); Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); for (int i = 0; i < numResponses; i++) { @@ -120,15 +127,46 @@ public class SearchResponseMergerTests extends ESTestCase { ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; for (int j = 0; j < numFailures; j++) { ShardId shardId = new ShardId(randomFrom(indices), j); - ShardSearchFailure failure; - if (randomBoolean()) { - SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, 
clusterAlias, null); - failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget); - } else { - ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException()); - elasticsearchException.setShard(shardId); - failure = new ShardSearchFailure(elasticsearchException); - } + SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null); + ShardSearchFailure failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget); + shardSearchFailures[j] = failure; + priorityQueue.add(Tuple.tuple(searchShardTarget, failure)); + } + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, + 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + addResponse(merger, searchResponse); + } + awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(priorityQueue.size(), mergedResponse.getFailedShards()); + ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures(); + assertEquals(priorityQueue.size(), shardFailures.length); + for (ShardSearchFailure shardFailure : shardFailures) { + ShardSearchFailure expected = priorityQueue.poll().v2(); + assertSame(expected, shardFailure); + } + } + + public void testMergeShardFailuresNullShardTarget() throws InterruptedException { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, flag -> 
null); + PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); + for (int i = 0; i < numResponses; i++) { + int numFailures = randomIntBetween(1, 10); + ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; + for (int j = 0; j < numFailures; j++) { + String index = "index-" + i; + ShardId shardId = new ShardId(index, index + "-uuid", j); + ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException()); + elasticsearchException.setShard(shardId); + ShardSearchFailure failure = new ShardSearchFailure(elasticsearchException); shardSearchFailures[j] = failure; priorityQueue.add(Tuple.tuple(shardId, failure)); }