From 01ca95a64860f0ff5a389cda40ea6c1f273201de Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 6 Nov 2015 20:18:01 +0100 Subject: [PATCH] Only allow rebalance operations to run if all shard store data is available This commit prevents running rebalance operations if the store allocator is still fetching async shard / store data to prevent premature rebalance decisions which need to be reverted once shard store data is available. This is typically happening on rolling restarts which can make those restarts extremely painful. Closes #14387 --- .../allocator/ShardsAllocators.java | 15 +++- .../ClusterRebalanceRoutingTests.java | 75 ++++++++++++++++++- .../test/ESAllocationTestCase.java | 7 ++ .../test/gateway/NoopGatewayAllocator.java | 2 +- 4 files changed, 94 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index 8fb65bbfe9b..99134334191 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -76,7 +76,20 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat @Override public boolean rebalance(RoutingAllocation allocation) { - return allocator.rebalance(allocation); + final int numberOfInFlightFetch = gatewayAllocator.getNumberOfInFlightFetch(); + if (numberOfInFlightFetch == 0) { + /* + * see https://github.com/elastic/elasticsearch/issues/14387 + * if we allow rebalance operations while we are still fetching shard store data + * we might end up with unnecessary rebalance operations which can be super confusing/frustrating + * since once the fetches come back we might just move all the shards back again. + * Therefore we only do a rebalance if we have fetched all information. 
+ */ + return allocator.rebalance(allocation); + } else { + logger.debug("skip rebalance [{}] shard store fetch operations are still in-flight", numberOfInFlightFetch); + } + return false; } @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java index af71688e19a..9177bd067e5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java @@ -30,10 +30,11 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ESAllocationTestCase; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; @@ -624,4 +625,72 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase { assertThat(routingNodes.node("node3").isEmpty(), equalTo(true)); } + + public void testRebalanceWhileShardFetching() { + final AtomicInteger numFetch = new AtomicInteger(1); + AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), 
new NoopGatewayAllocator() { + @Override + public int getNumberOfInFlightFetch() { + return numFetch.get(); + } + }); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + logger.info("start two nodes"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + } + + logger.debug("start all the primary shards for test"); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test", INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + } + + logger.debug("now, start 1 more node, check that rebalancing will not happen since we have shard sync going on"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .put(newNode("node2"))) + .build(); + + RoutingAllocation.Result reroute = 
strategy.reroute(clusterState); + assertFalse(reroute.changed()); + numFetch.set(0); + reroute = strategy.reroute(clusterState); + assertTrue(reroute.changed()); + routingTable = reroute.routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + int numStarted = 0; + int numRelocating = 0; + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1)); + if (routingTable.index("test").shard(i).primaryShard().state() == STARTED) { + numStarted++; + } else if (routingTable.index("test").shard(i).primaryShard().state() == RELOCATING) { + numRelocating++; + } + } + assertEquals(numStarted, 1); + assertEquals(numRelocating, 1); + + } } \ No newline at end of file diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java index d12f4d68752..767b1683f2c 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.gateway.NoopGatewayAllocator; @@ -79,6 +80,12 @@ public abstract class ESAllocationTestCase extends ESTestCase { new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); } + public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { + return new AllocationService(settings, + randomAllocationDeciders(settings, new 
NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), + new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE); + } + public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) { diff --git a/test-framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java b/test-framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java index ac582511032..825b203022d 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java +++ b/test-framework/src/main/java/org/elasticsearch/test/gateway/NoopGatewayAllocator.java @@ -32,7 +32,7 @@ public class NoopGatewayAllocator extends GatewayAllocator { public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator(); - private NoopGatewayAllocator() { + protected NoopGatewayAllocator() { super(Settings.EMPTY, null, null); }