From 14d98a7319555331e5a660f4f31122b9225250af Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 31 Mar 2011 01:04:09 +0200 Subject: [PATCH] Shard Allocation: Add node_initial_primaries_recoveries setting, closes #819. --- .../routing/allocation/ThrottlingNodeAllocation.java | 6 ++++-- .../routing/allocation/FailedShardsRoutingTests.java | 5 ++++- .../PrimaryNotRelocatedWhileBeingRecoveredTests.java | 5 ++++- .../allocation/TenShardsOneReplicaRoutingTests.java | 3 ++- .../routing/allocation/ThrottlingAllocationTests.java | 10 ++++++++-- 5 files changed, 22 insertions(+), 7 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java index ba6ec3edd93..f0b58661593 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java @@ -31,13 +31,15 @@ import org.elasticsearch.common.settings.Settings; */ public class ThrottlingNodeAllocation extends NodeAllocation { + private final int primariesInitialRecoveries; private final int concurrentRecoveries; @Inject public ThrottlingNodeAllocation(Settings settings) { super(settings); + this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4); this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2)); - logger.debug("using [concurrent_recoveries] with [{}]", concurrentRecoveries); + logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries); } @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { @@ -57,7 +59,7 @@ public class ThrottlingNodeAllocation extends NodeAllocation { primariesInRecovery++; } } - if (primariesInRecovery >= concurrentRecoveries) { + if (primariesInRecovery >= primariesInitialRecoveries) { return Decision.THROTTLE; } else { return Decision.YES; diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index d3a8eb36f4f..2d513275e85 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -166,7 +166,10 @@ public class FailedShardsRoutingTests { } @Test public void test10ShardsWith1ReplicaFailure() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java index 7ae60091645..3242d796fad 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java @@ -47,7 +47,10 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests { @Test public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index 290065f88e5..eb05c0a0514 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -47,7 +47,8 @@ public class TenShardsOneReplicaRoutingTests { @Test public void testSingleIndexFirstStartPrimaryThenBackups() { ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() - .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index 77a6543b0c9..ebfee63766c 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -45,7 +45,10 @@ public class ThrottlingAllocationTests { private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class); @Test public void testPrimaryRecoveryThrottling() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build()); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 3) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) + .build()); logger.info("Building initial routing table"); @@ -102,7 +105,10 @@ public class ThrottlingAllocationTests { } @Test public void testReplicaAndPrimaryRecoveryThrottling() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build()); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 3) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) + .build()); logger.info("Building initial routing table");