[CORE] Ensure GroupShardsIterator is consistent across requests

GroupShardsIterator is used in many places like the search execution
to determin which shards to query. This can hold shards of one index
as well as shards of multiple indices. The iteration order is used
to assigne a per-request shard ID for each shard that is used as a
tie-breaker when scores are the same. Today the iteration order is
soely depending on the HashMap iteration order which is undefined or
rather implementation dependent. This causes search requests to return
inconsistent results across requests if for instance different nodes
are coordinating the requests.

Simple queries like `match_all` may return results in arbitrary order
if pagination is used or may even return different results for the same
request even though there hasn't been a refresh call and preferences are
used.
This commit is contained in:
Simon Willnauer 2014-09-11 20:59:30 +02:00
parent 929a4a54f7
commit a3f2677b70
7 changed files with 39 additions and 24 deletions

View File

@ -19,19 +19,25 @@
package org.elasticsearch.cluster.routing;
import java.util.Collection;
import java.util.Iterator;
import org.apache.lucene.util.CollectionUtil;
import java.util.*;
/**
* This class implements a compilation of {@link ShardIterator}s. Each {@link ShardIterator}
* iterated by this {@link Iterable} represents a group of shards.
*
* ShardsIterators are always returned in ascending order independently of their order at construction
* time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs.
*/
public class GroupShardsIterator implements Iterable<ShardIterator> {
private final Collection<ShardIterator> iterators;
private final List<ShardIterator> iterators;
public GroupShardsIterator(Collection<ShardIterator> iterators) {
/**
* Constructs a enw GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIterator> iterators) {
CollectionUtil.timSort(iterators);
this.iterators = iterators;
}
@ -72,14 +78,6 @@ public class GroupShardsIterator implements Iterable<ShardIterator> {
return iterators.size();
}
/**
* Return all group iterators
* @return
*/
public Collection<ShardIterator> iterators() {
return iterators;
}
@Override
public Iterator<ShardIterator> iterator() {
return iterators.iterator();

View File

@ -60,4 +60,9 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter
public int hashCode() {
return shardId.hashCode();
}
@Override
public int compareTo(ShardIterator o) {
return shardId.compareTo(o.shardId());
}
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.index.shard.ShardId;
/**
* Allows to iterate over a set of shard instances (routing) within a shard id group.
*/
public interface ShardIterator extends ShardsIterator {
public interface ShardIterator extends ShardsIterator, Comparable<ShardIterator> {
/**
* The shard id this group relates to.

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.routing.operation.plain;
import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -41,10 +43,7 @@ import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
*
@ -107,7 +106,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
}
set.add(indexShard.shardsRandomIt());
}
return new GroupShardsIterator(set);
return new GroupShardsIterator(Lists.newArrayList(set));
}
@Override
@ -126,7 +125,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
set.add(iterator);
}
}
return new GroupShardsIterator(set);
return new GroupShardsIterator(Lists.newArrayList(set));
}
private static final Map<String, Set<String>> EMPTY_ROUTING = Collections.emptyMap();

View File

@ -30,7 +30,7 @@ import java.io.Serializable;
/**
* Allows for shard level components to be injected with the shard id.
*/
public class ShardId implements Serializable, Streamable {
public class ShardId implements Serializable, Streamable, Comparable<ShardId> {
private Index index;
@ -110,4 +110,12 @@ public class ShardId implements Serializable, Streamable {
index.writeTo(out);
out.writeVInt(shardId);
}
@Override
public int compareTo(ShardId o) {
if (o.getId() == shardId) {
return index.name().compareTo(o.getIndex());
}
return Integer.compare(shardId, o.getId());
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.store;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets;
@ -264,7 +265,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[] {"test"}, false);
for (ShardIterator iterator : shardIterators.iterators()) {
for (ShardIterator iterator : shardIterators) {
ShardRouting routing;
while ((routing = iterator.nextOrNull()) != null) {
if (routing.getId() == shardRouting.getId()) {
@ -449,7 +450,8 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
private ShardRouting corruptRandomFile(final boolean includePerCommitFiles) throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators());
List<ShardIterator> iterators = Lists.newArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
assertTrue(shardRouting.primary());

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.store;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
@ -40,6 +41,7 @@ import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@ -105,7 +107,8 @@ public class CorruptedTranslogTests extends ElasticsearchIntegrationTest {
private void corruptRandomTranslogFiles() throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators());
List<ShardIterator> iterators = Lists.newArrayList(shardIterators);
ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), iterators);
ShardRouting shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
assertTrue(shardRouting.primary());