Cluster Health API: Add `wait_for_nodes` (accepts "N", "<N", ">N", "<=N", and ">=N"), closes #269.

This commit is contained in:
kimchy 2010-07-21 16:29:44 +03:00
parent 2a0e8d4ec9
commit 48d33ec70a
13 changed files with 100 additions and 43 deletions

View File

@ -46,6 +46,8 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
// Minimum number of active shards to wait for; -1 (the default) means this condition is not waited on.
private int waitForActiveShards = -1;
// Node-count condition to wait for (e.g. "2", ">=2"); the empty default means this condition is not waited on.
private String waitForNodes = "";
// Package-private no-arg constructor.
// NOTE(review): presumably used when the request is rebuilt from a stream — confirm.
ClusterHealthRequest() {
}
@ -110,6 +112,18 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
return this;
}
/**
 * Returns the node-count condition this request waits for. The value is the empty
 * string unless a condition has been set.
 */
public String waitForNodes() {
return waitForNodes;
}
/**
 * Sets the node-count condition to wait for. Use {@code "12"} for an exact match, or a
 * comparison against a count: {@code ">12"}, {@code ">=12"}, {@code "<12"}, {@code "<=12"}.
 * The empty string (the default) disables the wait.
 *
 * @param waitForNodes the condition; <tt>null</tt> is treated as the empty string
 * @return this request, to allow call chaining
 */
public ClusterHealthRequest waitForNodes(String waitForNodes) {
    // Normalise null to "" so that later isEmpty() checks and writeUTF() serialization
    // never dereference a null condition.
    this.waitForNodes = waitForNodes == null ? "" : waitForNodes;
    return this;
}
// Performs no checks on the request fields and always returns null
// (NOTE(review): presumably null means "no validation errors" to the caller — confirm).
@Override public ActionRequestValidationException validate() {
return null;
}
@ -131,6 +145,7 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
}
waitForRelocatingShards = in.readInt();
waitForActiveShards = in.readInt();
waitForNodes = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -152,5 +167,6 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
}
out.writeInt(waitForRelocatingShards);
out.writeInt(waitForActiveShards);
out.writeUTF(waitForNodes);
}
}

View File

@ -40,6 +40,8 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
private String clusterName;
int numberOfNodes = 0;
int activeShards = 0;
int relocatingShards = 0;
@ -127,6 +129,14 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
return activePrimaryShards();
}
/**
 * Returns the number of nodes recorded in this response (0 until it is set or read
 * from a stream).
 */
public int numberOfNodes() {
return this.numberOfNodes;
}
/**
 * Bean-style alias that delegates to {@link #numberOfNodes()}.
 */
