Only allow rebalance operations to run if all shard store data is available

This commit prevents running rebalance operations if the store allocator is
still fetching async shard / store data to prevent pre-mature rebalance decisions
which need to be reverted once shard store data is available. This is typically happening
on rolling restarts which can make those restarts extremely painful.

Closes #14387
This commit is contained in:
Simon Willnauer 2015-11-06 20:18:01 +01:00
parent 97644e3046
commit 01ca95a648
4 changed files with 94 additions and 5 deletions

View File

@ -76,7 +76,20 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
@Override
public boolean rebalance(RoutingAllocation allocation) {
return allocator.rebalance(allocation);
final int numberOfInFlightFetch = gatewayAllocator.getNumberOfInFlightFetch();
if (numberOfInFlightFetch == 0) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
return allocator.rebalance(allocation);
} else {
logger.debug("skip rebalance [{}] shard store fetch operations are still in-flight", numberOfInFlightFetch);
}
return false;
}
@Override

View File

@ -30,10 +30,11 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
@ -624,4 +625,72 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
assertThat(routingNodes.node("node3").isEmpty(), equalTo(true));
}
public void testRebalanceWhileShardFetching() {
final AtomicInteger numFetch = new AtomicInteger(1);
AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() {
@Override
public int getNumberOfInFlightFetch() {
return numFetch.get();
}
});
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("start two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
}
logger.debug("start all the primary shards for test");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test", INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
logger.debug("now, start 1 more node, check that rebalancing will not happen since we have shard sync going on");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node2")))
.build();
RoutingAllocation.Result reroute = strategy.reroute(clusterState);
assertFalse(reroute.changed());
numFetch.set(0);
reroute = strategy.reroute(clusterState);
assertTrue(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
int numStarted = 0;
int numRelocating = 0;
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
if (routingTable.index("test").shard(i).primaryShard().state() == STARTED) {
numStarted++;
} else if (routingTable.index("test").shard(i).primaryShard().state() == RELOCATING) {
numRelocating++;
}
}
assertEquals(numStarted, 1);
assertEquals(numRelocating, 1);
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
@ -79,6 +80,12 @@ public abstract class ESAllocationTestCase extends ESTestCase {
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
}
public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
return new AllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE);
}
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {

View File

@ -32,7 +32,7 @@ public class NoopGatewayAllocator extends GatewayAllocator {
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
private NoopGatewayAllocator() {
protected NoopGatewayAllocator() {
super(Settings.EMPTY, null, null);
}