diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 3b28ca19477..999ea9b2054 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -204,7 +204,25 @@ final class SearchResponseMerger { if (shardId2 == null) { return 1; } - return shardId1.compareTo(shardId2); + int shardIdCompare = shardId1.compareTo(shardId2); + //we could assume that the same shard id cannot come back from multiple clusters as even with same index name and shard index, + //the index uuid does not match. But the same cluster can be registered multiple times with different aliases, in which case + //we may get failures from the same index, yet with a different cluster alias in their shard target. + if (shardIdCompare != 0) { + return shardIdCompare; + } + String clusterAlias1 = o1.shard() == null ? null : o1.shard().getClusterAlias(); + String clusterAlias2 = o2.shard() == null ? null : o2.shard().getClusterAlias(); + if (clusterAlias1 == null && clusterAlias2 == null) { + return 0; + } + if (clusterAlias1 == null) { + return -1; + } + if (clusterAlias2 == null) { + return 1; + } + return clusterAlias1.compareTo(clusterAlias2); } private ShardId extractShardId(ShardSearchFailure failure) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index 712d6a60440..c32ff7b88f8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -109,7 +109,14 @@ public class SearchResponseMergerTests extends ESTestCase { SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, searchTimeProvider, flag -> null); - PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); + PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1, + (o1, o2) -> { + int compareTo = o1.getShardId().compareTo(o2.getShardId()); + if (compareTo != 0) { + return compareTo; + } + return o1.getClusterAlias().compareTo(o2.getClusterAlias()); + })); int numIndices = numResponses * randomIntBetween(1, 3); Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); for (int i = 0; i < numResponses; i++) { @@ -120,15 +127,46 @@ public class SearchResponseMergerTests extends ESTestCase { ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; for (int j = 0; j < numFailures; j++) { ShardId shardId = new ShardId(randomFrom(indices), j); - ShardSearchFailure failure; - if (randomBoolean()) { - SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null); - failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget); - } else { - ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException()); - elasticsearchException.setShard(shardId); - failure = new ShardSearchFailure(elasticsearchException); - } + SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null); + ShardSearchFailure failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget); + shardSearchFailures[j] = failure; + priorityQueue.add(Tuple.tuple(searchShardTarget, failure)); + } + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, + 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + addResponse(merger, searchResponse); + } + awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(priorityQueue.size(), mergedResponse.getFailedShards()); + ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures(); + assertEquals(priorityQueue.size(), shardFailures.length); + for (ShardSearchFailure shardFailure : shardFailures) { + ShardSearchFailure expected = priorityQueue.poll().v2(); + assertSame(expected, shardFailure); + } + } + + public void testMergeShardFailuresNullShardTarget() throws InterruptedException { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, flag -> null); + PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); + for (int i = 0; i < numResponses; i++) { + int numFailures = randomIntBetween(1, 10); + ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; + for (int j = 0; j < numFailures; j++) { + String index = "index-" + i; + ShardId shardId = new ShardId(index, index + "-uuid", j); + ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException()); + elasticsearchException.setShard(shardId); + ShardSearchFailure failure = new ShardSearchFailure(elasticsearchException); shardSearchFailures[j] = failure; priorityQueue.add(Tuple.tuple(shardId, failure)); }