diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index 33bb5c94aa3..ab6d31fca8f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -56,6 +56,10 @@ import java.util.concurrent.ConcurrentMap; */ public class LocalGatewayNodeAllocation extends NodeAllocation { + static { + IndexMetaData.addDynamicSettings("index.recovery.initial_shards"); + } + private final TransportNodesListGatewayStartedShards listGatewayStartedShards; private final TransportNodesListShardStoreMetaData listShardStoreMetaData; @@ -146,6 +150,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { int requiredAllocation = 1; try { IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index()); + String initialShards = indexMetaData.settings().get("recovery.initial_shards", this.initialShards); if ("quorum".equals(initialShards)) { if (indexMetaData.numberOfReplicas() > 1) { requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java index f0f12bc5089..252aecc37c8 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/QuorumLocalGatewayTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.test.integration.gateway.local; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.node.Node; import org.elasticsearch.node.internal.InternalNode; @@ -51,6 +52,62 @@ public class QuorumLocalGatewayTests extends AbstractNodesTests { closeAllNodes(); } + @Test public void testChangeInitialShardsRecovery() throws Exception { + // clean three nodes + logger.info("--> cleaning nodes"); + buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); + buildNode("node2", settingsBuilder().put("gateway.type", "local").build()); + buildNode("node3", settingsBuilder().put("gateway.type", "local").build()); + cleanAndCloseNodes(); + + + logger.info("--> starting 3 nodes"); + Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build()); + + logger.info("--> indexing..."); + node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + node1.client().admin().indices().prepareFlush().execute().actionGet(); + node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + node1.client().admin().indices().prepareRefresh().execute().actionGet(); + + logger.info("--> running cluster_health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + for (int i = 0; i < 10; i++) { + assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); + } + + logger.info("--> closing nodes"); + closeAllNodes(); + + logger.info("--> starting 2 nodes back, should not do any recovery (less than quorum)"); + + node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build()); + node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build()); + + Thread.sleep(300); + ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setMasterNodeTimeout("500ms").execute().actionGet(); + assertThat(clusterStateResponse.state().routingTable().index("test").allPrimaryShardsActive(), equalTo(false)); + + logger.info("--> change the recovery.initial_shards setting, and make sure its recovered"); + client("node1").admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).execute().actionGet(); + + logger.info("--> running cluster_health (wait for the shards to startup), 4 shards since we only have 2 nodes"); + clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + for (int i = 0; i < 10; i++) { + assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); + } + } + @Test public void testQuorumRecovery() throws Exception { // clean three nodes logger.info("--> cleaning nodes");