From d5e8a5cd6910ac75042cffc64012b506933490f1 Mon Sep 17 00:00:00 2001 From: Van0SS Date: Tue, 12 Jun 2018 07:34:06 -0400 Subject: [PATCH] REST high-level client: add Cluster Health API (#29331) Relates to #27205 --- .../elasticsearch/client/ClusterClient.java | 34 +++ .../client/RequestConverters.java | 61 ++++++ .../elasticsearch/client/ClusterClientIT.java | 138 ++++++++++++ .../client/RequestConvertersTests.java | 90 ++++++++ .../ClusterClientDocumentationIT.java | 180 +++++++++++++++ .../high-level/cluster/health.asciidoc | 205 ++++++++++++++++++ .../high-level/supported-apis.asciidoc | 2 + docs/reference/cluster/health.asciidoc | 9 + .../cluster/health/ClusterHealthRequest.java | 28 +++ .../cluster/health/ClusterHealthResponse.java | 156 +++++++++++-- .../cluster/health/ClusterIndexHealth.java | 166 +++++++++++--- .../cluster/health/ClusterShardHealth.java | 109 +++++++++- .../cluster/health/ClusterStateHealth.java | 64 +++++- .../health/ClusterHealthResponsesTests.java | 115 +++++++++- .../health/ClusterIndexHealthTests.java | 143 +++++++++++- .../health/ClusterShardHealthTests.java | 111 ++++++++++ .../tasks/CancelTasksResponseTests.java | 3 +- .../tasks/ListTasksResponseTests.java | 3 +- .../test/AbstractSerializingTestCase.java | 10 +- .../AbstractStreamableXContentTestCase.java | 10 +- .../test/AbstractXContentTestCase.java | 20 +- 21 files changed, 1588 insertions(+), 69 deletions(-) create mode 100644 docs/java-rest/high-level/cluster/health.asciidoc create mode 100644 server/src/test/java/org/elasticsearch/cluster/health/ClusterShardHealthTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java index 488579785e0..1e25a40b008 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java @@ -21,12 +21,16 @@ package org.elasticsearch.client; import org.apache.http.Header; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; /** * A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API. @@ -95,4 +99,34 @@ public final class ClusterClient { restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings, ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers); } + + /** + * Get cluster health using the Cluster Health API. + * See + * Cluster Health API on elastic.co + *

+ * If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT + * @param healthRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public ClusterHealthResponse health(ClusterHealthRequest healthRequest, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(healthRequest, RequestConverters::clusterHealth, options, + ClusterHealthResponse::fromXContent, singleton(RestStatus.REQUEST_TIMEOUT.getStatus())); + } + + /** + * Asynchronously get cluster health using the Cluster Health API. + * See + * Cluster Health API on elastic.co + * If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT + * @param healthRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public void healthAsync(ClusterHealthRequest healthRequest, RequestOptions options, ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, RequestConverters::clusterHealth, options, + ClusterHealthResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus())); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index b9e757cea54..e2b61dc41de 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -29,6 +29,7 @@ import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.ContentType; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; @@ -74,7 +75,9 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; @@ -706,6 +709,28 @@ final class RequestConverters { return request; } + static Request clusterHealth(ClusterHealthRequest healthRequest) { + String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices(); + String endpoint = new EndpointBuilder() + .addPathPartAsIs("_cluster/health") + .addCommaSeparatedPathParts(indices) + .build(); + Request request = new Request(HttpGet.METHOD_NAME, endpoint); + + new Params(request) + .withWaitForStatus(healthRequest.waitForStatus()) + .withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards()) + .withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards()) + .withWaitForActiveShards(healthRequest.waitForActiveShards()) + .withWaitForNodes(healthRequest.waitForNodes()) + .withWaitForEvents(healthRequest.waitForEvents()) + .withTimeout(healthRequest.timeout()) + .withMasterTimeout(healthRequest.masterNodeTimeout()) + .withLocal(healthRequest.local()) + .withLevel(healthRequest.level()); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); @@ -1124,6 +1149,42 @@ final class RequestConverters { } return this; } + + Params withWaitForStatus(ClusterHealthStatus status) { + if (status != null) { + return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT)); + } + return this; + } + + Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) { + if (waitNoRelocatingShards) { + return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString()); + } + return this; + } + + Params withWaitForNoInitializingShards(boolean waitNoInitShards) { + if (waitNoInitShards) { + return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString()); + } + return this; + } + + Params withWaitForNodes(String waitForNodes) { + return putParam("wait_for_nodes", waitForNodes); + } + + Params withLevel(ClusterHealthRequest.Level level) { + return putParam("level", level.name().toLowerCase(Locale.ROOT)); + } + + Params withWaitForEvents(Priority waitForEvents) { + if (waitForEvents != null) { + return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT)); + } + return this; + } } /** diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java index f1110163b25..7cf9fca07c3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java @@ -20,8 +20,13 @@ package org.elasticsearch.client; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -34,6 +39,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static java.util.Collections.emptyMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -108,4 +114,136 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase { assertThat(exception.getMessage(), equalTo( "Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]")); } + + public void testClusterHealthGreen() throws IOException { + ClusterHealthRequest request = new ClusterHealthRequest(); + request.timeout("5s"); + ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); + + assertThat(response, notNullValue()); + assertThat(response.isTimedOut(), equalTo(false)); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertNoIndices(response); + } + + public void testClusterHealthYellowClusterLevel() throws IOException { + createIndex("index", Settings.EMPTY); + createIndex("index2", Settings.EMPTY); + ClusterHealthRequest request = new ClusterHealthRequest(); + request.timeout("5s"); + request.level(ClusterHealthRequest.Level.CLUSTER); + ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); + + assertYellowShards(response); + assertThat(response.getIndices().size(), equalTo(0)); + } + + public void testClusterHealthYellowIndicesLevel() throws IOException { + createIndex("index", Settings.EMPTY); + createIndex("index2", Settings.EMPTY); + ClusterHealthRequest request = new ClusterHealthRequest(); + request.timeout("5s"); + request.level(ClusterHealthRequest.Level.INDICES); + ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); + + assertYellowShards(response); + assertThat(response.getIndices().size(), equalTo(2)); + for (Map.Entry entry : response.getIndices().entrySet()) { + assertYellowIndex(entry.getKey(), entry.getValue(), true); + } + } + + private static void assertYellowShards(ClusterHealthResponse response) { + assertThat(response, notNullValue()); + assertThat(response.isTimedOut(), equalTo(false)); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(response.getActivePrimaryShards(), equalTo(2)); + assertThat(response.getNumberOfDataNodes(), equalTo(1)); + assertThat(response.getNumberOfNodes(), equalTo(1)); + assertThat(response.getActiveShards(), equalTo(2)); + assertThat(response.getDelayedUnassignedShards(), equalTo(0)); + assertThat(response.getInitializingShards(), equalTo(0)); + assertThat(response.getUnassignedShards(), equalTo(2)); + assertThat(response.getActiveShardsPercent(), equalTo(50d)); + } + + public void testClusterHealthYellowSpecificIndex() throws IOException { + createIndex("index", Settings.EMPTY); + createIndex("index2", Settings.EMPTY); + ClusterHealthRequest request = new ClusterHealthRequest("index"); + request.timeout("5s"); + ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); + + assertThat(response, notNullValue()); + assertThat(response.isTimedOut(), equalTo(false)); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(response.getActivePrimaryShards(), equalTo(1)); + assertThat(response.getNumberOfDataNodes(), equalTo(1)); + assertThat(response.getNumberOfNodes(), equalTo(1)); + assertThat(response.getActiveShards(), equalTo(1)); + assertThat(response.getDelayedUnassignedShards(), equalTo(0)); + assertThat(response.getInitializingShards(), equalTo(0)); + assertThat(response.getUnassignedShards(), equalTo(1)); + assertThat(response.getActiveShardsPercent(), equalTo(50d)); + assertThat(response.getIndices().size(), equalTo(1)); + Map.Entry index = response.getIndices().entrySet().iterator().next(); + assertYellowIndex(index.getKey(), index.getValue(), false); + } + + private static void assertYellowIndex(String indexName, ClusterIndexHealth indexHealth, boolean emptyShards) { + assertThat(indexHealth, notNullValue()); + assertThat(indexHealth.getIndex(),equalTo(indexName)); + assertThat(indexHealth.getActivePrimaryShards(),equalTo(1)); + assertThat(indexHealth.getActiveShards(),equalTo(1)); + assertThat(indexHealth.getNumberOfReplicas(),equalTo(1)); + assertThat(indexHealth.getInitializingShards(),equalTo(0)); + assertThat(indexHealth.getUnassignedShards(),equalTo(1)); + assertThat(indexHealth.getRelocatingShards(),equalTo(0)); + assertThat(indexHealth.getStatus(),equalTo(ClusterHealthStatus.YELLOW)); + if (emptyShards) { + assertThat(indexHealth.getShards().size(), equalTo(0)); + } else { + assertThat(indexHealth.getShards().size(), equalTo(1)); + for (Map.Entry entry : indexHealth.getShards().entrySet()) { + assertYellowShard(entry.getKey(), entry.getValue()); + } + } + } + + private static void assertYellowShard(int shardId, ClusterShardHealth shardHealth) { + assertThat(shardHealth, notNullValue()); + assertThat(shardHealth.getShardId(), equalTo(shardId)); + assertThat(shardHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(shardHealth.getActiveShards(), equalTo(1)); + assertThat(shardHealth.getInitializingShards(), equalTo(0)); + assertThat(shardHealth.getUnassignedShards(), equalTo(1)); + assertThat(shardHealth.getRelocatingShards(), equalTo(0)); + } + + public void testClusterHealthNotFoundIndex() throws IOException { + ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index"); + request.timeout("5s"); + ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync); + + assertThat(response, notNullValue()); + assertThat(response.isTimedOut(), equalTo(true)); + assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT)); + assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED)); + assertNoIndices(response); + } + + private static void assertNoIndices(ClusterHealthResponse response) { + assertThat(response.getIndices(), equalTo(emptyMap())); + assertThat(response.getActivePrimaryShards(), equalTo(0)); + assertThat(response.getNumberOfDataNodes(), equalTo(1)); + assertThat(response.getNumberOfNodes(), equalTo(1)); + assertThat(response.getActiveShards(), equalTo(0)); + assertThat(response.getDelayedUnassignedShards(), equalTo(0)); + assertThat(response.getInitializingShards(), equalTo(0)); + assertThat(response.getUnassignedShards(), equalTo(0)); + assertThat(response.getActiveShardsPercent(), equalTo(100d)); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index d1ef59e0267..1e03e55f61f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -29,6 +29,7 @@ import org.apache.http.entity.ByteArrayEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; @@ -83,8 +84,10 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestConverters.EndpointBuilder; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -125,6 +128,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.RandomObjects; +import org.hamcrest.CoreMatchers; import java.io.IOException; import java.io.InputStream; @@ -1526,6 +1530,85 @@ public class RequestConvertersTests extends ESTestCase { assertEquals(expectedParams, expectedRequest.getParameters()); } + public void testClusterHealth() { + ClusterHealthRequest healthRequest = new ClusterHealthRequest(); + Map expectedParams = new HashMap<>(); + setRandomLocal(healthRequest, expectedParams); + String timeoutType = randomFrom("timeout", "masterTimeout", "both", "none"); + String timeout = randomTimeValue(); + String masterTimeout = randomTimeValue(); + switch (timeoutType) { + case "timeout": + healthRequest.timeout(timeout); + expectedParams.put("timeout", timeout); + // If Master Timeout wasn't set it uses the same value as Timeout + expectedParams.put("master_timeout", timeout); + break; + case "masterTimeout": + expectedParams.put("timeout", "30s"); + healthRequest.masterNodeTimeout(masterTimeout); + expectedParams.put("master_timeout", masterTimeout); + break; + case "both": + healthRequest.timeout(timeout); + expectedParams.put("timeout", timeout); + healthRequest.masterNodeTimeout(timeout); + expectedParams.put("master_timeout", timeout); + break; + case "none": + expectedParams.put("timeout", "30s"); + expectedParams.put("master_timeout", "30s"); + break; + default: + throw new UnsupportedOperationException(); + } + setRandomWaitForActiveShards(healthRequest::waitForActiveShards, expectedParams, "0"); + if (randomBoolean()) { + ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.values()); + healthRequest.level(level); + expectedParams.put("level", level.name().toLowerCase(Locale.ROOT)); + } else { + expectedParams.put("level", "shards"); + } + if (randomBoolean()) { + Priority priority = randomFrom(Priority.values()); + healthRequest.waitForEvents(priority); + expectedParams.put("wait_for_events", priority.name().toLowerCase(Locale.ROOT)); + } + if (randomBoolean()) { + ClusterHealthStatus status = randomFrom(ClusterHealthStatus.values()); + healthRequest.waitForStatus(status); + expectedParams.put("wait_for_status", status.name().toLowerCase(Locale.ROOT)); + } + if (randomBoolean()) { + boolean waitForNoInitializingShards = randomBoolean(); + healthRequest.waitForNoInitializingShards(waitForNoInitializingShards); + if (waitForNoInitializingShards) { + expectedParams.put("wait_for_no_initializing_shards", Boolean.TRUE.toString()); + } + } + if (randomBoolean()) { + boolean waitForNoRelocatingShards = randomBoolean(); + healthRequest.waitForNoRelocatingShards(waitForNoRelocatingShards); + if (waitForNoRelocatingShards) { + expectedParams.put("wait_for_no_relocating_shards", Boolean.TRUE.toString()); + } + } + String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); + healthRequest.indices(indices); + + Request request = RequestConverters.clusterHealth(healthRequest); + assertThat(request, CoreMatchers.notNullValue()); + assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME)); + assertThat(request.getEntity(), nullValue()); + if (indices != null && indices.length > 0) { + assertThat(request.getEndpoint(), equalTo("/_cluster/health/" + String.join(",", indices))); + } else { + assertThat(request.getEndpoint(), equalTo("/_cluster/health")); + } + assertThat(request.getParameters(), equalTo(expectedParams)); + } + public void testRollover() throws IOException { RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10), randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10)); @@ -2104,6 +2187,11 @@ public class RequestConvertersTests extends ESTestCase { } private static void setRandomWaitForActiveShards(Consumer setter, Map expectedParams) { + setRandomWaitForActiveShards(setter, expectedParams, null); + } + + private static void setRandomWaitForActiveShards(Consumer setter,Map expectedParams, + String defaultValue) { if (randomBoolean()) { String waitForActiveShardsString; int waitForActiveShards = randomIntBetween(-1, 5); @@ -2114,6 +2202,8 @@ public class RequestConvertersTests extends ESTestCase { } setter.accept(ActiveShardCount.parseString(waitForActiveShardsString)); expectedParams.put("wait_for_active_shards", waitForActiveShardsString); + } else if (defaultValue != null) { + expectedParams.put("wait_for_active_shards", defaultValue); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java index 75902cf02ba..84a124f764b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java @@ -21,17 +21,26 @@ package org.elasticsearch.client.documentation; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.HashMap; @@ -40,6 +49,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; /** * This class is used to generate the Java Cluster API documentation. @@ -179,4 +189,174 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase } } + public void testClusterHealth() throws IOException { + RestHighLevelClient client = highLevelClient(); + client.indices().create(new CreateIndexRequest("index"), RequestOptions.DEFAULT); + { + // tag::health-request + ClusterHealthRequest request = new ClusterHealthRequest(); + // end::health-request + } + { + // tag::health-request-indices-ctr + ClusterHealthRequest request = new ClusterHealthRequest("index1", "index2"); + // end::health-request-indices-ctr + } + { + // tag::health-request-indices-setter + ClusterHealthRequest request = new ClusterHealthRequest(); + request.indices("index1", "index2"); + // end::health-request-indices-setter + } + ClusterHealthRequest request = new ClusterHealthRequest(); + + // tag::health-request-timeout + request.timeout(TimeValue.timeValueSeconds(50)); // <1> + request.timeout("50s"); // <2> + // end::health-request-timeout + + // tag::health-request-master-timeout + request.masterNodeTimeout(TimeValue.timeValueSeconds(20)); // <1> + request.masterNodeTimeout("20s"); // <2> + // end::health-request-master-timeout + + // tag::health-request-wait-status + request.waitForStatus(ClusterHealthStatus.YELLOW); // <1> + request.waitForYellowStatus(); // <2> + // end::health-request-wait-status + + // tag::health-request-wait-events + request.waitForEvents(Priority.NORMAL); // <1> + // end::health-request-wait-events + + // tag::health-request-level + request.level(ClusterHealthRequest.Level.SHARDS); // <1> + // end::health-request-level + + // tag::health-request-wait-relocation + request.waitForNoRelocatingShards(true); // <1> + // end::health-request-wait-relocation + + // tag::health-request-wait-initializing + request.waitForNoInitializingShards(true); // <1> + // end::health-request-wait-initializing + + // tag::health-request-wait-nodes + request.waitForNodes("2"); // <1> + request.waitForNodes(">=2"); // <2> + request.waitForNodes("le(2)"); // <3> + // end::health-request-wait-nodes + + // tag::health-request-wait-active + request.waitForActiveShards(ActiveShardCount.ALL); // <1> + request.waitForActiveShards(1); // <2> + // end::health-request-wait-active + + // tag::health-request-local + request.local(true); // <1> + // end::health-request-local + + // tag::health-execute + ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); + // end::health-execute + + assertThat(response.isTimedOut(), equalTo(false)); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); + assertThat(response, notNullValue()); + // tag::health-response-general + String clusterName = response.getClusterName(); // <1> + ClusterHealthStatus status = response.getStatus(); // <2> + // end::health-response-general + + // tag::health-response-request-status + boolean timedOut = response.isTimedOut(); // <1> + RestStatus restStatus = response.status(); // <2> + // end::health-response-request-status + + // tag::health-response-nodes + int numberOfNodes = response.getNumberOfNodes(); // <1> + int numberOfDataNodes = response.getNumberOfDataNodes(); // <2> + // end::health-response-nodes + + { + // tag::health-response-shards + int activeShards = response.getActiveShards(); // <1> + int activePrimaryShards = response.getActivePrimaryShards(); // <2> + int relocatingShards = response.getRelocatingShards(); // <3> + int initializingShards = response.getInitializingShards(); // <4> + int unassignedShards = response.getUnassignedShards(); // <5> + int delayedUnassignedShards = response.getDelayedUnassignedShards(); // <6> + double activeShardsPercent = response.getActiveShardsPercent(); // <7> + // end::health-response-shards + } + + // tag::health-response-task + TimeValue taskMaxWaitingTime = response.getTaskMaxWaitingTime(); // <1> + int numberOfPendingTasks = response.getNumberOfPendingTasks(); // <2> + int numberOfInFlightFetch = response.getNumberOfInFlightFetch(); // <3> + // end::health-response-task + + // tag::health-response-indices + Map indices = response.getIndices(); // <1> + // end::health-response-indices + + { + // tag::health-response-index + ClusterIndexHealth index = indices.get("index"); // <1> + ClusterHealthStatus indexStatus = index.getStatus(); + int numberOfShards = index.getNumberOfShards(); + int numberOfReplicas = index.getNumberOfReplicas(); + int activeShards = index.getActiveShards(); + int activePrimaryShards = index.getActivePrimaryShards(); + int initializingShards = index.getInitializingShards(); + int relocatingShards = index.getRelocatingShards(); + int unassignedShards = index.getUnassignedShards(); + // end::health-response-index + + // tag::health-response-shard-details + Map shards = index.getShards(); // <1> + ClusterShardHealth shardHealth = shards.get(0); + int shardId = shardHealth.getShardId(); + ClusterHealthStatus shardStatus = shardHealth.getStatus(); + int active = shardHealth.getActiveShards(); + int initializing = shardHealth.getInitializingShards(); + int unassigned = shardHealth.getUnassignedShards(); + int relocating = shardHealth.getRelocatingShards(); + boolean primaryActive = shardHealth.isPrimaryActive(); + // end::health-response-shard-details + } + } + + public void testClusterHealthAsync() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + ClusterHealthRequest request = new ClusterHealthRequest(); + + // tag::health-execute-listener + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(ClusterHealthResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::health-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::health-execute-async + client.cluster().healthAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::health-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } } diff --git a/docs/java-rest/high-level/cluster/health.asciidoc b/docs/java-rest/high-level/cluster/health.asciidoc new file mode 100644 index 00000000000..6c0f926f15f --- /dev/null +++ b/docs/java-rest/high-level/cluster/health.asciidoc @@ -0,0 +1,205 @@ +[[java-rest-high-cluster-health]] +=== Cluster Health API + +The Cluster Health API allows getting cluster health. + +[[java-rest-high-cluster-health-request]] +==== Cluster Health Request + +A `ClusterHealthRequest`: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request] +-------------------------------------------------- +There are no required parameters. By default, the client will check all indices and will not wait +for any events. + +==== Indices + +Indices which should be checked can be passed in the constructor: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-indices-ctr] +-------------------------------------------------- + +Or using the corresponding setter method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-indices-setter] +-------------------------------------------------- + +==== Other parameters + +Other parameters can be passed only through setter methods: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-timeout] +-------------------------------------------------- +<1> Timeout for the request as a `TimeValue`. Defaults to 30 seconds +<2> As a `String` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-master-timeout] +-------------------------------------------------- +<1> Timeout to connect to the master node as a `TimeValue`. Defaults to the same as `timeout` +<2> As a `String` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-status] +-------------------------------------------------- +<1> The status to wait (e.g. `green`, `yellow`, or `red`). Accepts a `ClusterHealthStatus` value. +<2> Using predefined method + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-events] +-------------------------------------------------- +<1> The priority of the events to wait for. Accepts a `Priority` value. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level] +-------------------------------------------------- +<1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-relocation] +-------------------------------------------------- +<1> Wait for 0 relocating shards. Defaults to `false` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-initializing] +-------------------------------------------------- +<1> Wait for 0 initializing shards. Defaults to `false` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-nodes] +-------------------------------------------------- +<1> Wait for `N` nodes in the cluster. Defaults to `0` +<2> Using `>=N`, `<=N`, `>N` and ` Using `ge(N)`, `le(N)`, `gt(N)`, `lt(N)` notation + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-active] +-------------------------------------------------- + +<1> Wait for all shards to be active in the cluster +<2> Wait for `N` shards to be active in the cluster + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-local] +-------------------------------------------------- +<1> Non-master node can be used for this request. Defaults to `false` + +[[java-rest-high-cluster-health-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-execute] +-------------------------------------------------- + +[[java-rest-high-cluster-health-async]] +==== Asynchronous Execution + +The asynchronous execution of a cluster health request requires both the +`ClusterHealthRequest` instance and an `ActionListener` instance to be +passed to the asynchronous method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-execute-async] +-------------------------------------------------- +<1> The `ClusterHealthRequest` to execute and the `ActionListener` to use +when the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `ClusterHealthResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of a failure. The raised exception is provided as an argument + +[[java-rest-high-cluster-health-response]] +==== Cluster Health Response + +The returned `ClusterHealthResponse` contains the next information about the +cluster: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-general] +-------------------------------------------------- +<1> Name of the cluster +<2> Cluster status (`green`, `yellow` or `red`) + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-request-status] +-------------------------------------------------- +<1> Whether request was timed out while processing +<2> Status of the request (`OK` or `REQUEST_TIMEOUT`). Other errors will be thrown as exceptions + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-nodes] +-------------------------------------------------- +<1> Number of nodes in the cluster +<2> Number of data nodes in the cluster + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-shards] +-------------------------------------------------- +<1> Number of active shards +<2> Number of primary active shards +<3> Number of relocating shards +<4> Number of initializing shards +<5> Number of unassigned shards +<6> Number of unassigned shards that are currently being delayed +<7> Percent of active shards + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-task] +-------------------------------------------------- +<1> Maximum wait time of all tasks in the queue +<2> Number of currently pending tasks +<3> Number of async fetches that are currently ongoing + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-indices] +-------------------------------------------------- +<1> Detailed information about indices in the cluster + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-index] +-------------------------------------------------- +<1> Detailed information about a specific index + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-shard-details] +-------------------------------------------------- +<1> Detailed information about a specific shard \ No newline at end of file diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 465ae20619d..b33c2421b06 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -110,8 +110,10 @@ include::indices/get_templates.asciidoc[] The Java High Level REST Client supports the following Cluster APIs: * <> +* <> include::cluster/put_settings.asciidoc[] +include::cluster/health.asciidoc[] == Ingest APIs The Java High Level REST Client supports the following Ingest APIs: diff --git a/docs/reference/cluster/health.asciidoc b/docs/reference/cluster/health.asciidoc index 87c4e17f452..1e33455d026 100644 --- a/docs/reference/cluster/health.asciidoc +++ b/docs/reference/cluster/health.asciidoc @@ -104,10 +104,19 @@ The cluster health API accepts the following request parameters: Alternatively, it is possible to use `ge(N)`, `le(N)`, `gt(N)` and `lt(N)` notation. +`wait_for_events`:: + Can be one of `immediate`, `urgent`, `high`, `normal`, `low`, `languid`. + Wait until all currently queued events with the given priority are processed. + `timeout`:: A time based parameter controlling how long to wait if one of the wait_for_XXX are provided. Defaults to `30s`. +`master_timeout`:: + A time based parameter controlling how long to wait if the master has not been + discovered yet or disconnected. + If not provided, uses the same value as `timeout`. + `local`:: If `true` returns the local node information and does not provide the state from master node. Default: `false`. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index 50d3bc85357..59a291888d0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.TimeUnit; public class ClusterHealthRequest extends MasterNodeReadRequest implements IndicesRequest.Replaceable { @@ -45,6 +46,11 @@ public class ClusterHealthRequest extends MasterNodeReadRequest PARSER = + new ConstructingObjectParser<>("cluster_health_response", true, + parsedObjects -> { + int i = 0; + // ClusterStateHealth fields + int numberOfNodes = (int) parsedObjects[i++]; + int numberOfDataNodes = (int) parsedObjects[i++]; + int activeShards = (int) parsedObjects[i++]; + int relocatingShards = (int) parsedObjects[i++]; + int activePrimaryShards = (int) parsedObjects[i++]; + int initializingShards = (int) parsedObjects[i++]; + int unassignedShards = (int) parsedObjects[i++]; + double activeShardsPercent = (double) parsedObjects[i++]; + String statusStr = (String) parsedObjects[i++]; + ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr); + @SuppressWarnings("unchecked") List indexList = (List) parsedObjects[i++]; + final Map indices; + if (indexList == null || indexList.isEmpty()) { + indices = emptyMap(); + } else { + indices = new HashMap<>(indexList.size()); + for (ClusterIndexHealth indexHealth : indexList) { + indices.put(indexHealth.getIndex(), indexHealth); + } + } + ClusterStateHealth stateHealth = new ClusterStateHealth(activePrimaryShards, activeShards, relocatingShards, + initializingShards, unassignedShards, numberOfNodes, numberOfDataNodes, activeShardsPercent, status, + indices); + + // ClusterHealthResponse fields + String clusterName = (String) parsedObjects[i++]; + int numberOfPendingTasks = (int) parsedObjects[i++]; + int numberOfInFlightFetch = (int) parsedObjects[i++]; + int delayedUnassignedShards = (int) parsedObjects[i++]; + long taskMaxWaitingTimeMillis = (long) parsedObjects[i++]; + boolean timedOut = (boolean) parsedObjects[i]; + return new ClusterHealthResponse(clusterName, numberOfPendingTasks, numberOfInFlightFetch, delayedUnassignedShards, + TimeValue.timeValueMillis(taskMaxWaitingTimeMillis), timedOut, stateHealth); + }); + + private static final ObjectParser.NamedObjectParser INDEX_PARSER = + (XContentParser parser, Void context, String index) -> ClusterIndexHealth.innerFromXContent(parser, index); + + static { + // ClusterStateHealth fields + PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_NODES)); + PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_DATA_NODES)); + PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS)); + PARSER.declareDouble(constructorArg(), new ParseField(ACTIVE_SHARDS_PERCENT_AS_NUMBER)); + PARSER.declareString(constructorArg(), new ParseField(STATUS)); + // Can be absent if LEVEL == 'cluster' + PARSER.declareNamedObjects(optionalConstructorArg(), INDEX_PARSER, new ParseField(INDICES)); + + // ClusterHealthResponse fields + PARSER.declareString(constructorArg(), new ParseField(CLUSTER_NAME)); + PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_PENDING_TASKS)); + PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_IN_FLIGHT_FETCH)); + PARSER.declareInt(constructorArg(), new ParseField(DELAYED_UNASSIGNED_SHARDS)); + PARSER.declareLong(constructorArg(), new ParseField(TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS)); + PARSER.declareBoolean(constructorArg(), new ParseField(TIMED_OUT)); + } + private String clusterName; private int numberOfPendingTasks = 0; private int numberOfInFlightFetch = 0; @@ -60,11 +156,23 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo this.numberOfPendingTasks = numberOfPendingTasks; this.numberOfInFlightFetch = numberOfInFlightFetch; this.delayedUnassignedShards = delayedUnassignedShards; + this.taskMaxWaitingTime = taskMaxWaitingTime; + this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices); + this.clusterHealthStatus = clusterStateHealth.getStatus(); + } + + /** + * For XContent Parser and serialization tests + */ + ClusterHealthResponse(String clusterName, int numberOfPendingTasks, int numberOfInFlightFetch, int delayedUnassignedShards, + TimeValue taskMaxWaitingTime, boolean timedOut, ClusterStateHealth clusterStateHealth) { this.clusterName = clusterName; this.numberOfPendingTasks = numberOfPendingTasks; this.numberOfInFlightFetch = numberOfInFlightFetch; + this.delayedUnassignedShards = delayedUnassignedShards; this.taskMaxWaitingTime = taskMaxWaitingTime; - this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices); + this.timedOut = timedOut; + this.clusterStateHealth = clusterStateHealth; this.clusterHealthStatus = clusterStateHealth.getStatus(); } @@ -210,25 +318,6 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo return isTimedOut() ? RestStatus.REQUEST_TIMEOUT : RestStatus.OK; } - private static final String CLUSTER_NAME = "cluster_name"; - private static final String STATUS = "status"; - private static final String TIMED_OUT = "timed_out"; - private static final String NUMBER_OF_NODES = "number_of_nodes"; - private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes"; - private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks"; - private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch"; - private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards"; - private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue"; - private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis"; - private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number"; - private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent"; - private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards"; - private static final String ACTIVE_SHARDS = "active_shards"; - private static final String RELOCATING_SHARDS = "relocating_shards"; - private static final String INITIALIZING_SHARDS = "initializing_shards"; - private static final String UNASSIGNED_SHARDS = "unassigned_shards"; - private static final String INDICES = "indices"; - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -254,13 +343,36 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo if (outputIndices) { builder.startObject(INDICES); for (ClusterIndexHealth indexHealth : clusterStateHealth.getIndices().values()) { - builder.startObject(indexHealth.getIndex()); indexHealth.toXContent(builder, params); - builder.endObject(); } builder.endObject(); } builder.endObject(); return builder; } + + public static ClusterHealthResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterHealthResponse that = (ClusterHealthResponse) o; + return Objects.equals(clusterName, that.clusterName) && + numberOfPendingTasks == that.numberOfPendingTasks && + numberOfInFlightFetch == that.numberOfInFlightFetch && + delayedUnassignedShards == that.delayedUnassignedShards && + Objects.equals(taskMaxWaitingTime, that.taskMaxWaitingTime) && + timedOut == that.timedOut && + Objects.equals(clusterStateHealth, that.clusterStateHealth) && + clusterHealthStatus == that.clusterHealthStatus; + } + + @Override + public int hashCode() { + return Objects.hash(clusterName, numberOfPendingTasks, numberOfInFlightFetch, delayedUnassignedShards, taskMaxWaitingTime, + timedOut, clusterStateHealth, clusterHealthStatus); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/health/ClusterIndexHealth.java b/server/src/main/java/org/elasticsearch/cluster/health/ClusterIndexHealth.java index 75c564c2038..c1a52f2ffc5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/health/ClusterIndexHealth.java +++ b/server/src/main/java/org/elasticsearch/cluster/health/ClusterIndexHealth.java @@ -22,19 +22,82 @@ package org.elasticsearch.cluster.health; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; public final class ClusterIndexHealth implements Iterable, Writeable, ToXContentFragment { + private static final String STATUS = "status"; + private static final String NUMBER_OF_SHARDS = "number_of_shards"; + private static final String NUMBER_OF_REPLICAS = "number_of_replicas"; + private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards"; + private static final String ACTIVE_SHARDS = "active_shards"; + private static final String RELOCATING_SHARDS = "relocating_shards"; + private static final String INITIALIZING_SHARDS = "initializing_shards"; + private static final String UNASSIGNED_SHARDS = "unassigned_shards"; + private static final String SHARDS = "shards"; + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("cluster_index_health", true, + (parsedObjects, index) -> { + int i = 0; + int numberOfShards = (int) parsedObjects[i++]; + int numberOfReplicas = (int) parsedObjects[i++]; + int activeShards = (int) parsedObjects[i++]; + int relocatingShards = (int) parsedObjects[i++]; + int initializingShards = (int) parsedObjects[i++]; + int unassignedShards = (int) parsedObjects[i++]; + int activePrimaryShards = (int) parsedObjects[i++]; + String statusStr = (String) parsedObjects[i++]; + ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr); + @SuppressWarnings("unchecked") List shardList = (List) parsedObjects[i]; + final Map shards; + if (shardList == null || shardList.isEmpty()) { + shards = emptyMap(); + } else { + shards = new HashMap<>(shardList.size()); + for (ClusterShardHealth shardHealth : shardList) { + shards.put(shardHealth.getShardId(), shardHealth); + } + } + return new ClusterIndexHealth(index, numberOfShards, numberOfReplicas, activeShards, relocatingShards, + initializingShards, unassignedShards, activePrimaryShards, status, shards); + }); + + public static final ObjectParser.NamedObjectParser SHARD_PARSER = + (XContentParser p, String indexIgnored, String shardId) -> ClusterShardHealth.innerFromXContent(p, Integer.valueOf(shardId)); + + static { + PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_REPLICAS)); + PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS)); + PARSER.declareString(constructorArg(), new ParseField(STATUS)); + // Can be absent if LEVEL == 'indices' or 'cluster' + PARSER.declareNamedObjects(optionalConstructorArg(), SHARD_PARSER, new ParseField(SHARDS)); + } private final String index; private final int numberOfShards; @@ -45,13 +108,14 @@ public final class ClusterIndexHealth implements Iterable, W private final int unassignedShards; private final int activePrimaryShards; private final ClusterHealthStatus status; - private final Map shards = new HashMap<>(); + private final Map shards; public ClusterIndexHealth(final IndexMetaData indexMetaData, final IndexRoutingTable indexRoutingTable) { this.index = indexMetaData.getIndex().getName(); this.numberOfShards = indexMetaData.getNumberOfShards(); this.numberOfReplicas = indexMetaData.getNumberOfReplicas(); + shards = new HashMap<>(); for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { int shardId = shardRoutingTable.shardId().id(); shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable)); @@ -104,12 +168,31 @@ public final class ClusterIndexHealth implements Iterable, W status = ClusterHealthStatus.fromValue(in.readByte()); int size = in.readVInt(); + shards = new HashMap<>(size); for (int i = 0; i < size; i++) { ClusterShardHealth shardHealth = new ClusterShardHealth(in); - shards.put(shardHealth.getId(), shardHealth); + shards.put(shardHealth.getShardId(), shardHealth); } } + /** + * For XContent Parser and serialization tests + */ + ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas, int activeShards, int relocatingShards, + int initializingShards, int unassignedShards, int activePrimaryShards, ClusterHealthStatus status, + Map shards) { + this.index = index; + this.numberOfShards = numberOfShards; + this.numberOfReplicas = numberOfReplicas; + this.activeShards = activeShards; + this.relocatingShards = relocatingShards; + this.initializingShards = initializingShards; + this.unassignedShards = unassignedShards; + this.activePrimaryShards = activePrimaryShards; + this.status = status; + this.shards = shards; + } + public String getIndex() { return index; } @@ -173,19 +256,9 @@ public final class ClusterIndexHealth implements Iterable, W } } - private static final String STATUS = "status"; - private static final String NUMBER_OF_SHARDS = "number_of_shards"; - private static final String NUMBER_OF_REPLICAS = "number_of_replicas"; - private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards"; - private static final String ACTIVE_SHARDS = "active_shards"; - private static final String RELOCATING_SHARDS = "relocating_shards"; - private static final String INITIALIZING_SHARDS = "initializing_shards"; - private static final String UNASSIGNED_SHARDS = "unassigned_shards"; - private static final String SHARDS = "shards"; - private static final String PRIMARY_ACTIVE = "primary_active"; - @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(getIndex()); builder.field(STATUS, getStatus().name().toLowerCase(Locale.ROOT)); builder.field(NUMBER_OF_SHARDS, getNumberOfShards()); builder.field(NUMBER_OF_REPLICAS, getNumberOfReplicas()); @@ -197,22 +270,65 @@ public final class ClusterIndexHealth implements Iterable, W if ("shards".equals(params.param("level", "indices"))) { builder.startObject(SHARDS); - for (ClusterShardHealth shardHealth : shards.values()) { - builder.startObject(Integer.toString(shardHealth.getId())); - - builder.field(STATUS, shardHealth.getStatus().name().toLowerCase(Locale.ROOT)); - builder.field(PRIMARY_ACTIVE, shardHealth.isPrimaryActive()); - builder.field(ACTIVE_SHARDS, shardHealth.getActiveShards()); - builder.field(RELOCATING_SHARDS, shardHealth.getRelocatingShards()); - builder.field(INITIALIZING_SHARDS, shardHealth.getInitializingShards()); - builder.field(UNASSIGNED_SHARDS, shardHealth.getUnassignedShards()); - - builder.endObject(); + shardHealth.toXContent(builder, params); } - builder.endObject(); } + builder.endObject(); return builder; } + + public static ClusterIndexHealth innerFromXContent(XContentParser parser, String index) { + return PARSER.apply(parser, index); + } + + public static ClusterIndexHealth fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + XContentParser.Token token = parser.nextToken(); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); + String index = parser.currentName(); + ClusterIndexHealth parsed = innerFromXContent(parser, index); + ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser::getTokenLocation); + return parsed; + } + + @Override + public String toString() { + return "ClusterIndexHealth{" + + "index='" + index + '\'' + + ", numberOfShards=" + numberOfShards + + ", numberOfReplicas=" + numberOfReplicas + + ", activeShards=" + activeShards + + ", relocatingShards=" + relocatingShards + + ", initializingShards=" + initializingShards + + ", unassignedShards=" + unassignedShards + + ", activePrimaryShards=" + activePrimaryShards + + ", status=" + status + + ", shards.size=" + (shards == null ? "null" : shards.size()) + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterIndexHealth that = (ClusterIndexHealth) o; + return Objects.equals(index, that.index) && + numberOfShards == that.numberOfShards && + numberOfReplicas == that.numberOfReplicas && + activeShards == that.activeShards && + relocatingShards == that.relocatingShards && + initializingShards == that.initializingShards && + unassignedShards == that.unassignedShards && + activePrimaryShards == that.activePrimaryShards && + status == that.status && + Objects.equals(shards, that.shards); + } + + @Override + public int hashCode() { + return Objects.hash(index, numberOfShards, numberOfReplicas, activeShards, relocatingShards, initializingShards, unassignedShards, + activePrimaryShards, status, shards); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/health/ClusterShardHealth.java b/server/src/main/java/org/elasticsearch/cluster/health/ClusterShardHealth.java index 12131b11f3f..1d3a3dcee7b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/health/ClusterShardHealth.java +++ b/server/src/main/java/org/elasticsearch/cluster/health/ClusterShardHealth.java @@ -24,13 +24,54 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Locale; +import java.util.Objects; -public final class ClusterShardHealth implements Writeable { +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +public final class ClusterShardHealth implements Writeable, ToXContentFragment { + private static final String STATUS = "status"; + private static final String ACTIVE_SHARDS = "active_shards"; + private static final String RELOCATING_SHARDS = "relocating_shards"; + private static final String INITIALIZING_SHARDS = "initializing_shards"; + private static final String UNASSIGNED_SHARDS = "unassigned_shards"; + private static final String PRIMARY_ACTIVE = "primary_active"; + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("cluster_shard_health", true, + (parsedObjects, shardId) -> { + int i = 0; + boolean primaryActive = (boolean) parsedObjects[i++]; + int activeShards = (int) parsedObjects[i++]; + int relocatingShards = (int) parsedObjects[i++]; + int initializingShards = (int) parsedObjects[i++]; + int unassignedShards = (int) parsedObjects[i++]; + String statusStr = (String) parsedObjects[i]; + ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr); + return new ClusterShardHealth(shardId, status, activeShards, relocatingShards, initializingShards, unassignedShards, + primaryActive); + }); + + static { + PARSER.declareBoolean(constructorArg(), new ParseField(PRIMARY_ACTIVE)); + PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS)); + PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS)); + PARSER.declareString(constructorArg(), new ParseField(STATUS)); + } private final int shardId; private final ClusterHealthStatus status; @@ -88,7 +129,21 @@ public final class ClusterShardHealth implements Writeable { primaryActive = in.readBoolean(); } - public int getId() { + /** + * For XContent Parser and serialization tests + */ + ClusterShardHealth(int shardId, ClusterHealthStatus status, int activeShards, int relocatingShards, int initializingShards, + int unassignedShards, boolean primaryActive) { + this.shardId = shardId; + this.status = status; + this.activeShards = activeShards; + this.relocatingShards = relocatingShards; + this.initializingShards = initializingShards; + this.unassignedShards = unassignedShards; + this.primaryActive = primaryActive; + } + + public int getShardId() { return shardId; } @@ -155,4 +210,54 @@ public final class ClusterShardHealth implements Writeable { } } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Integer.toString(getShardId())); + builder.field(STATUS, getStatus().name().toLowerCase(Locale.ROOT)); + builder.field(PRIMARY_ACTIVE, isPrimaryActive()); + builder.field(ACTIVE_SHARDS, getActiveShards()); + builder.field(RELOCATING_SHARDS, getRelocatingShards()); + builder.field(INITIALIZING_SHARDS, getInitializingShards()); + builder.field(UNASSIGNED_SHARDS, getUnassignedShards()); + builder.endObject(); + return builder; + } + + static ClusterShardHealth innerFromXContent(XContentParser parser, Integer shardId) { + return PARSER.apply(parser, shardId); + } + + public static ClusterShardHealth fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + XContentParser.Token token = parser.nextToken(); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); + String shardIdStr = parser.currentName(); + ClusterShardHealth parsed = innerFromXContent(parser, Integer.valueOf(shardIdStr)); + ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser::getTokenLocation); + return parsed; + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ClusterShardHealth)) return false; + ClusterShardHealth that = (ClusterShardHealth) o; + return shardId == that.shardId && + activeShards == that.activeShards && + relocatingShards == that.relocatingShards && + initializingShards == that.initializingShards && + unassignedShards == that.unassignedShards && + primaryActive == that.primaryActive && + status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(shardId, status, activeShards, relocatingShards, initializingShards, unassignedShards, primaryActive); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/health/ClusterStateHealth.java b/server/src/main/java/org/elasticsearch/cluster/health/ClusterStateHealth.java index 8aeb110c370..6d36f4dec1d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/health/ClusterStateHealth.java +++ b/server/src/main/java/org/elasticsearch/cluster/health/ClusterStateHealth.java @@ -33,6 +33,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; + public final class ClusterStateHealth implements Iterable, Writeable { @@ -45,7 +47,7 @@ public final class ClusterStateHealth implements Iterable, W private final int unassignedShards; private final double activeShardsPercent; private final ClusterHealthStatus status; - private final Map indices = new HashMap<>(); + private final Map indices; /** * Creates a new ClusterStateHealth instance considering the current cluster state and all indices in the cluster. @@ -65,7 +67,7 @@ public final class ClusterStateHealth implements Iterable, W public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) { numberOfNodes = clusterState.nodes().getSize(); numberOfDataNodes = clusterState.nodes().getDataNodes().size(); - + indices = new HashMap<>(); for (String index : concreteIndices) { IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); IndexMetaData indexMetaData = clusterState.metaData().index(index); @@ -134,6 +136,7 @@ public final class ClusterStateHealth implements Iterable, W numberOfDataNodes = in.readVInt(); status = ClusterHealthStatus.fromValue(in.readByte()); int size = in.readVInt(); + indices = new HashMap<>(size); for (int i = 0; i < size; i++) { ClusterIndexHealth indexHealth = new ClusterIndexHealth(in); indices.put(indexHealth.getIndex(), indexHealth); @@ -141,6 +144,24 @@ public final class ClusterStateHealth implements Iterable, W activeShardsPercent = in.readDouble(); } + /** + * For ClusterHealthResponse's XContent Parser + */ + public ClusterStateHealth(int activePrimaryShards, int activeShards, int relocatingShards, int initializingShards, int unassignedShards, + int numberOfNodes, int numberOfDataNodes, double activeShardsPercent, ClusterHealthStatus status, + Map indices) { + this.activePrimaryShards = activePrimaryShards; + this.activeShards = activeShards; + this.relocatingShards = relocatingShards; + this.initializingShards = initializingShards; + this.unassignedShards = unassignedShards; + this.numberOfNodes = numberOfNodes; + this.numberOfDataNodes = numberOfDataNodes; + this.activeShardsPercent = activeShardsPercent; + this.status = status; + this.indices = indices; + } + public int getActiveShards() { return activeShards; } @@ -202,4 +223,43 @@ public final class ClusterStateHealth implements Iterable, W } out.writeDouble(activeShardsPercent); } + + @Override + public String toString() { + return "ClusterStateHealth{" + + "numberOfNodes=" + numberOfNodes + + ", numberOfDataNodes=" + numberOfDataNodes + + ", activeShards=" + activeShards + + ", relocatingShards=" + relocatingShards + + ", activePrimaryShards=" + activePrimaryShards + + ", initializingShards=" + initializingShards + + ", unassignedShards=" + unassignedShards + + ", activeShardsPercent=" + activeShardsPercent + + ", status=" + status + + ", indices.size=" + (indices == null ? "null" : indices.size()) + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterStateHealth that = (ClusterStateHealth) o; + return numberOfNodes == that.numberOfNodes && + numberOfDataNodes == that.numberOfDataNodes && + activeShards == that.activeShards && + relocatingShards == that.relocatingShards && + activePrimaryShards == that.activePrimaryShards && + initializingShards == that.initializingShards && + unassignedShards == that.unassignedShards && + Double.compare(that.activeShardsPercent, activeShardsPercent) == 0 && + status == that.status && + Objects.equals(indices, that.indices); + } + + @Override + public int hashCode() { + return Objects.hash(numberOfNodes, numberOfDataNodes, activeShards, relocatingShards, activePrimaryShards, initializingShards, + unassignedShards, activeShardsPercent, status, indices); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponsesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponsesTests.java index d0d452df478..8c143881525 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponsesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponsesTests.java @@ -21,26 +21,38 @@ package org.elasticsearch.action.admin.cluster.health; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.cluster.health.ClusterIndexHealthTests; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.hamcrest.Matchers; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; -public class ClusterHealthResponsesTests extends ESTestCase { +public class ClusterHealthResponsesTests extends AbstractStreamableXContentTestCase { + private final ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.values()); - public void testIsTimeout() throws IOException { + public void testIsTimeout() { ClusterHealthResponse res = new ClusterHealthResponse(); for (int i = 0; i < 5; i++) { res.setTimedOut(randomBoolean()); @@ -89,4 +101,101 @@ public class ClusterHealthResponsesTests extends ESTestCase { } return clusterHealth; } + + @Override + protected ClusterHealthResponse doParseInstance(XContentParser parser) { + return ClusterHealthResponse.fromXContent(parser); + } + + @Override + protected ClusterHealthResponse createBlankInstance() { + return new ClusterHealthResponse(); + } + + @Override + protected ClusterHealthResponse createTestInstance() { + int indicesSize = randomInt(20); + Map indices = new HashMap<>(indicesSize); + if ("indices".equals(level) || "shards".equals(level)) { + for (int i = 0; i < indicesSize; i++) { + String indexName = randomAlphaOfLengthBetween(1, 5) + i; + indices.put(indexName, ClusterIndexHealthTests.randomIndexHealth(indexName, level)); + } + } + ClusterStateHealth stateHealth = new ClusterStateHealth(randomInt(100), randomInt(100), randomInt(100), + randomInt(100), randomInt(100), randomInt(100), randomInt(100), + randomDoubleBetween(0d, 100d, true), randomFrom(ClusterHealthStatus.values()), indices); + + return new ClusterHealthResponse(randomAlphaOfLengthBetween(1, 10), randomInt(100), randomInt(100), randomInt(100), + TimeValue.timeValueMillis(randomInt(10000)), randomBoolean(), stateHealth); + } + + @Override + protected ToXContent.Params getToXContentParams() { + return new ToXContent.MapParams(Collections.singletonMap("level", level.name().toLowerCase(Locale.ROOT))); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + // Ignore all paths which looks like "indices.RANDOMINDEXNAME.shards" + private static final Pattern SHARDS_IN_XCONTENT = Pattern.compile("^indices\\.\\w+\\.shards$"); + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return field -> "indices".equals(field) || SHARDS_IN_XCONTENT.matcher(field).find(); + } + + @Override + protected ClusterHealthResponse mutateInstance(ClusterHealthResponse instance) { + String mutate = randomFrom("clusterName", "numberOfPendingTasks","numberOfInFlightFetch", "delayedUnassignedShards", + "taskMaxWaitingTime", "timedOut", "clusterStateHealth"); + switch (mutate) { + case "clusterName": + return new ClusterHealthResponse(instance.getClusterName() + randomAlphaOfLengthBetween(2, 5), + instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(), + instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(), + instance.isTimedOut(), instance.getClusterStateHealth()); + case "numberOfPendingTasks": + return new ClusterHealthResponse(instance.getClusterName(), + instance.getNumberOfPendingTasks() + between(1, 10), instance.getNumberOfInFlightFetch(), + instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(), + instance.isTimedOut(), instance.getClusterStateHealth()); + case "numberOfInFlightFetch": + return new ClusterHealthResponse(instance.getClusterName(), + instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch() + between(1, 10), + instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(), + instance.isTimedOut(), instance.getClusterStateHealth()); + case "delayedUnassignedShards": + return new ClusterHealthResponse(instance.getClusterName(), + instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(), + instance.getDelayedUnassignedShards() + between(1, 10), instance.getTaskMaxWaitingTime(), + instance.isTimedOut(), instance.getClusterStateHealth()); + case "taskMaxWaitingTime": + + return new ClusterHealthResponse(instance.getClusterName(), + instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(), + instance.getDelayedUnassignedShards(), new TimeValue(instance.getTaskMaxWaitingTime().millis() + between(1, 10)), + instance.isTimedOut(), instance.getClusterStateHealth()); + case "timedOut": + return new ClusterHealthResponse(instance.getClusterName(), + instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(), + instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(), + instance.isTimedOut() == false, instance.getClusterStateHealth()); + case "clusterStateHealth": + ClusterStateHealth state = instance.getClusterStateHealth(); + ClusterStateHealth newState = new ClusterStateHealth(state.getActivePrimaryShards() + between(1, 10), + state.getActiveShards(), state.getRelocatingShards(), state.getInitializingShards(), state.getUnassignedShards(), + state.getNumberOfNodes(), state.getNumberOfDataNodes(), state.getActiveShardsPercent(), state.getStatus(), + state.getIndices()); + return new ClusterHealthResponse(instance.getClusterName(), + instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(), + instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(), + instance.isTimedOut(), newState); + default: + throw new UnsupportedOperationException(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterIndexHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterIndexHealthTests.java index 215f28f7275..851ab63297a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/health/ClusterIndexHealthTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterIndexHealthTests.java @@ -19,29 +19,45 @@ package org.elasticsearch.cluster.health; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTableGenerator; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.equalTo; -public class ClusterIndexHealthTests extends ESTestCase { +public class ClusterIndexHealthTests extends AbstractSerializingTestCase { + private final ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.SHARDS, ClusterHealthRequest.Level.INDICES); + public void testClusterIndexHealth() { RoutingTableGenerator routingTableGenerator = new RoutingTableGenerator(); int numberOfShards = randomInt(3) + 1; int numberOfReplicas = randomInt(4); - IndexMetaData indexMetaData = IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas).build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test1").settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas).build(); RoutingTableGenerator.ShardCounter counter = new RoutingTableGenerator.ShardCounter(); IndexRoutingTable indexRoutingTable = routingTableGenerator.genIndexRoutingTable(indexMetaData, counter); ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetaData, indexRoutingTable); - logger.info("index status: {}, expected {}", indexHealth.getStatus(), counter.status()); assertIndexHealth(indexHealth, counter, indexMetaData); } - - private void assertIndexHealth(ClusterIndexHealth indexHealth, RoutingTableGenerator.ShardCounter counter, IndexMetaData indexMetaData) { + private void assertIndexHealth(ClusterIndexHealth indexHealth, RoutingTableGenerator.ShardCounter counter, + IndexMetaData indexMetaData) { assertThat(indexHealth.getStatus(), equalTo(counter.status())); assertThat(indexHealth.getNumberOfShards(), equalTo(indexMetaData.getNumberOfShards())); assertThat(indexHealth.getNumberOfReplicas(), equalTo(indexMetaData.getNumberOfReplicas())); @@ -57,4 +73,119 @@ public class ClusterIndexHealthTests extends ESTestCase { assertThat(totalShards, equalTo(indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas()))); } + + @Override + protected ClusterIndexHealth createTestInstance() { + return randomIndexHealth(randomAlphaOfLengthBetween(1, 10), level); + } + + public static ClusterIndexHealth randomIndexHealth(String indexName, ClusterHealthRequest.Level level) { + Map shards = new HashMap<>(); + if (level == ClusterHealthRequest.Level.SHARDS) { + for (int i = 0; i < randomInt(5); i++) { + shards.put(i, ClusterShardHealthTests.randomShardHealth(i)); + } + } + return new ClusterIndexHealth(indexName, randomInt(1000), randomInt(1000), randomInt(1000), randomInt(1000), + randomInt(1000), randomInt(1000), randomInt(1000), randomFrom(ClusterHealthStatus.values()), shards); + } + + @Override + protected Writeable.Reader instanceReader() { + return ClusterIndexHealth::new; + } + + @Override + protected ClusterIndexHealth doParseInstance(XContentParser parser) throws IOException { + return ClusterIndexHealth.fromXContent(parser); + } + + @Override + protected ToXContent.Params getToXContentParams() { + return new ToXContent.MapParams(Collections.singletonMap("level", level.name().toLowerCase(Locale.ROOT))); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + // Ignore all paths which looks like "RANDOMINDEXNAME.shards" + private static final Pattern SHARDS_IN_XCONTENT = Pattern.compile("^\\w+\\.shards$"); + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return field -> "".equals(field) || SHARDS_IN_XCONTENT.matcher(field).find(); + } + @Override + protected ClusterIndexHealth mutateInstance(ClusterIndexHealth instance) throws IOException { + String mutate = randomFrom("index", "numberOfShards", "numberOfReplicas", "activeShards", "relocatingShards", + "initializingShards", "unassignedShards", "activePrimaryShards", "status", "shards"); + switch (mutate) { + case "index": + return new ClusterIndexHealth(instance.getIndex() + randomAlphaOfLengthBetween(2, 5), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "numberOfShards": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards() + between(1, 10), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "numberOfReplicas": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas() + between(1, 10), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "activeShards": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards() + between(1, 10), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "relocatingShards": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards() + between(1, 10), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "initializingShards": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards() + between(1, 10), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "unassignedShards": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards() + between(1, 10), + instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards()); + case "activePrimaryShards": + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards() + between(1, 10), instance.getStatus(), instance.getShards()); + case "status": + ClusterHealthStatus status = randomFrom( + Arrays.stream(ClusterHealthStatus.values()).filter( + value -> !value.equals(instance.getStatus()) + ).collect(Collectors.toList()) + ); + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), status, instance.getShards()); + case "shards": + Map map; + if (instance.getShards().isEmpty()) { + map = Collections.singletonMap(0, ClusterShardHealthTests.randomShardHealth(0)); + } else { + map = new HashMap<>(instance.getShards()); + map.remove(map.keySet().iterator().next()); + } + return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(), + instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.getActivePrimaryShards(), instance.getStatus(), map); + default: + throw new UnsupportedOperationException(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterShardHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterShardHealthTests.java new file mode 100644 index 00000000000..6ee0fc1ee0a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterShardHealthTests.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.health; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class ClusterShardHealthTests extends AbstractSerializingTestCase { + + @Override + protected ClusterShardHealth doParseInstance(XContentParser parser) throws IOException { + return ClusterShardHealth.fromXContent(parser); + } + + @Override + protected ClusterShardHealth createTestInstance() { + return randomShardHealth(randomInt(1000)); + } + + static ClusterShardHealth randomShardHealth(int id) { + return new ClusterShardHealth(id, randomFrom(ClusterHealthStatus.values()), randomInt(1000), randomInt(1000), + randomInt(1000), randomInt(1000), randomBoolean()); + } + + @Override + protected Writeable.Reader instanceReader() { + return ClusterShardHealth::new; + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + //don't inject random fields at the root, which contains arbitrary shard ids + return ""::equals; + } + + @Override + protected ClusterShardHealth mutateInstance(final ClusterShardHealth instance) { + String mutate = randomFrom("shardId", "status", "activeShards", "relocatingShards", "initializingShards", + "unassignedShards", "primaryActive"); + switch (mutate) { + case "shardId": + return new ClusterShardHealth(instance.getShardId() + between(1, 10), instance.getStatus(), + instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.isPrimaryActive()); + case "status": + ClusterHealthStatus status = randomFrom( + Arrays.stream(ClusterHealthStatus.values()).filter( + value -> !value.equals(instance.getStatus()) + ).collect(Collectors.toList()) + ); + return new ClusterShardHealth(instance.getShardId(), status, + instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.isPrimaryActive()); + case "activeShards": + return new ClusterShardHealth(instance.getShardId(), instance.getStatus(), + instance.getActiveShards() + between(1, 10), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.isPrimaryActive()); + case "relocatingShards": + return new ClusterShardHealth(instance.getShardId(), instance.getStatus(), + instance.getActiveShards(), instance.getRelocatingShards() + between(1, 10), + instance.getInitializingShards(), instance.getUnassignedShards(), instance.isPrimaryActive()); + case "initializingShards": + return new ClusterShardHealth(instance.getShardId(), instance.getStatus(), + instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards() + between(1, 10), instance.getUnassignedShards(), + instance.isPrimaryActive()); + case "unassignedShards": + return new ClusterShardHealth(instance.getShardId(), instance.getStatus(), + instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards() + between(1, 10), + instance.isPrimaryActive()); + case "primaryActive": + return new ClusterShardHealth(instance.getShardId(), instance.getStatus(), + instance.getActiveShards(), instance.getRelocatingShards(), + instance.getInitializingShards(), instance.getUnassignedShards(), + instance.isPrimaryActive() == false); + default: + throw new UnsupportedOperationException(); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java index 3233edefb30..56b92bb1e25 100644 --- a/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractXContentTestCase; @@ -96,7 +97,7 @@ public class CancelTasksResponseTests extends AbstractXContentTestCase extends EST createParserFunction, CheckedFunction parseFunction, BiConsumer assertEqualsConsumer, - boolean assertToXContentEquivalence) throws IOException { + boolean assertToXContentEquivalence, + ToXContent.Params toXContentParams) throws IOException { for (int runs = 0; runs < numberOfTestRuns; runs++) { T testInstance = instanceSupplier.get(); XContentType xContentType = randomFrom(XContentType.values()); - BytesReference shuffled = toShuffledXContent(testInstance, xContentType, ToXContent.EMPTY_PARAMS, false, createParserFunction, - shuffleFieldsExceptions); + BytesReference shuffled = toShuffledXContent(testInstance, xContentType, toXContentParams,false, + createParserFunction, shuffleFieldsExceptions); BytesReference withRandomFields; if (supportsUnknownFields) { // we add a few random fields to check that parser is lenient on new fields @@ -65,7 +67,8 @@ public abstract class AbstractXContentTestCase extends EST T parsed = parseFunction.apply(parser); assertEqualsConsumer.accept(testInstance, parsed); if (assertToXContentEquivalence) { - assertToXContentEquivalent(shuffled, XContentHelper.toXContent(parsed, xContentType, false), xContentType); + assertToXContentEquivalent(shuffled, XContentHelper.toXContent(parsed, xContentType, toXContentParams, false), + xContentType); } } } @@ -77,7 +80,7 @@ public abstract class AbstractXContentTestCase extends EST public final void testFromXContent() throws IOException { testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(), getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::parseInstance, this::assertEqualInstances, - assertToXContentEquivalence()); + assertToXContentEquivalence(), getToXContentParams()); } /** @@ -127,4 +130,11 @@ public abstract class AbstractXContentTestCase extends EST protected String[] getShuffleFieldsExceptions() { return Strings.EMPTY_ARRAY; } + + /** + * Params that have to be provided when calling calling {@link ToXContent#toXContent(XContentBuilder, ToXContent.Params)} + */ + protected ToXContent.Params getToXContentParams() { + return ToXContent.EMPTY_PARAMS; + } }