From 7ce7fb33e5d37a0991b6b73965b3938b2dbb4565 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 30 Mar 2011 15:40:11 +0200 Subject: [PATCH] Shard Allocation: Allow to control how many cluster wide concurrent rebalance (relocation) are allowed, default to 3, closes #816. --- .../ConcurrentRebalanceNodeAllocation.java | 56 +++++++ .../routing/allocation/NodeAllocations.java | 1 + .../allocation/ShardAllocationModule.java | 1 + .../ConcurrentRebalanceRoutingTests.java | 158 ++++++++++++++++++ .../allocation/RebalanceAfterActiveTests.java | 1 + .../SingleShardNoReplicasRoutingTests.java | 2 + .../TenShardsOneReplicaRoutingTests.java | 1 + 7 files changed, 220 insertions(+) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java new file mode 100644 index 00000000000..1837cdf108e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceNodeAllocation.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +public class ConcurrentRebalanceNodeAllocation extends NodeAllocation { + + private final int clusterConcurrentRebalance; + + @Inject public ConcurrentRebalanceNodeAllocation(Settings settings) { + super(settings); + this.clusterConcurrentRebalance = componentSettings.getAsInt("cluster_concurrent_rebalance", 2); + logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); + } + + @Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + if (clusterConcurrentRebalance == -1) { + return true; + } + int rebalance = 0; + for (RoutingNode node : allocation.routingNodes()) { + for (MutableShardRouting shard : node) { + if (shard.state() == ShardRoutingState.RELOCATING) { + rebalance++; + } + } + } + if (rebalance >= clusterConcurrentRebalance) { + return false; + } + return true; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index d7107402fe9..d45ce1005b5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -43,6 +43,7 @@ public class NodeAllocations extends NodeAllocation { .add(new ThrottlingNodeAllocation(settings)) .add(new RebalanceOnlyWhenActiveNodeAllocation(settings)) .add(new ClusterRebalanceNodeAllocation(settings)) + .add(new ConcurrentRebalanceNodeAllocation(settings)) .build() ); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java index cf18ca279c0..c228189b397 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java @@ -49,6 +49,7 @@ public class ShardAllocationModule extends AbstractModule { allocationMultibinder.addBinding().to(ThrottlingNodeAllocation.class); allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveNodeAllocation.class); allocationMultibinder.addBinding().to(ClusterRebalanceNodeAllocation.class); + allocationMultibinder.addBinding().to(ConcurrentRebalanceNodeAllocation.class); for (Class allocation : allocations) { allocationMultibinder.addBinding().to(allocation); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java new file mode 100644 index 00000000000..92580dac57e --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -0,0 +1,158 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +@Test +public class ConcurrentRebalanceRoutingTests { + + private final ESLogger logger = Loggers.getLogger(ConcurrentRebalanceRoutingTests.class); + + @Test public void testClusterConcurrentRebalance() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(5)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("start all the primary shards, replicas will start initializing"); + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()) + .put(newNode("node3")).put(newNode("node4")).put(newNode("node5")).put(newNode("node6")).put(newNode("node7")).put(newNode("node8")).put(newNode("node9")).put(newNode("node10"))) + .build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("start the replica shards, rebalancing should start, but, only 3 should be rebalancing"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + // we only allow one relocation at a time + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3)); + + logger.info("finalize this session relocation, 3 more should relocate now"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + // we only allow one relocation at a time + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3)); + + logger.info("finalize this session relocation, 2 more should relocate now"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + // we only allow one relocation at a time + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(2)); + + logger.info("finalize this session relocation, no more relocation"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + // we only allow one relocation at a time + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); + assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(0)); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index f2c06d9498a..09c4757345f 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -50,6 +50,7 @@ public class RebalanceAfterActiveTests { ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index f5ce3e15b18..f3565c70305 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -208,6 +208,7 @@ public class SingleShardNoReplicasRoutingTests { ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); final int numberOfIndices = 50; @@ -319,6 +320,7 @@ public class SingleShardNoReplicasRoutingTests { ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); final int numberOfIndices = 10; diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index 97684f2272f..290065f88e5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -49,6 +49,7 @@ public class TenShardsOneReplicaRoutingTests { ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); logger.info("Building initial routing table");