From d5c0a49620a3c081e26bc443a1a925f9496904f2 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 22 Oct 2014 10:21:43 +0200
Subject: [PATCH] [ROUTING] Add rebalance enabled allocation decider
This commit adds the ability to enable / disable relocations
on an entire cluster or on individual indices for either:
* `primaries` - only primaries can rebalance
* `replica` - only replicas can rebalance
* `all` - everything can rebalance (default)
* `none` - all rebalances are disabled
similar to the allocation enable / disable functionality.
Relates to #7288
---
.../indices/update-settings.asciidoc | 7 +
docs/reference/modules/cluster.asciidoc | 11 ++
.../decider/EnableAllocationDecider.java | 131 +++++++++++--
.../ClusterDynamicSettingsModule.java | 1 +
.../settings/IndexDynamicSettingsModule.java | 1 +
.../BasicBackwardsCompatibilityTest.java | 14 --
...nableAllocationDeciderIntegrationTest.java | 79 ++++++++
.../decider/EnableAllocationTests.java | 185 +++++++++++++++++-
.../RecoveryBackwardsCompatibilityTests.java | 8 +-
.../SimpleRecoveryLocalGatewayTests.java | 13 +-
.../index/store/CorruptedFileTest.java | 5 +-
.../admin/indices/upgrade/UpgradeTest.java | 9 +-
.../test/ElasticsearchAllocationTestCase.java | 7 +-
.../test/ElasticsearchIntegrationTest.java | 25 +++
14 files changed, 442 insertions(+), 54 deletions(-)
create mode 100644 src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDeciderIntegrationTest.java
diff --git a/docs/reference/indices/update-settings.asciidoc b/docs/reference/indices/update-settings.asciidoc
index 1ebaa6a9d7d..c5e16d77653 100644
--- a/docs/reference/indices/update-settings.asciidoc
+++ b/docs/reference/indices/update-settings.asciidoc
@@ -118,6 +118,13 @@ settings API:
* `new_primaries` - Allows shard allocation only for primary shards for new indices.
* `none` - No shard allocation is allowed.
+`index.routing.rebalance.enable`::
+ Enables shard rebalancing for a specific index. It can be set to:
+ * `all` (default) - Allows shard rebalancing for all shards.
+ * `primaries` - Allows shard rebalancing only for primary shards.
+ * `replicas` - Allows shard rebalancing only for replica shards.
+ * `none` - No shard rebalancing is allowed.
+
`index.routing.allocation.total_shards_per_node`::
Controls the total number of shards (replicas and primaries) allowed to be allocated on a single node. Defaults to unbounded (`-1`).
diff --git a/docs/reference/modules/cluster.asciidoc b/docs/reference/modules/cluster.asciidoc
index 5775bb5777f..d18b22c03ec 100644
--- a/docs/reference/modules/cluster.asciidoc
+++ b/docs/reference/modules/cluster.asciidoc
@@ -46,6 +46,17 @@ Can be set to:
* `new_primaries` - Allows shard allocation only for primary shards for new indices.
* `none` - No shard allocations of any kind are allowed for all indices.
+`cluster.routing.rebalance.enable`::
+
+Controls shard rebalance for all indices, by allowing specific
+kinds of shard to be rebalanced.
+
+Can be set to:
+ * `all` (default) - Allows shard balancing for all kinds of shards.
+ * `primaries` - Allows shard balancing only for primary shards.
+ * `replicas` - Allows shard balancing only for replica shards.
+ * `none` - No shard balancing of any kind are allowed for all indices.
+
`cluster.routing.allocation.same_shard.host`::
Allows to perform a check to prevent allocation of multiple instances
of the same shard on a single host, based on host name and host address.
diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
index df6fe423041..bcbc1231e4b 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
@@ -31,17 +31,32 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.Locale;
/**
- * This allocation decider allows shard allocations via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE}
- * and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE}. The per index settings overrides the cluster wide
- * setting. Depending on the
+ * This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE} /
+ * {@link #CLUSTER_ROUTING_REBALANCE_ENABLE} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE} / {@link #INDEX_ROUTING_REBALANCE_ENABLE}.
+ * The per index settings overrides the cluster wide setting.
*
- * Both settings can have the following values:
+ *
+ * Allocation settings can have the following values (non-casesensitive):
*
- * -
NONE
, no shard allocation is allowed.
- * -
NEW_PRIMARIES
only primary shards of new indices are allowed to be allocated
- * -
PRIMARIES
only primary shards (of any index) are allowed to be allocated
- * -
ALL
all shards are allowed to be allocated
+ * -
NONE
- no shard allocation is allowed.
+ * -
NEW_PRIMARIES
- only primary shards of new indices are allowed to be allocated
+ * -
PRIMARIES
- only primary shards are allowed to be allocated
+ * -
ALL
- all shards are allowed to be allocated
*
+ *
+ *
+ *
+ * Rebalancing settings can have the following values (non-casesensitive):
+ *
+ * -
NONE
- no shard rebalancing is allowed.
+ * -
REPLICAS
- only replica shards are allowed to be balanced
+ * -
PRIMARIES
- only primary shards are allowed to be balanced
+ * -
ALL
- all shards are allowed to be balanced
+ *
+ *
+ *
+ * @see Rebalance
+ * @see Allocation
*/
public class EnableAllocationDecider extends AllocationDecider implements NodeSettingsService.Listener {
@@ -50,12 +65,18 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
public static final String CLUSTER_ROUTING_ALLOCATION_ENABLE = "cluster.routing.allocation.enable";
public static final String INDEX_ROUTING_ALLOCATION_ENABLE = "index.routing.allocation.enable";
- private volatile Allocation enable;
+ public static final String CLUSTER_ROUTING_REBALANCE_ENABLE = "cluster.routing.rebalance.enable";
+ public static final String INDEX_ROUTING_REBALANCE_ENABLE = "index.routing.rebalance.enable";
+
+ private volatile Rebalance enableRebalance;
+ private volatile Allocation enableAllocation;
+
@Inject
public EnableAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
- this.enable = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, Allocation.ALL.name()));
+ this.enableAllocation = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, Allocation.ALL.name()));
+ this.enableRebalance = Rebalance.parse(settings.get(CLUSTER_ROUTING_REBALANCE_ENABLE, Rebalance.ALL.name()));
nodeSettingsService.addListener(this);
}
@@ -71,7 +92,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
if (enableIndexValue != null) {
enable = Allocation.parse(enableIndexValue);
} else {
- enable = this.enable;
+ enable = this.enableAllocation;
}
switch (enable) {
case ALL:
@@ -82,13 +103,13 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
} else {
- return allocation.decision(Decision.NO, NAME, "non-new primary allocations are disallowed");
+ return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden");
}
case PRIMARIES:
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, NAME, "primary allocations are allowed");
} else {
- return allocation.decision(Decision.NO, NAME, "replica allocations are disallowed");
+ return allocation.decision(Decision.NO, NAME, "replica allocations are forbidden");
}
default:
throw new ElasticsearchIllegalStateException("Unknown allocation option");
@@ -96,14 +117,62 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
}
@Override
- public void onRefreshSettings(Settings settings) {
- Allocation enable = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, this.enable.name()));
- if (enable != this.enable) {
- logger.info("updating [cluster.routing.allocation.enable] from [{}] to [{}]", this.enable, enable);
- EnableAllocationDecider.this.enable = enable;
+ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
+ if (allocation.ignoreDisable()) {
+ return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored");
+ }
+
+ Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
+ String enableIndexValue = indexSettings.get(INDEX_ROUTING_REBALANCE_ENABLE);
+ final Rebalance enable;
+ if (enableIndexValue != null) {
+ enable = Rebalance.parse(enableIndexValue);
+ } else {
+ enable = this.enableRebalance;
+ }
+ switch (enable) {
+ case ALL:
+ return allocation.decision(Decision.YES, NAME, "all rebalancing is allowed");
+ case NONE:
+ return allocation.decision(Decision.NO, NAME, "no rebalancing is allowed");
+ case PRIMARIES:
+ if (shardRouting.primary()) {
+ return allocation.decision(Decision.YES, NAME, "primary rebalancing is allowed");
+ } else {
+ return allocation.decision(Decision.NO, NAME, "replica rebalancing is forbidden");
+ }
+ case REPLICAS:
+ if (shardRouting.primary() == false) {
+ return allocation.decision(Decision.YES, NAME, "replica rebalancing is allowed");
+ } else {
+ return allocation.decision(Decision.NO, NAME, "primary rebalancing is forbidden");
+ }
+ default:
+ throw new ElasticsearchIllegalStateException("Unknown rebalance option");
}
}
+ @Override
+ public void onRefreshSettings(Settings settings) {
+ final Allocation enable = Allocation.parse(settings.get(CLUSTER_ROUTING_ALLOCATION_ENABLE, this.enableAllocation.name()));
+ if (enable != this.enableAllocation) {
+ logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_ENABLE, this.enableAllocation, enable);
+ EnableAllocationDecider.this.enableAllocation = enable;
+ }
+
+ final Rebalance enableRebalance = Rebalance.parse(settings.get(CLUSTER_ROUTING_REBALANCE_ENABLE, this.enableRebalance.name()));
+ if (enableRebalance != this.enableRebalance) {
+ logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_REBALANCE_ENABLE, this.enableRebalance, enableRebalance);
+ EnableAllocationDecider.this.enableRebalance = enableRebalance;
+ }
+
+ }
+
+ /**
+ * Allocation values or rather their string representation to be used used with
+ * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE}
+ * via cluster / index settings.
+ */
public enum Allocation {
NONE,
@@ -125,4 +194,30 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
}
}
+ /**
+ * Rebalance values or rather their string representation to be used used with
+ * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE}
+ * via cluster / index settings.
+ */
+ public enum Rebalance {
+
+ NONE,
+ PRIMARIES,
+ REPLICAS,
+ ALL;
+
+ public static Rebalance parse(String strValue) {
+ if (strValue == null) {
+ return null;
+ } else {
+ strValue = strValue.toUpperCase(Locale.ROOT);
+ try {
+ return Rebalance.valueOf(strValue);
+ } catch (IllegalArgumentException e) {
+ throw new ElasticsearchIllegalArgumentException("Illegal rebalance.enable value [" + strValue + "]");
+ }
+ }
+ }
+ }
+
}
diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
index 8701eb8abcc..35e210f9535 100644
--- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
@@ -54,6 +54,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
ClusterRebalanceAllocationDecider.ALLOCATION_ALLOW_REBALANCE_VALIDATOR);
clusterDynamicSettings.addDynamicSetting(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, Validator.INTEGER);
clusterDynamicSettings.addDynamicSetting(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE);
+ clusterDynamicSettings.addDynamicSetting(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE);
clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION);
clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);
diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
index d06aa465352..66debafd521 100644
--- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java
@@ -60,6 +60,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
indexDynamicSettings.addDynamicSetting(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE);
+ indexDynamicSettings.addDynamicSetting(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE);
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION);
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);
diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
index 13e11fce7c2..3d7746b167f 100644
--- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
+++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
@@ -248,20 +248,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
}
}
- public void assertAllShardsOnNodes(String index, String pattern) {
- ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
- for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
- for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
- for (ShardRouting shardRouting : indexShardRoutingTable) {
- if (shardRouting.currentNodeId() != null && index.equals(shardRouting.getIndex())) {
- String name = clusterState.nodes().get(shardRouting.currentNodeId()).name();
- assertThat("Allocated on new node: " + name, Regex.simpleMatch(pattern, name), is(true));
- }
- }
- }
- }
- }
-
/**
* Upgrades a single node to the current version
*/
diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDeciderIntegrationTest.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDeciderIntegrationTest.java
new file mode 100644
index 00000000000..d753e8e8655
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDeciderIntegrationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+
+import java.util.Set;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Simple integration for {@link EnableAllocationDecider} there is a more exhaustive unittest in
+ * {@link EnableAllocationTests} this test is meant to check if the actual update of the settings
+ * works as expected.
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
+public class EnableAllocationDeciderIntegrationTest extends ElasticsearchIntegrationTest {
+
+ public void testEnableRebalance() throws InterruptedException {
+ final String firstNode = internalCluster().startNode();
+ client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get();
+ // we test with 2 shards since otherwise it's pretty fragile if there are difference in the num or shards such that
+ // all shards are relocated to the second node which is not what we want here. It's solely a test for the settings to take effect
+ final int numShards = 2;
+ assertAcked(prepareCreate("test").setSettings(settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)));
+ assertAcked(prepareCreate("test_1").setSettings(settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)));
+ ensureGreen();
+ assertAllShardsOnNodes("test", firstNode);
+ assertAllShardsOnNodes("test_1", firstNode);
+
+ final String secondNode = internalCluster().startNode();
+ // prevent via index setting but only on index test
+ client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)).get();
+ client().admin().cluster().prepareReroute().get();
+ ensureGreen();
+ assertAllShardsOnNodes("test", firstNode);
+ assertAllShardsOnNodes("test_1", firstNode);
+
+ // now enable the index test to relocate since index settings override cluster settings
+ client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, randomBoolean() ? EnableAllocationDecider.Rebalance.PRIMARIES : EnableAllocationDecider.Rebalance.ALL)).get();
+ logger.info("--> balance index [test]");
+ client().admin().cluster().prepareReroute().get();
+ ensureGreen("test");
+ Set test = assertAllShardsOnNodes("test", firstNode, secondNode);
+ assertThat("index: [test] expected to be rebalanced on both nodes", test.size(), equalTo(2));
+
+ // flip the cluster wide setting such that we can also balance for index test_1 eventually we should have one shard of each index on each node
+ client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, randomBoolean() ? EnableAllocationDecider.Rebalance.PRIMARIES : EnableAllocationDecider.Rebalance.ALL)).get();
+ logger.info("--> balance index [test_1]");
+ client().admin().cluster().prepareReroute().get();
+ ensureGreen("test_1");
+ Set test_1 = assertAllShardsOnNodes("test_1", firstNode, secondNode);
+ assertThat("index: [test_1] expected to be rebalanced on both nodes", test_1.size(), equalTo(2));
+
+ test = assertAllShardsOnNodes("test", firstNode, secondNode);
+ assertThat("index: [test] expected to be rebalanced on both nodes", test.size(), equalTo(2));
+ }
+}
diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java
index 2791f13bc67..31cb52d31d7 100644
--- a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java
+++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationTests.java
@@ -19,20 +19,27 @@
package org.elasticsearch.cluster.routing.allocation.decider;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;
-import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
-import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
@@ -145,4 +152,178 @@ public class EnableAllocationTests extends ElasticsearchAllocationTestCase {
assertThat(clusterState.readOnlyRoutingNodes().shardsWithState("disabled", STARTED).size(), equalTo(0));
}
+
+
+
+ @Test
+ public void testEnableClusterBalance() {
+ final boolean useClusterSetting = randomBoolean();
+ final Rebalance allowedOnes = RandomPicks.randomFrom(getRandom(), EnumSet.of(Rebalance.PRIMARIES, Rebalance.REPLICAS, Rebalance.ALL));
+ Settings build = settingsBuilder()
+ .put(CLUSTER_ROUTING_REBALANCE_ENABLE, useClusterSetting ? Rebalance.NONE: RandomPicks.randomFrom(getRandom(), Rebalance.values())) // index settings override cluster settings
+ .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, 3)
+ .build();
+ NodeSettingsService nodeSettingsService = new NodeSettingsService(build);
+ AllocationService strategy = createAllocationService(build, nodeSettingsService, getRandom());
+ Settings indexSettings = useClusterSetting ? ImmutableSettings.EMPTY : settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Rebalance.NONE).build();
+
+ logger.info("Building initial routing table");
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("test").settings(indexSettings).numberOfShards(3).numberOfReplicas(1))
+
+ .put(IndexMetaData.builder("always_disabled").settings(settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Rebalance.NONE)).numberOfShards(1).numberOfReplicas(1))
+ .build();
+
+ RoutingTable routingTable = RoutingTable.builder()
+ .addAsNew(metaData.index("test"))
+ .addAsNew(metaData.index("always_disabled"))
+ .build();
+
+ ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+ logger.info("--> adding one nodes and do rerouting");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+ .put(newNode("node1"))
+ .put(newNode("node2"))
+ ).build();
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
+ logger.info("--> start the shards (primaries)");
+ routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4));
+ assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
+
+ routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(8));
+ assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
+
+ logger.info("--> adding one nodes and do rerouting");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+ .put(newNode("node1"))
+ .put(newNode("node2"))
+ .put(newNode("node3"))
+ ).build();
+ ClusterState prevState = clusterState;
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(8));
+ assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(0));
+
+ if (useClusterSetting) {
+ prevState = clusterState;
+ clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).transientSettings(settingsBuilder()
+ .put(CLUSTER_ROUTING_REBALANCE_ENABLE, allowedOnes)
+ .build())).build();
+ } else {
+ prevState = clusterState;
+ IndexMetaData meta = clusterState.getMetaData().index("test");
+ IndexMetaData meta1 = clusterState.getMetaData().index("always_disabled");
+ clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).removeAllIndices().put(IndexMetaData.builder(meta1))
+ .put(IndexMetaData.builder(meta).settings(settingsBuilder().put(meta.getSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, allowedOnes).build())))
+ .build();
+
+ }
+ nodeSettingsService.clusterChanged(new ClusterChangedEvent("foo", clusterState, prevState));
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat("expected 6 shards to be started 2 to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(6));
+ assertThat("expected 2 shards to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(2));
+ List mutableShardRoutings = clusterState.routingNodes().shardsWithState(RELOCATING);
+ switch (allowedOnes) {
+ case PRIMARIES:
+ for (MutableShardRouting routing : mutableShardRoutings) {
+ assertTrue("only primaries are allowed to relocate", routing.primary());
+ assertThat("only test index can rebalance", routing.getIndex(), equalTo("test"));
+ }
+ break;
+ case REPLICAS:
+ for (MutableShardRouting routing : mutableShardRoutings) {
+ assertFalse("only replicas are allowed to relocate", routing.primary());
+ assertThat("only test index can rebalance", routing.getIndex(), equalTo("test"));
+ }
+ break;
+ case ALL:
+ for (MutableShardRouting routing : mutableShardRoutings) {
+ assertThat("only test index can rebalance", routing.getIndex(), equalTo("test"));
+ }
+ break;
+ default:
+ fail("only replicas, primaries or all are allowed");
+ }
+ routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(8));
+ assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
+
+ }
+
+ @Test
+ public void testEnableClusterBalanceNoReplicas() {
+ final boolean useClusterSetting = randomBoolean();
+ Settings build = settingsBuilder()
+ .put(CLUSTER_ROUTING_REBALANCE_ENABLE, useClusterSetting ? Rebalance.NONE: RandomPicks.randomFrom(getRandom(), Rebalance.values())) // index settings override cluster settings
+ .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, 3)
+ .build();
+ NodeSettingsService nodeSettingsService = new NodeSettingsService(build);
+ AllocationService strategy = createAllocationService(build, nodeSettingsService, getRandom());
+ Settings indexSettings = useClusterSetting ? ImmutableSettings.EMPTY : settingsBuilder().put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Rebalance.NONE).build();
+
+ logger.info("Building initial routing table");
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("test").settings(indexSettings).numberOfShards(6).numberOfReplicas(0))
+ .build();
+
+ RoutingTable routingTable = RoutingTable.builder()
+ .addAsNew(metaData.index("test"))
+ .build();
+
+ ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+ logger.info("--> adding one nodes and do rerouting");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+ .put(newNode("node1"))
+ .put(newNode("node2"))
+ ).build();
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(6));
+ logger.info("--> start the shards (primaries)");
+ routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(6));
+ assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
+
+ logger.info("--> adding one nodes and do rerouting");
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+ .put(newNode("node1"))
+ .put(newNode("node2"))
+ .put(newNode("node3"))
+ ).build();
+ ClusterState prevState = clusterState;
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(6));
+ assertThat(clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(0));
+ if (useClusterSetting) {
+ prevState = clusterState;
+ clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).transientSettings(settingsBuilder()
+ .put(CLUSTER_ROUTING_REBALANCE_ENABLE, randomBoolean() ? Rebalance.PRIMARIES : Rebalance.ALL)
+ .build())).build();
+ } else {
+ prevState = clusterState;
+ IndexMetaData meta = clusterState.getMetaData().index("test");
+ clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).removeAllIndices()
+ .put(IndexMetaData.builder(meta).settings(settingsBuilder().put(meta.getSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, randomBoolean() ? Rebalance.PRIMARIES : Rebalance.ALL).build()))).build();
+ }
+ nodeSettingsService.clusterChanged(new ClusterChangedEvent("foo", clusterState, prevState));
+ routingTable = strategy.reroute(clusterState).routingTable();
+ clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ assertThat("expected 4 primaries to be started and 2 to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(4));
+ assertThat("expected 2 primaries to relocate useClusterSettings: " + useClusterSetting, clusterState.routingNodes().shardsWithState(RELOCATING).size(), equalTo(2));
+
+ }
+
}
diff --git a/src/test/java/org/elasticsearch/gateway/local/RecoveryBackwardsCompatibilityTests.java b/src/test/java/org/elasticsearch/gateway/local/RecoveryBackwardsCompatibilityTests.java
index e74a864b95a..e88e51fd3c8 100644
--- a/src/test/java/org/elasticsearch/gateway/local/RecoveryBackwardsCompatibilityTests.java
+++ b/src/test/java/org/elasticsearch/gateway/local/RecoveryBackwardsCompatibilityTests.java
@@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@@ -47,8 +46,7 @@ public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsC
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("action.admin.cluster.node.shutdown.delay", "10ms")
- .put("gateway.recover_after_nodes", 2)
- .put(BalancedShardsAllocator.SETTING_THRESHOLD, 100.0f).build(); // use less aggressive settings
+ .put("gateway.recover_after_nodes", 2).build();
}
protected int minExternalNodes() {
@@ -64,7 +62,9 @@ public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsC
@LuceneTestCase.Slow
@TestLogging("discovery.zen:TRACE")
public void testReusePeerRecovery() throws Exception {
- assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)));
+ assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings())
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+ .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
logger.info("--> indexing docs");
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
diff --git a/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java b/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java
index e57f184ec0c..1dcb5811eb7 100644
--- a/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java
+++ b/src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java
@@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@@ -351,7 +350,11 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
.put("action.admin.cluster.node.shutdown.delay", "10ms")
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
.put("gateway.recover_after_nodes", 4)
- .put(MockDirectoryHelper.CRASH_INDEX, false);
+ .put(MockDirectoryHelper.CRASH_INDEX, false)
+ // prevent any rebalance actions during the peer recovery
+ // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
+ // we reuse the files on disk after full restarts for replicas.
+ .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE);
internalCluster().startNodesAsync(4, settings.build()).get();
@@ -368,11 +371,7 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
ensureGreen();
logger.info("--> shutting down the nodes");
- // prevent any rebalance actions during the peer recovery
- // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
- // we reuse the files on disk after full restarts for replicas.
- client().admin().cluster().prepareUpdateSettings()
- .setPersistentSettings(settingsBuilder().put(BalancedShardsAllocator.SETTING_THRESHOLD, 100.0f)).get();
+
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(settingsBuilder()
diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
index 9fc97f419a2..26ee0bc1034 100644
--- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
+++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
@@ -44,6 +44,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
@@ -80,6 +81,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
@@ -320,6 +322,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
.put("indices.recovery.concurrent_streams", 10)
+ .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -422,7 +425,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
// it snapshots and that will write a new segments.X+1 file
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
- .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
+ .setType("fs").setSettings(settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000))));
diff --git a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java
index 9bbbd79b1c1..a7eac841c35 100644
--- a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java
+++ b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java
@@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
@@ -45,7 +44,6 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.elasticsearch.test.rest.client.http.HttpResponse;
import org.elasticsearch.test.rest.json.JsonPath;
-import org.junit.After;
import org.junit.BeforeClass;
import java.net.InetSocketAddress;
@@ -129,17 +127,14 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
logSegmentsState(null);
backwardsCluster().allowOnAllNodes(indexNames);
ensureGreen();
- // set the balancing threshold to something very highish such that no rebalancing happens after the upgrade
- builder = ImmutableSettings.builder();
- builder.put(BalancedShardsAllocator.SETTING_THRESHOLD, 100.0f);
- client().admin().cluster().prepareUpdateSettings().setPersistentSettings(builder).get();
// disable allocation entirely until all nodes are upgraded
builder = ImmutableSettings.builder();
builder.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(builder).get();
backwardsCluster().upgradeAllNodes();
- // we are done - enable allocation again
builder = ImmutableSettings.builder();
+ // disable rebalanceing entirely for the time being otherwise we might get relocations / rebalance from nodes with old segments
+ builder.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE);
builder.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.ALL);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(builder).get();
ensureGreen();
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java
index 9ec94574c44..beec50b7c8b 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java
@@ -57,11 +57,16 @@ public abstract class ElasticsearchAllocationTestCase extends ElasticsearchTestC
}
public static AllocationService createAllocationService(Settings settings, Random random) {
+ return createAllocationService(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), random);
+ }
+
+ public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
return new AllocationService(settings,
- randomAllocationDeciders(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS), random),
+ randomAllocationDeciders(settings, nodeSettingsService, random),
new ShardsAllocators(settings), ClusterInfoService.EMPTY);
}
+
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final ImmutableSet> defaultAllocationDeciders = AllocationDecidersModule.DEFAULT_ALLOCATION_DECIDERS;
final List list = new ArrayList<>();
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index 1f3dcb725bc..76b3c890a63 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -55,13 +55,18 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -1691,6 +1696,26 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return new NumShards(numShards, numReplicas);
}
+ /**
+ * Asserts that all shards are allocated on nodes matching the given node pattern.
+ */
+ public Set assertAllShardsOnNodes(String index, String... pattern) {
+ Set nodes = new HashSet<>();
+ ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
+ for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
+ for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+ for (ShardRouting shardRouting : indexShardRoutingTable) {
+ if (shardRouting.currentNodeId() != null && index.equals(shardRouting.getIndex())) {
+ String name = clusterState.nodes().get(shardRouting.currentNodeId()).name();
+ nodes.add(name);
+ assertThat("Allocated on new node: " + name, Regex.simpleMatch(pattern, name), is(true));
+ }
+ }
+ }
+ }
+ return nodes;
+ }
+
protected static class NumShards {
public final int numPrimaries;
public final int numReplicas;