Shard Allocation: Add node_initial_primaries_recoveries setting, closes #819.

This commit is contained in:
kimchy 2011-03-31 01:04:09 +02:00
parent f90b4e6fee
commit 14d98a7319
5 changed files with 22 additions and 7 deletions

View File

@ -31,13 +31,15 @@ import org.elasticsearch.common.settings.Settings;
*/ */
public class ThrottlingNodeAllocation extends NodeAllocation { public class ThrottlingNodeAllocation extends NodeAllocation {
private final int primariesInitialRecoveries;
private final int concurrentRecoveries; private final int concurrentRecoveries;
@Inject public ThrottlingNodeAllocation(Settings settings) { @Inject public ThrottlingNodeAllocation(Settings settings) {
super(settings); super(settings);
this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4);
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2)); this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2));
logger.debug("using [concurrent_recoveries] with [{}]", concurrentRecoveries); logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
} }
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
@ -57,7 +59,7 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
primariesInRecovery++; primariesInRecovery++;
} }
} }
if (primariesInRecovery >= concurrentRecoveries) { if (primariesInRecovery >= primariesInitialRecoveries) {
return Decision.THROTTLE; return Decision.THROTTLE;
} else { } else {
return Decision.YES; return Decision.YES;

View File

@ -166,7 +166,10 @@ public class FailedShardsRoutingTests {
} }
@Test public void test10ShardsWith1ReplicaFailure() { @Test public void test10ShardsWith1ReplicaFailure() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.build());
logger.info("Building initial routing table"); logger.info("Building initial routing table");

View File

@ -47,7 +47,10 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests {
@Test public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() { @Test public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.build());
logger.info("Building initial routing table"); logger.info("Building initial routing table");

View File

@ -47,7 +47,8 @@ public class TenShardsOneReplicaRoutingTests {
@Test public void testSingleIndexFirstStartPrimaryThenBackups() { @Test public void testSingleIndexFirstStartPrimaryThenBackups() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build()); .build());

View File

@ -45,7 +45,10 @@ public class ThrottlingAllocationTests {
private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class); private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class);
@Test public void testPrimaryRecoveryThrottling() { @Test public void testPrimaryRecoveryThrottling() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build()); ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build());
logger.info("Building initial routing table"); logger.info("Building initial routing table");
@ -102,7 +105,10 @@ public class ThrottlingAllocationTests {
} }
@Test public void testReplicaAndPrimaryRecoveryThrottling() { @Test public void testReplicaAndPrimaryRecoveryThrottling() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build()); ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build());
logger.info("Building initial routing table"); logger.info("Building initial routing table");