A setting to auto expand the number of replicas of an index (based on data nodes), closes #623.

This commit is contained in:
kimchy 2011-01-12 16:27:36 +02:00
parent f5a9f2d948
commit 85b6a982d4
2 changed files with 148 additions and 14 deletions

View File

@ -20,9 +20,7 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -36,13 +34,56 @@ import static org.elasticsearch.cluster.routing.RoutingTable.*;
/**
* @author kimchy (shay.banon)
*/
public class MetaDataUpdateSettingsService extends AbstractComponent {
public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener {
private final ClusterService clusterService;
@Inject public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
this.clusterService.add(this);
}
@Override public void clusterChanged(ClusterChangedEvent event) {
// update an index with number of replicas based on data nodes if possible
if (!event.state().nodes().localNodeMaster()) {
return;
}
if (!event.nodesChanged()) {
return;
}
for (final IndexMetaData indexMetaData : event.state().metaData()) {
String autoExpandReplicas = indexMetaData.settings().get("index.auto_expand_replicas");
if (autoExpandReplicas != null) {
try {
final int numberOfReplicas = event.state().nodes().dataNodes().size() - 1;
int min = Integer.parseInt(autoExpandReplicas.substring(0, autoExpandReplicas.indexOf('-')));
int max;
String sMax = autoExpandReplicas.substring(autoExpandReplicas.indexOf('-') + 1);
if (sMax.equals("all")) {
max = event.state().nodes().dataNodes().size() - 1;
} else {
max = Integer.parseInt(sMax);
}
if (numberOfReplicas >= min && numberOfReplicas <= max) {
Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
updateSettings(settings, new String[]{indexMetaData.index()}, new Listener() {
@Override public void onSuccess() {
logger.info("[{}] auto expanded replicas to [{}]", indexMetaData.index(), numberOfReplicas);
}
@Override public void onFailure(Throwable t) {
logger.warn("[{}] fail to auto expand replicas to [{}]", indexMetaData.index(), numberOfReplicas);
}
});
}
} catch (Exception e) {
logger.warn("[{}] failed to parse auto expand replicas", e, indexMetaData.index());
}
}
}
}
public void updateSettings(final Settings pSettings, final String[] indices, final Listener listener) {

View File

@ -49,16 +49,6 @@ public class UpdateNumberOfReplicasTests extends AbstractNodesTests {
client1 = getClient1();
client2 = getClient2();
createIndex();
}
protected void createIndex() {
logger.info("Creating index test");
client1.admin().indices().create(createIndexRequest("test")).actionGet();
}
protected String getConcreteIndexName() {
return "test";
}
@AfterMethod public void closeNodes() {
@ -76,6 +66,9 @@ public class UpdateNumberOfReplicasTests extends AbstractNodesTests {
}
@Test public void simpleUpdateNumberOfReplicasTests() throws Exception {
logger.info("Creating index test");
client1.admin().indices().create(createIndexRequest("test")).actionGet();
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
@ -148,4 +141,104 @@ public class UpdateNumberOfReplicasTests extends AbstractNodesTests {
assertThat(countResponse.count(), equalTo(10l));
}
}
@Test public void testAutoExpandNumberOfReplicas0ToData() {
logger.info("--> creating index test with auto expand replicas");
client1.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("number_of_shards", 2).put("auto_expand_replicas", "0-all")).execute().actionGet();
logger.info("--> running cluster health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(1));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(4));
logger.info("--> add another node, should increase the number of replicas");
startNode("node3");
logger.info("--> running cluster health");
clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(6).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(2));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(6));
logger.info("--> closing one node");
closeNode("node3");
logger.info("--> running cluster health");
clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(1));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(4));
logger.info("--> closing another node");
closeNode("node2");
logger.info("--> running cluster health");
clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(2).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(0));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(2));
}
@Test public void testAutoExpandNumberReplicas1ToData() {
logger.info("--> creating index test with auto expand replicas");
client1.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("number_of_shards", 2).put("auto_expand_replicas", "1-all")).execute().actionGet();
logger.info("--> running cluster health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(1));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(4));
logger.info("--> add another node, should increase the number of replicas");
startNode("node3");
logger.info("--> running cluster health");
clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(6).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(2));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(6));
logger.info("--> closing one node");
closeNode("node3");
logger.info("--> running cluster health");
clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(1));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(4));
logger.info("--> closing another node");
closeNode("node2");
logger.info("--> running cluster health");
clusterHealth = client1.admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForActiveShards(2).execute().actionGet();
logger.info("--> done cluster health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(clusterHealth.indices().get("test").activePrimaryShards(), equalTo(2));
assertThat(clusterHealth.indices().get("test").numberOfReplicas(), equalTo(1));
assertThat(clusterHealth.indices().get("test").activeShards(), equalTo(2));
}
}