From 48d33ec70a8ea6f872006e46d1c8e0cdd48a2c81 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 21 Jul 2010 16:29:44 +0300 Subject: [PATCH] Cluster Health API: Add `wait_for_nodes` (accepts "N", "N", "<=N", and ">=N"), closes #269. --- .../cluster/health/ClusterHealthRequest.java | 16 +++++++++ .../cluster/health/ClusterHealthResponse.java | 12 +++++++ .../health/TransportClusterHealthAction.java | 35 +++++++++++++++++-- .../health/ClusterHealthRequestBuilder.java | 8 +++++ .../index/engine/robin/RobinEngine.java | 3 ++ .../shard/service/InternalIndexShard.java | 7 ++-- .../health/RestClusterHealthAction.java | 2 ++ .../datanode/SimpleDataNodesTests.java | 4 +-- .../IndexLifecycleActionTests.java | 23 ++++-------- .../recovery/FullRollingRestartTests.java | 13 ++++--- .../recovery/RecoveryWhileUnderLoadTests.java | 15 ++++---- .../recovery/SimpleRecoveryTests.java | 3 +- .../search/TransportSearchFailuresTests.java | 2 +- 13 files changed, 100 insertions(+), 43 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index b137902474a..8dd124ece5b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -46,6 +46,8 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest { private int waitForActiveShards = -1; + private String waitForNodes = ""; + ClusterHealthRequest() { } @@ -110,6 +112,18 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest { return this; } + public String waitForNodes() { + return waitForNodes; + } + + /** + * Waits for N number of nodes. Use "12" for exact mapping, ">12" and "<12" for range. + */ + public ClusterHealthRequest waitForNodes(String waitForNodes) { + this.waitForNodes = waitForNodes; + return this; + } + @Override public ActionRequestValidationException validate() { return null; } @@ -131,6 +145,7 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest { } waitForRelocatingShards = in.readInt(); waitForActiveShards = in.readInt(); + waitForNodes = in.readUTF(); } @Override public void writeTo(StreamOutput out) throws IOException { @@ -152,5 +167,6 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest { } out.writeInt(waitForRelocatingShards); out.writeInt(waitForActiveShards); + out.writeUTF(waitForNodes); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java index 878da47de34..199f5c7dd1c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -40,6 +40,8 @@ public class ClusterHealthResponse implements ActionResponse, Iterabletrue if the waitForXXX has timeout out and did not match. */ @@ -163,6 +173,7 @@ public class ClusterHealthResponse implements ActionResponse, Iterable= request.waitForActiveShards()) { waitForCounter++; } + if (!request.waitForNodes().isEmpty()) { + if (request.waitForNodes().startsWith(">=")) { + int expected = Integer.parseInt(request.waitForNodes().substring(2)); + if (response.numberOfNodes() >= expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("M=")) { + int expected = Integer.parseInt(request.waitForNodes().substring(2)); + if (response.numberOfNodes() <= expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith(">")) { + int expected = Integer.parseInt(request.waitForNodes().substring(1)); + if (response.numberOfNodes() > expected) { + waitForCounter++; + } + } else if (request.waitForNodes().startsWith("<")) { + int expected = Integer.parseInt(request.waitForNodes().substring(1)); + if (response.numberOfNodes() < expected) { + waitForCounter++; + } + } else { + int expected = Integer.parseInt(request.waitForNodes()); + if (response.numberOfNodes() == expected) { + waitForCounter++; + } + } + } if (waitForCounter == waitFor) { return response; } @@ -113,7 +144,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc ClusterState clusterState = clusterService.state(); RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData()); ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures()); - + response.numberOfNodes = clusterState.nodes().size(); request.indices(clusterState.metaData().concreteIndices(request.indices())); for (String index : request.indices()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/health/ClusterHealthRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/health/ClusterHealthRequestBuilder.java index 7961c3ba7f4..1a40edbee03 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/health/ClusterHealthRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/health/ClusterHealthRequestBuilder.java @@ -84,6 +84,14 @@ public class ClusterHealthRequestBuilder extends BaseClusterRequestBuilder12" and "<12" for range. + */ + public ClusterHealthRequestBuilder setWaitForNodes(String waitForNodes) { + request.waitForNodes(waitForNodes); + return this; + } + @Override protected void doExecute(ActionListener listener) { client.health(request, listener); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index d50018e477d..9caa0058b34 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -276,6 +276,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, // we obtain a read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) rwl.readLock().lock(); + if (indexWriter == null) { + throw new EngineClosedException(shardId); + } try { // this engine always acts as if waitForOperations=true if (refreshMutex.compareAndSet(false, true)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 348a215578e..54f136a12e6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -36,10 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.index.cache.IndexCache; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineException; -import org.elasticsearch.index.engine.RefreshFailedEngineException; -import org.elasticsearch.index.engine.ScheduledRefreshableEngine; +import org.elasticsearch.index.engine.*; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperNotFoundException; import org.elasticsearch.index.mapper.MapperService; @@ -517,6 +514,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public void run() { try { engine.refresh(new Engine.Refresh(false)); + } catch (EngineClosedException e) { + // we are being closed, ignore } catch (RefreshFailedEngineException e) { if (e.getCause() instanceof InterruptedException) { // ignore, we are being shutdown diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java index 037ff0c32e2..1a8e4772570 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/health/RestClusterHealthAction.java @@ -57,6 +57,7 @@ public class RestClusterHealthAction extends BaseRestHandler { } clusterHealthRequest.waitForRelocatingShards(request.paramAsInt("wait_for_relocating_shards", clusterHealthRequest.waitForRelocatingShards())); clusterHealthRequest.waitForActiveShards(request.paramAsInt("wait_for_active_shards", clusterHealthRequest.waitForActiveShards())); + clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes())); String sLevel = request.param("level"); if (sLevel != null) { if ("cluster".equals("sLevel")) { @@ -85,6 +86,7 @@ public class RestClusterHealthAction extends BaseRestHandler { builder.field("status", response.status().name().toLowerCase()); builder.field("timed_out", response.timedOut()); + builder.field("number_of_nodes", response.numberOfNodes()); builder.field("active_primary_shards", response.activePrimaryShards()); builder.field("active_shards", response.activeShards()); builder.field("relocating_shards", response.relocatingShards()); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java index 967180e807f..908406a84a9 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java @@ -52,7 +52,7 @@ public class SimpleDataNodesTests extends AbstractNodesTests { } startNode("nonData2", settingsBuilder().put("node.data", false).build()); - Thread.sleep(500); + assertThat(client("nonData1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); // still no shard should be allocated try { @@ -64,7 +64,7 @@ public class SimpleDataNodesTests extends AbstractNodesTests { // now, start a node data, and see that it gets with shards startNode("data1", settingsBuilder().put("node.data", true).build()); - Thread.sleep(500); + assertThat(client("nonData1").admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); IndexResponse indexResponse = client("nonData2").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); assertThat(indexResponse.id(), equalTo("1")); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java index 5cc344d2d79..76b80ae7e68 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indexlifecycle/IndexLifecycleActionTests.java @@ -89,12 +89,10 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Starting server2"); // start another server startNode("server2", settings); - Thread.sleep(200); - ClusterService clusterService2 = ((InternalNode) node("server2")).injector().getInstance(ClusterService.class); logger.info("Running Cluster Health"); - clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("2")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); @@ -113,12 +111,11 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Starting server3"); // start another server startNode("server3", settings); - Thread.sleep(200); ClusterService clusterService3 = ((InternalNode) node("server3")).injector().getInstance(ClusterService.class); logger.info("Running Cluster Health"); - clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("3").waitForRelocatingShards(0)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); @@ -146,11 +143,9 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Closing server1"); // kill the first server closeNode("server1"); - // wait a bit so it will be discovered as removed - Thread.sleep(200); // verify health logger.info("Running Cluster Health"); - clusterHealth = client("server2").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + clusterHealth = client("server2").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); @@ -219,11 +214,9 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { // start another server logger.info("Starting server2"); startNode("server2", settings); - // wait a bit - Thread.sleep(200); logger.info("Running Cluster Health"); - clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("2")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); @@ -247,13 +240,11 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { // start another server logger.info("Starting server3"); startNode("server3"); - // wait a bit so assignment will start - Thread.sleep(200); ClusterService clusterService3 = ((InternalNode) node("server3")).injector().getInstance(ClusterService.class); logger.info("Running Cluster Health"); - clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0).waitForNodes("3")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); @@ -281,11 +272,9 @@ public class IndexLifecycleActionTests extends AbstractNodesTests { logger.info("Closing server1"); // kill the first server closeNode("server1"); - // wait a bit so it will be discovered as removed - Thread.sleep(200); logger.info("Running Cluster Health"); - clusterHealth = client("server3").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); + clusterHealth = client("server3").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("2").waitForRelocatingShards(0)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java index 6c167df3042..f9e73d0152b 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/FullRollingRestartTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.integration.recovery; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.test.integration.AbstractNodesTests; import org.testng.annotations.AfterMethod; @@ -57,14 +56,14 @@ public class FullRollingRestartTests extends AbstractNodesTests { startNode("node3"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); // now start adding nodes startNode("node4"); startNode("node5"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("5").execute().actionGet().timedOut(), equalTo(false)); client("node1").admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { @@ -74,10 +73,10 @@ public class FullRollingRestartTests extends AbstractNodesTests { // now start shutting nodes down closeNode("node1"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false)); closeNode("node2"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); client("node5").admin().indices().prepareRefresh().execute().actionGet(); @@ -87,11 +86,11 @@ public class FullRollingRestartTests extends AbstractNodesTests { closeNode("node3"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); closeNode("node4"); // make sure the cluster state is green, and all has been recovered - assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(client("node5").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false)); client("node5").admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java index d3b109fcf11..d4466da4f77 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.integration.recovery; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -104,7 +103,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests { logger.info("--> waiting for GREEN health status ..."); // make sure the cluster state is green, and all has been recovered - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> waiting for 100000 docs to be indexed ..."); while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) { @@ -185,7 +184,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests { startNode("node4"); logger.info("--> waiting for GREEN health status ..."); - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> waiting for 150000 docs to be indexed ..."); @@ -271,7 +270,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests { startNode("node4"); logger.info("--> waiting for GREEN health status ..."); - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("4").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> waiting for 100000 docs to be indexed ..."); @@ -285,24 +284,24 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests { logger.info("--> shutting down [node1] ..."); closeNode("node1"); logger.info("--> waiting for GREEN health status ..."); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("3").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> shutting down [node3] ..."); closeNode("node3"); logger.info("--> waiting for GREEN health status ..."); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> shutting down [node4] ..."); closeNode("node4"); logger.info("--> waiting for YELLOW health status ..."); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> marking and waiting for indexing threads to stop ..."); stop.set(true); stopLatch.await(); logger.info("--> indexing threads stopped"); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().setWaitForNodes("1").execute().actionGet().timedOut(), equalTo(false)); logger.info("--> refreshing the index"); client("node2").admin().indices().prepareRefresh().execute().actionGet(); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/SimpleRecoveryTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/SimpleRecoveryTests.java index a60be29d7bc..948f9f0a6c0 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/SimpleRecoveryTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/SimpleRecoveryTests.java @@ -86,9 +86,8 @@ public class SimpleRecoveryTests extends AbstractNodesTests { // now start another one so we move some primaries startNode("server3"); - Thread.sleep(1000); logger.info("Running Cluster Health"); - clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus().waitForNodes("3")).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TransportSearchFailuresTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TransportSearchFailuresTests.java index 6487c12eaed..74837b72d5f 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TransportSearchFailuresTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TransportSearchFailuresTests.java @@ -75,7 +75,7 @@ public class TransportSearchFailuresTests extends AbstractNodesTests { } startNode("server2"); - Thread.sleep(500); + assertThat(client("server1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().timedOut(), equalTo(false)); logger.info("Running Cluster Health"); ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth("test")