Index Settings: Add `index.recovery.initial_shards` controlling the number of shards to exists when using local gateway, closes #1163.
This commit is contained in:
parent
b70694ce63
commit
6e8c7c41fe
|
@ -56,6 +56,10 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
*/
|
*/
|
||||||
public class LocalGatewayNodeAllocation extends NodeAllocation {
|
public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
|
|
||||||
|
static {
|
||||||
|
IndexMetaData.addDynamicSettings("index.recovery.initial_shards");
|
||||||
|
}
|
||||||
|
|
||||||
private final TransportNodesListGatewayStartedShards listGatewayStartedShards;
|
private final TransportNodesListGatewayStartedShards listGatewayStartedShards;
|
||||||
|
|
||||||
private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
|
private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
|
||||||
|
@ -146,6 +150,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
int requiredAllocation = 1;
|
int requiredAllocation = 1;
|
||||||
try {
|
try {
|
||||||
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
|
IndexMetaData indexMetaData = routingNodes.metaData().index(shard.index());
|
||||||
|
String initialShards = indexMetaData.settings().get("recovery.initial_shards", this.initialShards);
|
||||||
if ("quorum".equals(initialShards)) {
|
if ("quorum".equals(initialShards)) {
|
||||||
if (indexMetaData.numberOfReplicas() > 1) {
|
if (indexMetaData.numberOfReplicas() > 1) {
|
||||||
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
|
requiredAllocation = ((1 + indexMetaData.numberOfReplicas()) / 2) + 1;
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.test.integration.gateway.local;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||||
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
import org.elasticsearch.gateway.Gateway;
|
import org.elasticsearch.gateway.Gateway;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.node.internal.InternalNode;
|
import org.elasticsearch.node.internal.InternalNode;
|
||||||
|
@ -51,6 +52,62 @@ public class QuorumLocalGatewayTests extends AbstractNodesTests {
|
||||||
closeAllNodes();
|
closeAllNodes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void testChangeInitialShardsRecovery() throws Exception {
|
||||||
|
// clean three nodes
|
||||||
|
logger.info("--> cleaning nodes");
|
||||||
|
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
|
||||||
|
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
|
||||||
|
buildNode("node3", settingsBuilder().put("gateway.type", "local").build());
|
||||||
|
cleanAndCloseNodes();
|
||||||
|
|
||||||
|
|
||||||
|
logger.info("--> starting 3 nodes");
|
||||||
|
Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
|
||||||
|
Node node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
|
||||||
|
Node node3 = startNode("node3", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 2).build());
|
||||||
|
|
||||||
|
logger.info("--> indexing...");
|
||||||
|
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
|
||||||
|
node1.client().admin().indices().prepareFlush().execute().actionGet();
|
||||||
|
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
|
||||||
|
node1.client().admin().indices().prepareRefresh().execute().actionGet();
|
||||||
|
|
||||||
|
logger.info("--> running cluster_health (wait for the shards to startup)");
|
||||||
|
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(6)).actionGet();
|
||||||
|
logger.info("--> done cluster_health, status " + clusterHealth.status());
|
||||||
|
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||||
|
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("--> closing nodes");
|
||||||
|
closeAllNodes();
|
||||||
|
|
||||||
|
logger.info("--> starting 2 nodes back, should not do any recovery (less than quorum)");
|
||||||
|
|
||||||
|
node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build());
|
||||||
|
node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").build());
|
||||||
|
|
||||||
|
Thread.sleep(300);
|
||||||
|
ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setMasterNodeTimeout("500ms").execute().actionGet();
|
||||||
|
assertThat(clusterStateResponse.state().routingTable().index("test").allPrimaryShardsActive(), equalTo(false));
|
||||||
|
|
||||||
|
logger.info("--> change the recovery.initial_shards setting, and make sure its recovered");
|
||||||
|
client("node1").admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).execute().actionGet();
|
||||||
|
|
||||||
|
logger.info("--> running cluster_health (wait for the shards to startup), 4 shards since we only have 2 nodes");
|
||||||
|
clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(4)).actionGet();
|
||||||
|
logger.info("--> done cluster_health, status " + clusterHealth.status());
|
||||||
|
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||||
|
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void testQuorumRecovery() throws Exception {
|
@Test public void testQuorumRecovery() throws Exception {
|
||||||
// clean three nodes
|
// clean three nodes
|
||||||
logger.info("--> cleaning nodes");
|
logger.info("--> cleaning nodes");
|
||||||
|
|
Loading…
Reference in New Issue