Clarify when shard iterators get sorted (#52810)

Currently we have two ways to create a GroupShardsIterator: one that will resort the iterators based on their natural ordering, and another one that will leave them in their original order. This is currently done through two constructors, one that accepts a single argument which does the sorting, and another which accepts a second boolean argument to control whether sorting should happen or not. This second constructor is only called externally to disable the sorting.

By introducing a specific method to create a sorted shard iterator we clarify and make it easier to track when we do sort and when we do not as the iterators are externally sorted.
This commit is contained in:
Luca Cavanna 2020-02-26 13:58:20 +01:00 committed by GitHub
parent 304e1e69b8
commit 9e38125464
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 66 additions and 34 deletions

View File

@ -116,8 +116,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators, false);
this.shardsIts = new GroupShardsIterator<>(iterators, false);
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result

View File

@ -113,7 +113,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(source);
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()), false);
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
}
private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
@ -122,7 +122,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
return IntStream.range(0, shardsIts.size())
.boxed()
.sorted(shardComparator(shardsIts, minAndMaxes, order))
.map(ord -> shardsIts.get(ord))
.map(shardsIts::get)
.collect(Collectors.toList());
}

View File

@ -553,10 +553,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
return new GroupShardsIterator<>(shards);
return GroupShardsIterator.sortAndCreate(shards);
}
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
@ -572,8 +572,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
task,
searchRequest,
iter,
timeProvider,
connectionLookup,
clusterStateVersion,
aliasFilter,
concreteIndexBoosts,
indexRoutings,
listener,
false,
clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() {

View File

@ -35,19 +35,19 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
private final List<ShardIt> iterators;
/**
* Constructs a enw GroupShardsIterator from the given list.
* Constructs a new sorted GroupShardsIterator from the given list. Items are sorted based on their natural ordering.
* @see PlainShardIterator#compareTo(ShardIterator)
* @see org.elasticsearch.action.search.SearchShardIterator#compareTo(ShardIterator)
*/
public GroupShardsIterator(List<ShardIt> iterators) {
this(iterators, true);
public static <ShardIt extends ShardIterator> GroupShardsIterator<ShardIt> sortAndCreate(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
return new GroupShardsIterator<>(iterators);
}
/**
* Constructs a new GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators, boolean useSort) {
if (useSort) {
CollectionUtil.timSort(iterators);
}
public GroupShardsIterator(List<ShardIt> iterators) {
this.iterators = iterators;
}

View File

@ -136,7 +136,7 @@ public class OperationRouting {
set.add(iterator);
}
}
return new GroupShardsIterator<>(new ArrayList<>(set));
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
}
private static final Map<String, Set<String>> EMPTY_ROUTING = Collections.emptyMap();

View File

@ -260,7 +260,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
}
}
}
return new GroupShardsIterator<>(set);
return GroupShardsIterator.sortAndCreate(set);
}
public ShardsIterator allShards(String[] indices) {
@ -321,7 +321,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
}
}
}
return new GroupShardsIterator<>(set);
return GroupShardsIterator.sortAndCreate(set);
}
@Override

View File

@ -69,7 +69,7 @@ public class GroupShardsIteratorTests extends ESTestCase {
ShardId shardId = new ShardId(index, 1);
list.add(new PlainShardIterator(shardId, randomShardRoutings(shardId, 0)));
}
GroupShardsIterator iter = new GroupShardsIterator<>(list);
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
assertEquals(7, iter.totalSizeWith1ForEmpty());
assertEquals(5, iter.size());
assertEquals(6, iter.totalSize());
@ -106,13 +106,24 @@ public class GroupShardsIteratorTests extends ESTestCase {
}
Collections.shuffle(list, random());
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
{
GroupShardsIterator<ShardIterator> unsorted = new GroupShardsIterator<>(list);
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
List<ShardIterator> actualIterators = new ArrayList<>();
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
}
assertEquals(actualIterators, list);
}
{
GroupShardsIterator<ShardIterator> iter = GroupShardsIterator.sortAndCreate(list);
List<ShardIterator> actualIterators = new ArrayList<>();
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
}
CollectionUtil.timSort(actualIterators);
assertEquals(actualIterators, list);
}
CollectionUtil.timSort(actualIterators);
assertEquals(actualIterators, list);
}
public void testOrderingWithSearchShardIterators() {
@ -123,7 +134,7 @@ public class GroupShardsIteratorTests extends ESTestCase {
String[] clusters = generateRandomStringArray(5, 10, false, false);
Arrays.sort(clusters);
List<SearchShardIterator> expected = new ArrayList<>();
List<SearchShardIterator> sorted = new ArrayList<>();
int numShards = randomIntBetween(1, 10);
for (int i = 0; i < numShards; i++) {
for (String index : indices) {
@ -131,23 +142,33 @@ public class GroupShardsIteratorTests extends ESTestCase {
ShardId shardId = new ShardId(index, uuid, i);
SearchShardIterator shardIterator = new SearchShardIterator(null, shardId,
GroupShardsIteratorTests.randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices());
expected.add(shardIterator);
sorted.add(shardIterator);
for (String cluster : clusters) {
SearchShardIterator remoteIterator = new SearchShardIterator(cluster, shardId,
GroupShardsIteratorTests.randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices());
expected.add(remoteIterator);
sorted.add(remoteIterator);
}
}
}
}
List<SearchShardIterator> shuffled = new ArrayList<>(expected);
List<SearchShardIterator> shuffled = new ArrayList<>(sorted);
Collections.shuffle(shuffled, random());
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = new GroupShardsIterator<>(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
{
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = new GroupShardsIterator<>(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
}
assertEquals(shuffled, actualIterators);
}
{
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = GroupShardsIterator.sortAndCreate(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
}
assertEquals(sorted, actualIterators);
}
assertEquals(expected, actualIterators);
}
}