public int getNumberOfNodes() {
return numberOfNodes();
}
/**
* <tt>true</tt> if the waitForXXX has timed out and did not match.
*/
@ -163,6 +173,7 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
activePrimaryShards = in.readVInt();
activeShards = in.readVInt();
relocatingShards = in.readVInt();
numberOfNodes = in.readVInt();
status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readVInt();
for (int i = 0; i < size; i++) {
@ -185,6 +196,7 @@ public class ClusterHealthResponse implements ActionResponse, Iterable<ClusterIn
out.writeVInt(activePrimaryShards);
out.writeVInt(activeShards);
out.writeVInt(relocatingShards);
out.writeVInt(numberOfNodes);
out.writeByte(status.value());
out.writeVInt(indices.size());
for (ClusterIndexHealth indexHealth : this) {

View File

@ -65,7 +65,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState state) throws ElasticSearchException {
int waitFor = 3;
int waitFor = 4;
if (request.waitForStatus() == null) {
waitFor--;
}
@ -75,6 +75,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (request.waitForActiveShards() == -1) {
waitFor--;
}
if (request.waitForNodes().isEmpty()) {
waitFor--;
}
if (waitFor == 0) {
// no need to wait for anything
return clusterHealth(request);
@ -92,6 +95,34 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (request.waitForActiveShards() != -1 && response.activeShards() >= request.waitForActiveShards()) {
waitForCounter++;
}
if (!request.waitForNodes().isEmpty()) {
if (request.waitForNodes().startsWith(">=")) {
int expected = Integer.parseInt(request.waitForNodes().substring(2));
if (response.numberOfNodes() >= expected) {
waitForCounter++;
}
} else if (request.waitForNodes().startsWith("M=")) {
int expected = Integer.parseInt(request.waitForNodes().substring(2));
if (response.numberOfNodes() <= expected) {
waitForCounter++;
}
} else if (request.waitForNodes().startsWith(">")) {
int expected = Integer.parseInt(request.waitForNodes().substring(1));
if (response.numberOfNodes() > expected) {
waitForCounter++;
}
} else if (request.waitForNodes().startsWith("<")) {
int expected = Integer.parseInt(request.waitForNodes().substring(1));
if (response.numberOfNodes() < expected) {
waitForCounter++;
}
} else {
int expected = Integer.parseInt(request.waitForNodes());
if (response.numberOfNodes() == expected) {
waitForCounter++;
}
}
}
if (waitForCounter == waitFor) {
return response;
}
@ -113,7 +144,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
ClusterState clusterState = clusterService.state();
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures());
response.numberOfNodes = clusterState.nodes().size();
request.indices(clusterState.metaData().concreteIndices(request.indices()));
for (String index : request.indices()) {

View File

@ -84,6 +84,14 @@ public class ClusterHealthRequestBuilder extends BaseClusterRequestBuilder<Clust
return this;
}
/**
 * Sets the node-count condition on the underlying request. Use "12" for an exact
 * match, or a comparison against a count: ">12", ">=12", "<12", "<=12".
 */
public ClusterHealthRequestBuilder setWaitForNodes(String waitForNodes) {
request.waitForNodes(waitForNodes);
return this;
}
// Hands the built request and the supplied listener to client.health.
@Override protected void doExecute(ActionListener<ClusterHealthResponse> listener) {
client.health(request, listener);
}

View File

@ -276,6 +276,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
rwl.readLock().lock();
if (indexWriter == null) {
    // The guarded try block has not been entered yet, so nothing after this point
    // would release the read lock. Release it here before signalling closure;
    // otherwise the lock stays held after the exception propagates.
    rwl.readLock().unlock();
    throw new EngineClosedException(shardId);
}
try {
// this engine always acts as if waitForOperations=true
if (refreshMutex.compareAndSet(false, true)) {

View File

@ -36,10 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.ScheduledRefreshableEngine;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
@ -517,6 +514,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override public void run() {
try {
engine.refresh(new Engine.Refresh(false));
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (RefreshFailedEngineException e) {
if (e.getCause() instanceof InterruptedException) {
// ignore, we are being shutdown

View File

@ -57,6 +57,7 @@ public class RestClusterHealthAction extends BaseRestHandler {
}
clusterHealthRequest.waitForRelocatingShards(request.paramAsInt("wait_for_relocating_shards", clusterHealthRequest.waitForRelocatingShards()));
clusterHealthRequest.waitForActiveShards(request.paramAsInt("wait_for_active_shards", clusterHealthRequest.waitForActiveShards()));
clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
String sLevel = request.param("level");
if (sLevel != null) {
if ("cluster".equals("sLevel")) {
@ -85,6 +86,7 @@ public class RestClusterHealthAction extends BaseRestHandler {
builder.field("status", response.status().name().toLowerCase());
builder.field("timed_out", response.timedOut());
builder.field("number_of_nodes", response.numberOfNodes());
builder.field("active_primary_shards", response.activePrimaryShards());
builder.field("active_shards", response.activeShards());
builder.field("relocating_shards", response.relocatingShards());

View File

@ -52,7 +52,7 @@ public class SimpleDataNodesTests extends AbstractNodesTests {
}
startNode("nonData2", settingsBuilder().put("node.data", false).build());
Thread.sleep(500);
assertThat(client("nonData1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
// still no shard should be allocated
try {
@ -64,7 +64,7 @@ public class SimpleDataNodesTests extends AbstractNodesTests {
// now, start a node data, and see that it gets with shards
startNode("data1", settingsBuilder().put("node.data", true).build());
Thread.sleep(500);
assertThat(client("nonData1").admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
IndexResponse indexResponse = client("nonData2").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.id(), equalTo("1"));

View File

@ -89,12 +89,10 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Starting server2");
// start another server
startNode("server2", settings);
Thread.sleep(200);
ClusterService clusterService2 = ((InternalNode) node("server2")).injector().getInstance(ClusterService.class);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
@ -113,12 +111,11 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Starting server3");
// start another server
startNode("server3", settings);
Thread.sleep(200);
ClusterService clusterService3 = ((InternalNode) node("server3")).injector().getInstance(ClusterService.class);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
@ -146,11 +143,9 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Closing server1");
// kill the first server
closeNode("server1");
// wait a bit so it will be discovered as removed
Thread.sleep(200);
// verify health
logger.info("Running Cluster Health");
clusterHealth = client("server2").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
clusterHealth = client("server2").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
@ -219,11 +214,9 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start another server
logger.info("Starting server2");
startNode("server2", settings);
// wait a bit
Thread.sleep(200);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
@ -247,13 +240,11 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
// start another server
logger.info("Starting server3");
startNode("server3");
// wait a bit so assignment will start
Thread.sleep(200);
ClusterService clusterService3 = ((InternalNode) node("server3")).injector().getInstance(ClusterService.class);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("3")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
@ -281,11 +272,9 @@ public class IndexLifecycleActionTests extends AbstractNodesTests {
logger.info("Closing server1");
// kill the first server
closeNode("server1");
// wait a bit so it will be discovered as removed
Thread.sleep(200);
logger.info("Running Cluster Health");
clusterHealth = client("server3").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet();
clusterHealth = client("server3").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));

View File

@ -19,7 +19,6 @@
package org.elasticsearch.test.integration.recovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
@ -57,14 +56,14 @@ public class FullRollingRestartTests extends AbstractNodesTests {
startNode("node3");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
// now start adding nodes
startNode("node4");
startNode("node5");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("5").execute().actionGet().timedOut(), equalTo(false));
client("node1").admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
@ -74,10 +73,10 @@ public class FullRollingRestartTests extends AbstractNodesTests {
// now start shutting nodes down
closeNode("node1");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false));
closeNode("node2");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
client("node5").admin().indices().prepareRefresh().execute().actionGet();
@ -87,11 +86,11 @@ public class FullRollingRestartTests extends AbstractNodesTests {
closeNode("node3");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
closeNode("node4");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false));
client("node5").admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.test.integration.recovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -104,7 +103,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
logger.info("--> waiting for GREEN health status ...");
// make sure the cluster state is green, and all has been recovered
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> waiting for 100000 docs to be indexed ...");
while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) {
@ -185,7 +184,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
startNode("node4");
logger.info("--> waiting for GREEN health status ...");
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> waiting for 150000 docs to be indexed ...");
@ -271,7 +270,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
startNode("node4");
logger.info("--> waiting for GREEN health status ...");
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> waiting for 100000 docs to be indexed ...");
@ -285,24 +284,24 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
logger.info("--> shutting down [node1] ...");
closeNode("node1");
logger.info("--> waiting for GREEN health status ...");
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> shutting down [node3] ...");
closeNode("node3");
logger.info("--> waiting for GREEN health status ...");
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN));
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> shutting down [node4] ...");
closeNode("node4");
logger.info("--> waiting for YELLOW health status ...");
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> marking and waiting for indexing threads to stop ...");
stop.set(true);
stopLatch.await();
logger.info("--> indexing threads stopped");
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false));
logger.info("--> refreshing the index");
client("node2").admin().indices().prepareRefresh().execute().actionGet();

View File

@ -86,9 +86,8 @@ public class SimpleRecoveryTests extends AbstractNodesTests {
// now start another one so we move some primaries
startNode("server3");
Thread.sleep(1000);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("3")).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));

View File

@ -75,7 +75,7 @@ public class TransportSearchFailuresTests extends AbstractNodesTests {
}
startNode("server2");
Thread.sleep(500);
assertThat(client("server1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false));
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth("test")