Fix IndexShardRoutingTable's shard randomization to not throw out-of-bounds

exceptions.

Close #5559
This commit is contained in:
Adrien Grand 2014-03-27 00:03:17 +01:00
parent d3065acf14
commit 800d4624c8
9 changed files with 264 additions and 115 deletions

View File

@ -24,6 +24,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import jsr166y.ThreadLocalRandom;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -36,7 +37,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
@ -58,6 +58,7 @@ import static com.google.common.collect.Lists.newArrayList;
public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
private final String index;
private final ShardShuffler shuffler;
// note, we assume that when the index routing is created, ShardRoutings are created for all possible number of
// shards with state set to UNASSIGNED
@ -66,10 +67,9 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
private final ImmutableList<ShardRouting> allShards;
private final ImmutableList<ShardRouting> allActiveShards;
private final AtomicInteger counter = new AtomicInteger();
IndexRoutingTable(String index, ImmutableOpenIntMap<IndexShardRoutingTable> shards) {
this.index = index;
this.shuffler = new RotationShardShuffler(ThreadLocalRandom.current().nextInt());
this.shards = shards;
ImmutableList.Builder<ShardRouting> allShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> allActiveShards = ImmutableList.builder();
@ -273,14 +273,14 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
* Returns an unordered iterator over all shards (including replicas).
*/
public ShardsIterator randomAllShardsIt() {
return new PlainShardsIterator(allShards, counter.incrementAndGet());
return new PlainShardsIterator(shuffler.shuffle(allShards));
}
/**
* Returns an unordered iterator over all active shards (including replicas).
*/
public ShardsIterator randomAllActiveShardsIt() {
return new PlainShardsIterator(allActiveShards, counter.incrementAndGet());
return new PlainShardsIterator(shuffler.shuffle(allActiveShards));
}
/**

View File

@ -32,7 +32,6 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
@ -45,6 +44,7 @@ import static com.google.common.collect.Lists.newArrayList;
*/
public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ShardShuffler shuffler;
final ShardId shardId;
final ShardRouting primary;
@ -60,15 +60,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
*/
final ImmutableList<ShardRouting> allInitializingShards;
final AtomicInteger counter;
final boolean primaryAllocatedPostApi;
IndexShardRoutingTable(ShardId shardId, ImmutableList<ShardRouting> shards, boolean primaryAllocatedPostApi) {
this.shardId = shardId;
this.shuffler = new RotationShardShuffler(ThreadLocalRandom.current().nextInt());
this.shards = shards;
this.primaryAllocatedPostApi = primaryAllocatedPostApi;
this.counter = new AtomicInteger(ThreadLocalRandom.current().nextInt(shards.size()));
ShardRouting primary = null;
ImmutableList.Builder<ShardRouting> replicas = ImmutableList.builder();
@ -259,27 +257,27 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shards, pickIndex());
return new PlainShardIterator(shardId, shuffler.shuffle(shards));
}
public ShardIterator shardsIt() {
return new PlainShardIterator(shardId, shards);
}
public ShardIterator shardsIt(int index) {
return new PlainShardIterator(shardId, shards, index);
public ShardIterator shardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(shards, seed));
}
public ShardIterator activeShardsRandomIt() {
return new PlainShardIterator(shardId, activeShards, pickIndex());
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards));
}
public ShardIterator activeShardsIt() {
return new PlainShardIterator(shardId, activeShards);
}
public ShardIterator activeShardsIt(int index) {
return new PlainShardIterator(shardId, activeShards, index);
public ShardIterator activeShardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
}
/**
@ -287,33 +285,33 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
* its random within the active shards, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsRandomIt() {
return activeInitializingShardsIt(pickIndex());
return activeInitializingShardsIt(shuffler.nextSeed());
}
/**
* Returns an iterator over active and initializing shards. Making sure though that
* its random within the active shards, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsIt(int index) {
public ShardIterator activeInitializingShardsIt(int seed) {
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(shardId, activeShards, index);
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
}
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
addToListFromIndex(activeShards, ordered, index);
ordered.addAll(shuffler.shuffle(activeShards, seed));
ordered.addAll(allInitializingShards);
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator assignedShardsRandomIt() {
return new PlainShardIterator(shardId, assignedShards, pickIndex());
return new PlainShardIterator(shardId, shuffler.shuffle(assignedShards));
}
public ShardIterator assignedShardsIt() {
return new PlainShardIterator(shardId, assignedShards);
}
public ShardIterator assignedShardsIt(int index) {
return new PlainShardIterator(shardId, assignedShards, index);
public ShardIterator assignedShardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(assignedShards, seed));
}
/**
@ -334,14 +332,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
public ShardIterator primaryFirstActiveInitializingShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
int index = Math.abs(pickIndex());
for (int i = 0; i < activeShards.size(); i++) {
int loc = (index + i) % activeShards.size();
ShardRouting shardRouting = activeShards.get(loc);
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
ordered.add(shardRouting);
if (shardRouting.primary()) {
// switch, its the matching node id
ordered.set(i, ordered.get(0));
ordered.set(ordered.size() - 1, ordered.get(0));
ordered.set(0, shardRouting);
}
}
@ -373,14 +368,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
int index = pickIndex();
for (int i = 0; i < activeShards.size(); i++) {
int loc = (index + i) % activeShards.size();
ShardRouting shardRouting = activeShards.get(loc);
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
ordered.add(shardRouting);
if (nodeId.equals(shardRouting.currentNodeId())) {
// switch, its the matching node id
ordered.set(i, ordered.get(0));
ordered.set(ordered.size() - 1, ordered.get(0));
ordered.set(0, shardRouting);
}
}
@ -474,10 +466,10 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
return preferAttributesActiveInitializingShardsIt(attributes, nodes, pickIndex());
return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed());
}
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int index) {
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int seed) {
AttributesKey key = new AttributesKey(attributes);
AttributesRoutings activeRoutings = getActiveAttribute(key, nodes);
AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes);
@ -485,11 +477,10 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
// we now randomize, once between the ones that have the same attributes, and once for the ones that don't
// we don't want to mix between the two!
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeRoutings.totalSize + initializingRoutings.totalSize);
index = Math.abs(index);
addToListFromIndex(activeRoutings.withSameAttribute, ordered, index);
addToListFromIndex(activeRoutings.withoutSameAttribute, ordered, index);
addToListFromIndex(initializingRoutings.withSameAttribute, ordered, index);
addToListFromIndex(initializingRoutings.withoutSameAttribute, ordered, index);
ordered.addAll(shuffler.shuffle(activeRoutings.withSameAttribute, seed));
ordered.addAll(shuffler.shuffle(activeRoutings.withoutSameAttribute, seed));
ordered.addAll(shuffler.shuffle(initializingRoutings.withSameAttribute, seed));
ordered.addAll(shuffler.shuffle(initializingRoutings.withoutSameAttribute, seed));
return new PlainShardIterator(shardId, ordered);
}
@ -525,23 +516,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return shards;
}
/**
* Adds from list to list, starting from the given index (wrapping around if needed).
*/
@SuppressWarnings("unchecked")
private void addToListFromIndex(List from, List to, int index) {
index = Math.abs(index);
for (int i = 0; i < from.size(); i++) {
int loc = (index + i) % from.size();
to.add(from.get(loc));
}
}
// TODO: we can move to random based on ThreadLocalRandom, or make it pluggable
private int pickIndex() {
return Math.abs(counter.incrementAndGet());
}
public static class Builder {
private ShardId shardId;

View File

@ -43,18 +43,6 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter
this.shardId = shardId;
}
/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* this the a given <code>shardId</code>.
*
* @param shardId shard id of the group
* @param shards shards to iterate
* @param index the offset in the shards list to start the iteration from
*/
public PlainShardIterator(ShardId shardId, List<ShardRouting> shards, int index) {
super(shards, index);
this.shardId = shardId;
}
@Override
public ShardId shardId() {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing;
import java.util.List;
import java.util.ListIterator;
/**
* A simple {@link ShardsIterator} that iterates a list or sub-list of
@ -28,76 +29,50 @@ public class PlainShardsIterator implements ShardsIterator {
private final List<ShardRouting> shards;
private final int size;
private final int index;
private final int limit;
private volatile int counter;
private ListIterator<ShardRouting> iterator;
public PlainShardsIterator(List<ShardRouting> shards) {
this(shards, 0);
}
public PlainShardsIterator(List<ShardRouting> shards, int index) {
this.shards = shards;
this.size = shards.size();
if (size == 0) {
this.index = 0;
} else {
this.index = Math.abs(index % size);
}
this.counter = this.index;
this.limit = this.index + size;
this.iterator = shards.listIterator();
}
@Override
public void reset() {
this.counter = this.index;
iterator = shards.listIterator();
}
@Override
public int remaining() {
return limit - counter;
return shards.size() - iterator.nextIndex();
}
@Override
public ShardRouting firstOrNull() {
if (size == 0) {
if (shards.isEmpty()) {
return null;
}
return shards.get(index);
return shards.get(0);
}
@Override
public ShardRouting nextOrNull() {
if (size == 0) {
return null;
}
int counter = (this.counter);
if (counter >= size) {
if (counter >= limit) {
return null;
}
this.counter = counter + 1;
return shards.get(counter - size);
if (iterator.hasNext()) {
return iterator.next();
} else {
this.counter = counter + 1;
return shards.get(counter);
return null;
}
}
@Override
public int size() {
return size;
return shards.size();
}
@Override
public int sizeActive() {
int count = 0;
for (int i = 0; i < size; i++) {
if (shards.get(i).active()) {
for (ShardRouting shard : shards) {
if (shard.active()) {
count++;
}
}
@ -107,8 +82,7 @@ public class PlainShardsIterator implements ShardsIterator {
@Override
public int assignedReplicasIncludingRelocating() {
int count = 0;
for (int i = 0; i < size; i++) {
ShardRouting shard = shards.get(i);
for (ShardRouting shard : shards) {
if (shard.unassigned()) {
continue;
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Basic {@link ShardShuffler} implementation that uses an {@link AtomicInteger} to generate seeds and uses a rotation to permute shards.
*/
public class RotationShardShuffler extends ShardShuffler {
private final AtomicInteger seed;
public RotationShardShuffler(int seed) {
this.seed = new AtomicInteger(seed);
}
@Override
public int nextSeed() {
return seed.getAndIncrement();
}
@Override
public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) {
return CollectionUtils.rotate(shards, seed);
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import java.util.List;
/**
* A shuffler for shards whose primary goal is to balance load.
*/
public abstract class ShardShuffler {
/**
* Return a new seed.
*/
public abstract int nextSeed();
/**
* Return a shuffled view over the list of shards. The behavior of this method must be deterministic: if the same list and the same seed
* are provided twice, then the result needs to be the same.
*/
public abstract List<ShardRouting> shuffle(List<ShardRouting> shards, int seed);
/**
* Equivalent to calling <code>shuffle(shards, nextSeed())</code>.
*/
public List<ShardRouting> shuffle(List<ShardRouting> shards) {
return shuffle(shards, nextSeed());
}
}

View File

@ -24,6 +24,11 @@ import com.carrotsearch.hppc.FloatArrayList;
import com.carrotsearch.hppc.LongArrayList;
import com.google.common.primitives.Longs;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.common.Preconditions;
import java.util.AbstractList;
import java.util.List;
import java.util.RandomAccess;
/** Collections-related utility methods. */
public enum CollectionUtils {
@ -199,4 +204,52 @@ public enum CollectionUtils {
return array == null || array.length == 0;
}
/**
* Return a rotated view of the given list with the given distance.
*/
public static <T> List<T> rotate(final List<T> list, int distance) {
if (list.isEmpty()) {
return list;
}
int d = distance % list.size();
if (d < 0) {
d += list.size();
}
if (d == 0) {
return list;
}
return new RotatedList<>(list, d);
}
private static class RotatedList<T> extends AbstractList<T> implements RandomAccess {
private final List<T> in;
private final int distance;
public RotatedList(List<T> list, int distance) {
Preconditions.checkArgument(distance >= 0 && distance < list.size());
Preconditions.checkArgument(list instanceof RandomAccess);
this.in = list;
this.distance = distance;
}
@Override
public T get(int index) {
int idx = distance + index;
if (idx < 0 || idx >= in.size()) {
idx -= in.size();
}
return in.get(idx);
}
@Override
public int size() {
return in.size();
}
};
}

View File

@ -43,7 +43,8 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
@Test
public void testEmptyIterator() {
ShardIterator shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 0);
ShardShuffler shuffler = new RotationShardShuffler(0);
ShardIterator shardIterator = new PlainShardIterator(new ShardId("test1", 0), shuffler.shuffle(ImmutableList.<ShardRouting>of()));
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
@ -52,7 +53,7 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 1);
shardIterator = new PlainShardIterator(new ShardId("test1", 0), shuffler.shuffle(ImmutableList.<ShardRouting>of()));
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
@ -61,7 +62,7 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 2);
shardIterator = new PlainShardIterator(new ShardId("test1", 0), shuffler.shuffle(ImmutableList.<ShardRouting>of()));
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
@ -70,7 +71,7 @@ public class RoutingIteratorTests extends ElasticsearchAllocationTestCase {
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 3);
shardIterator = new PlainShardIterator(new ShardId("test1", 0), shuffler.shuffle(ImmutableList.<ShardRouting>of()));
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
public class CollectionUtilsTests extends ElasticsearchTestCase {
@Test
public void rotateEmpty() {
assertTrue(CollectionUtils.rotate(ImmutableList.of(), randomInt()).isEmpty());
}
@Test
public void rotate() {
final int iters = scaledRandomIntBetween(10, 100);
for (int k = 0; k < iters; ++k) {
final int size = randomIntBetween(1, 100);
final int distance = randomInt();
List<Object> list = new ArrayList<>();
for (int i = 0; i < size; ++i) {
list.add(new Object());
}
final List<Object> rotated = CollectionUtils.rotate(list, distance);
// check content is the same
assertEquals(rotated.size(), list.size());
assertEquals(Iterables.size(rotated), list.size());
assertEquals(new HashSet<>(rotated), new HashSet<>(list));
// check stability
for (int j = randomInt(4); j >= 0; --j) {
assertEquals(rotated, CollectionUtils.rotate(list, distance));
}
// reverse
if (distance != Integer.MIN_VALUE) {
assertEquals(list, CollectionUtils.rotate(CollectionUtils.rotate(list, distance), -distance));
}
}
}
}