From a3f2677b70211ee37783379846acb248d199eca3 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 11 Sep 2014 20:59:30 +0200 Subject: [PATCH] [CORE] Ensure GroupShardsIterator is consistent across requests GroupShardsIterator is used in many places like the search execution to determin which shards to query. This can hold shards of one index as well as shards of multiple indices. The iteration order is used to assigne a per-request shard ID for each shard that is used as a tie-breaker when scores are the same. Today the iteration order is soely depending on the HashMap iteration order which is undefined or rather implementation dependent. This causes search requests to return inconsistent results across requests if for instance different nodes are coordinating the requests. Simple queries like `match_all` may return results in arbitrary order if pagination is used or may even return different results for the same request even though there hasn't been a refresh call and preferences are used. --- .../cluster/routing/GroupShardsIterator.java | 24 +++++++++---------- .../cluster/routing/PlainShardIterator.java | 5 ++++ .../cluster/routing/ShardIterator.java | 2 +- .../plain/PlainOperationRouting.java | 11 ++++----- .../elasticsearch/index/shard/ShardId.java | 10 +++++++- .../index/store/CorruptedFileTest.java | 6 +++-- .../index/store/CorruptedTranslogTests.java | 5 +++- 7 files changed, 39 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java b/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java index 384f96bfed8..f391bf3d667 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java @@ -19,19 +19,25 @@ package org.elasticsearch.cluster.routing; -import java.util.Collection; -import java.util.Iterator; +import org.apache.lucene.util.CollectionUtil; + +import java.util.*; /** * This class implements a compilation of {@link ShardIterator}s. Each {@link ShardIterator} * iterated by this {@link Iterable} represents a group of shards. - * + * ShardsIterators are always returned in ascending order independently of their order at construction + * time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs. */ public class GroupShardsIterator implements Iterable { - private final Collection iterators; + private final List iterators; - public GroupShardsIterator(Collection iterators) { + /** + * Constructs a enw GroupShardsIterator from the given list. + */ + public GroupShardsIterator(List iterators) { + CollectionUtil.timSort(iterators); this.iterators = iterators; } @@ -72,14 +78,6 @@ public class GroupShardsIterator implements Iterable { return iterators.size(); } - /** - * Return all group iterators - * @return - */ - public Collection iterators() { - return iterators; - } - @Override public Iterator iterator() { return iterators.iterator(); diff --git a/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java b/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java index 4d5434fe7fa..5950bd35d37 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java @@ -60,4 +60,9 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter public int hashCode() { return shardId.hashCode(); } + + @Override + public int compareTo(ShardIterator o) { + return shardId.compareTo(o.shardId()); + } } diff --git a/src/main/java/org/elasticsearch/cluster/routing/ShardIterator.java b/src/main/java/org/elasticsearch/cluster/routing/ShardIterator.java index ceeb651f3ab..1302f89e449 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/ShardIterator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/ShardIterator.java @@ -24,7 +24,7 @@ import org.elasticsearch.index.shard.ShardId; /** * Allows to iterate over a set of shard instances (routing) within a shard id group. */ -public interface ShardIterator extends ShardsIterator { +public interface ShardIterator extends ShardsIterator, Comparable { /** * The shard id this group relates to. diff --git a/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java b/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java index 3c81c240ead..4376def984d 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java +++ b/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.routing.operation.plain; +import com.google.common.collect.Lists; +import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -41,10 +43,7 @@ import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexMissingException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * @@ -107,7 +106,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio } set.add(indexShard.shardsRandomIt()); } - return new GroupShardsIterator(set); + return new GroupShardsIterator(Lists.newArrayList(set)); } @Override @@ -126,7 +125,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio set.add(iterator); } } - return new GroupShardsIterator(set); + return new GroupShardsIterator(Lists.newArrayList(set)); } private static final Map> EMPTY_ROUTING = Collections.emptyMap(); diff --git a/src/main/java/org/elasticsearch/index/shard/ShardId.java b/src/main/java/org/elasticsearch/index/shard/ShardId.java index b7fe363e0d6..bba823569cc 100644 --- a/src/main/java/org/elasticsearch/index/shard/ShardId.java +++ b/src/main/java/org/elasticsearch/index/shard/ShardId.java @@ -30,7 +30,7 @@ import java.io.Serializable; /** * Allows for shard level components to be injected with the shard id. */ -public class ShardId implements Serializable, Streamable { +public class ShardId implements Serializable, Streamable, Comparable { private Index index; @@ -110,4 +110,12 @@ public class ShardId implements Serializable, Streamable { index.writeTo(out); out.writeVInt(shardId); } + + @Override + public int compareTo(ShardId o) { + if (o.getId() == shardId) { + return index.name().compareTo(o.getIndex()); + } + return Integer.compare(shardId, o.getId()); + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index fdc4ebcd5fd..e26bf78548d 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.store; +import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists; import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.google.common.base.Charsets; @@ -264,7 +265,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { assertThat(response.getStatus(), is(ClusterHealthStatus.RED)); ClusterState state = client().admin().cluster().prepareState().get().getState(); GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[] {"test"}, false); - for (ShardIterator iterator : shardIterators.iterators()) { + for (ShardIterator iterator : shardIterators) { ShardRouting routing; while ((routing = iterator.nextOrNull()) != null) { if (routing.getId() == shardRouting.getId()) { @@ -449,7 +450,8 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { private ShardRouting corruptRandomFile(final boolean includePerCommitFiles) throws IOException { ClusterState state = client().admin().cluster().prepareState().get().getState(); GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false); - ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators()); + List iterators = Lists.newArrayList(shardIterators); + ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), iterators); ShardRouting shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); assertTrue(shardRouting.primary()); diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java index c5491902a54..adbc0684889 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.store; +import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -40,6 +41,7 @@ import java.io.FileFilter; import java.io.IOException; import java.io.RandomAccessFile; import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -105,7 +107,8 @@ public class CorruptedTranslogTests extends ElasticsearchIntegrationTest { private void corruptRandomTranslogFiles() throws IOException { ClusterState state = client().admin().cluster().prepareState().get().getState(); GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false); - ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators()); + List iterators = Lists.newArrayList(shardIterators); + ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), iterators); ShardRouting shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); assertTrue(shardRouting.primary());