REST high-level client: add Cluster Health API (#29331)

Relates to #27205
This commit is contained in:
Van0SS 2018-06-12 07:34:06 -04:00 committed by Luca Cavanna
parent 5f84e18c72
commit d5e8a5cd69
21 changed files with 1588 additions and 69 deletions

View File

@ -21,12 +21,16 @@ package org.elasticsearch.client;
import org.apache.http.Header; import org.apache.http.Header;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
/** /**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API. * A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API.
@ -95,4 +99,34 @@ public final class ClusterClient {
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings, restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings,
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers); ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
} }
/**
* Get cluster health using the Cluster Health API.
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* <p>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
* @param healthRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public ClusterHealthResponse health(ClusterHealthRequest healthRequest, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(healthRequest, RequestConverters::clusterHealth, options,
ClusterHealthResponse::fromXContent, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
}
/**
* Asynchronously get cluster health using the Cluster Health API.
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
* @param healthRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void healthAsync(ClusterHealthRequest healthRequest, RequestOptions options, ActionListener<ClusterHealthResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, RequestConverters::clusterHealth, options,
ClusterHealthResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
}
} }

View File

@ -29,6 +29,7 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
@ -74,7 +75,9 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -706,6 +709,28 @@ final class RequestConverters {
return request; return request;
} }
static Request clusterHealth(ClusterHealthRequest healthRequest) {
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_cluster/health")
.addCommaSeparatedPathParts(indices)
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
new Params(request)
.withWaitForStatus(healthRequest.waitForStatus())
.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards())
.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards())
.withWaitForActiveShards(healthRequest.waitForActiveShards())
.withWaitForNodes(healthRequest.waitForNodes())
.withWaitForEvents(healthRequest.waitForEvents())
.withTimeout(healthRequest.timeout())
.withMasterTimeout(healthRequest.masterNodeTimeout())
.withLocal(healthRequest.local())
.withLevel(healthRequest.level());
return request;
}
static Request rollover(RolloverRequest rolloverRequest) throws IOException { static Request rollover(RolloverRequest rolloverRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build(); .addPathPart(rolloverRequest.getNewIndexName()).build();
@ -1124,6 +1149,42 @@ final class RequestConverters {
} }
return this; return this;
} }
Params withWaitForStatus(ClusterHealthStatus status) {
if (status != null) {
return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT));
}
return this;
}
Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) {
if (waitNoRelocatingShards) {
return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString());
}
return this;
}
Params withWaitForNoInitializingShards(boolean waitNoInitShards) {
if (waitNoInitShards) {
return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString());
}
return this;
}
Params withWaitForNodes(String waitForNodes) {
return putParam("wait_for_nodes", waitForNodes);
}
Params withLevel(ClusterHealthRequest.Level level) {
return putParam("level", level.name().toLowerCase(Locale.ROOT));
}
Params withWaitForEvents(Priority waitForEvents) {
if (waitForEvents != null) {
return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT));
}
return this;
}
} }
/** /**

View File

@ -20,8 +20,13 @@
package org.elasticsearch.client; package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -34,6 +39,7 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ -108,4 +114,136 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
assertThat(exception.getMessage(), equalTo( assertThat(exception.getMessage(), equalTo(
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]")); "Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
} }
public void testClusterHealthGreen() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertNoIndices(response);
}
public void testClusterHealthYellowClusterLevel() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.CLUSTER);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
assertYellowShards(response);
assertThat(response.getIndices().size(), equalTo(0));
}
public void testClusterHealthYellowIndicesLevel() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.INDICES);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
assertYellowShards(response);
assertThat(response.getIndices().size(), equalTo(2));
for (Map.Entry<String, ClusterIndexHealth> entry : response.getIndices().entrySet()) {
assertYellowIndex(entry.getKey(), entry.getValue(), true);
}
}
private static void assertYellowShards(ClusterHealthResponse response) {
assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(2));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(2));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(2));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
}
public void testClusterHealthYellowSpecificIndex() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest("index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(1));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(1));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(1));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
assertThat(response.getIndices().size(), equalTo(1));
Map.Entry<String, ClusterIndexHealth> index = response.getIndices().entrySet().iterator().next();
assertYellowIndex(index.getKey(), index.getValue(), false);
}
private static void assertYellowIndex(String indexName, ClusterIndexHealth indexHealth, boolean emptyShards) {
assertThat(indexHealth, notNullValue());
assertThat(indexHealth.getIndex(),equalTo(indexName));
assertThat(indexHealth.getActivePrimaryShards(),equalTo(1));
assertThat(indexHealth.getActiveShards(),equalTo(1));
assertThat(indexHealth.getNumberOfReplicas(),equalTo(1));
assertThat(indexHealth.getInitializingShards(),equalTo(0));
assertThat(indexHealth.getUnassignedShards(),equalTo(1));
assertThat(indexHealth.getRelocatingShards(),equalTo(0));
assertThat(indexHealth.getStatus(),equalTo(ClusterHealthStatus.YELLOW));
if (emptyShards) {
assertThat(indexHealth.getShards().size(), equalTo(0));
} else {
assertThat(indexHealth.getShards().size(), equalTo(1));
for (Map.Entry<Integer, ClusterShardHealth> entry : indexHealth.getShards().entrySet()) {
assertYellowShard(entry.getKey(), entry.getValue());
}
}
}
private static void assertYellowShard(int shardId, ClusterShardHealth shardHealth) {
assertThat(shardHealth, notNullValue());
assertThat(shardHealth.getShardId(), equalTo(shardId));
assertThat(shardHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(shardHealth.getActiveShards(), equalTo(1));
assertThat(shardHealth.getInitializingShards(), equalTo(0));
assertThat(shardHealth.getUnassignedShards(), equalTo(1));
assertThat(shardHealth.getRelocatingShards(), equalTo(0));
}
public void testClusterHealthNotFoundIndex() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(true));
assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
assertNoIndices(response);
}
private static void assertNoIndices(ClusterHealthResponse response) {
assertThat(response.getIndices(), equalTo(emptyMap()));
assertThat(response.getActivePrimaryShards(), equalTo(0));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(0));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(0));
assertThat(response.getActiveShardsPercent(), equalTo(100d));
}
} }

View File

@ -29,6 +29,7 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
@ -83,8 +84,10 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestConverters.EndpointBuilder; import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -125,6 +128,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects; import org.elasticsearch.test.RandomObjects;
import org.hamcrest.CoreMatchers;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -1526,6 +1530,85 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters()); assertEquals(expectedParams, expectedRequest.getParameters());
} }
public void testClusterHealth() {
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
Map<String, String> expectedParams = new HashMap<>();
setRandomLocal(healthRequest, expectedParams);
String timeoutType = randomFrom("timeout", "masterTimeout", "both", "none");
String timeout = randomTimeValue();
String masterTimeout = randomTimeValue();
switch (timeoutType) {
case "timeout":
healthRequest.timeout(timeout);
expectedParams.put("timeout", timeout);
// If Master Timeout wasn't set it uses the same value as Timeout
expectedParams.put("master_timeout", timeout);
break;
case "masterTimeout":
expectedParams.put("timeout", "30s");
healthRequest.masterNodeTimeout(masterTimeout);
expectedParams.put("master_timeout", masterTimeout);
break;
case "both":
healthRequest.timeout(timeout);
expectedParams.put("timeout", timeout);
healthRequest.masterNodeTimeout(timeout);
expectedParams.put("master_timeout", timeout);
break;
case "none":
expectedParams.put("timeout", "30s");
expectedParams.put("master_timeout", "30s");
break;
default:
throw new UnsupportedOperationException();
}
setRandomWaitForActiveShards(healthRequest::waitForActiveShards, expectedParams, "0");
if (randomBoolean()) {
ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.values());
healthRequest.level(level);
expectedParams.put("level", level.name().toLowerCase(Locale.ROOT));
} else {
expectedParams.put("level", "shards");
}
if (randomBoolean()) {
Priority priority = randomFrom(Priority.values());
healthRequest.waitForEvents(priority);
expectedParams.put("wait_for_events", priority.name().toLowerCase(Locale.ROOT));
}
if (randomBoolean()) {
ClusterHealthStatus status = randomFrom(ClusterHealthStatus.values());
healthRequest.waitForStatus(status);
expectedParams.put("wait_for_status", status.name().toLowerCase(Locale.ROOT));
}
if (randomBoolean()) {
boolean waitForNoInitializingShards = randomBoolean();
healthRequest.waitForNoInitializingShards(waitForNoInitializingShards);
if (waitForNoInitializingShards) {
expectedParams.put("wait_for_no_initializing_shards", Boolean.TRUE.toString());
}
}
if (randomBoolean()) {
boolean waitForNoRelocatingShards = randomBoolean();
healthRequest.waitForNoRelocatingShards(waitForNoRelocatingShards);
if (waitForNoRelocatingShards) {
expectedParams.put("wait_for_no_relocating_shards", Boolean.TRUE.toString());
}
}
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
healthRequest.indices(indices);
Request request = RequestConverters.clusterHealth(healthRequest);
assertThat(request, CoreMatchers.notNullValue());
assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME));
assertThat(request.getEntity(), nullValue());
if (indices != null && indices.length > 0) {
assertThat(request.getEndpoint(), equalTo("/_cluster/health/" + String.join(",", indices)));
} else {
assertThat(request.getEndpoint(), equalTo("/_cluster/health"));
}
assertThat(request.getParameters(), equalTo(expectedParams));
}
public void testRollover() throws IOException { public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10), RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10)); randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
@ -2104,6 +2187,11 @@ public class RequestConvertersTests extends ESTestCase {
} }
private static void setRandomWaitForActiveShards(Consumer<ActiveShardCount> setter, Map<String, String> expectedParams) { private static void setRandomWaitForActiveShards(Consumer<ActiveShardCount> setter, Map<String, String> expectedParams) {
setRandomWaitForActiveShards(setter, expectedParams, null);
}
private static void setRandomWaitForActiveShards(Consumer<ActiveShardCount> setter,Map<String, String> expectedParams,
String defaultValue) {
if (randomBoolean()) { if (randomBoolean()) {
String waitForActiveShardsString; String waitForActiveShardsString;
int waitForActiveShards = randomIntBetween(-1, 5); int waitForActiveShards = randomIntBetween(-1, 5);
@ -2114,6 +2202,8 @@ public class RequestConvertersTests extends ESTestCase {
} }
setter.accept(ActiveShardCount.parseString(waitForActiveShardsString)); setter.accept(ActiveShardCount.parseString(waitForActiveShardsString));
expectedParams.put("wait_for_active_shards", waitForActiveShardsString); expectedParams.put("wait_for_active_shards", waitForActiveShardsString);
} else if (defaultValue != null) {
expectedParams.put("wait_for_active_shards", defaultValue);
} }
} }

View File

@ -21,17 +21,26 @@ package org.elasticsearch.client.documentation;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -40,6 +49,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
/** /**
* This class is used to generate the Java Cluster API documentation. * This class is used to generate the Java Cluster API documentation.
@ -179,4 +189,174 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
} }
} }
public void testClusterHealth() throws IOException {
RestHighLevelClient client = highLevelClient();
client.indices().create(new CreateIndexRequest("index"), RequestOptions.DEFAULT);
{
// tag::health-request
ClusterHealthRequest request = new ClusterHealthRequest();
// end::health-request
}
{
// tag::health-request-indices-ctr
ClusterHealthRequest request = new ClusterHealthRequest("index1", "index2");
// end::health-request-indices-ctr
}
{
// tag::health-request-indices-setter
ClusterHealthRequest request = new ClusterHealthRequest();
request.indices("index1", "index2");
// end::health-request-indices-setter
}
ClusterHealthRequest request = new ClusterHealthRequest();
// tag::health-request-timeout
request.timeout(TimeValue.timeValueSeconds(50)); // <1>
request.timeout("50s"); // <2>
// end::health-request-timeout
// tag::health-request-master-timeout
request.masterNodeTimeout(TimeValue.timeValueSeconds(20)); // <1>
request.masterNodeTimeout("20s"); // <2>
// end::health-request-master-timeout
// tag::health-request-wait-status
request.waitForStatus(ClusterHealthStatus.YELLOW); // <1>
request.waitForYellowStatus(); // <2>
// end::health-request-wait-status
// tag::health-request-wait-events
request.waitForEvents(Priority.NORMAL); // <1>
// end::health-request-wait-events
// tag::health-request-level
request.level(ClusterHealthRequest.Level.SHARDS); // <1>
// end::health-request-level
// tag::health-request-wait-relocation
request.waitForNoRelocatingShards(true); // <1>
// end::health-request-wait-relocation
// tag::health-request-wait-initializing
request.waitForNoInitializingShards(true); // <1>
// end::health-request-wait-initializing
// tag::health-request-wait-nodes
request.waitForNodes("2"); // <1>
request.waitForNodes(">=2"); // <2>
request.waitForNodes("le(2)"); // <3>
// end::health-request-wait-nodes
// tag::health-request-wait-active
request.waitForActiveShards(ActiveShardCount.ALL); // <1>
request.waitForActiveShards(1); // <2>
// end::health-request-wait-active
// tag::health-request-local
request.local(true); // <1>
// end::health-request-local
// tag::health-execute
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
// end::health-execute
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response, notNullValue());
// tag::health-response-general
String clusterName = response.getClusterName(); // <1>
ClusterHealthStatus status = response.getStatus(); // <2>
// end::health-response-general
// tag::health-response-request-status
boolean timedOut = response.isTimedOut(); // <1>
RestStatus restStatus = response.status(); // <2>
// end::health-response-request-status
// tag::health-response-nodes
int numberOfNodes = response.getNumberOfNodes(); // <1>
int numberOfDataNodes = response.getNumberOfDataNodes(); // <2>
// end::health-response-nodes
{
// tag::health-response-shards
int activeShards = response.getActiveShards(); // <1>
int activePrimaryShards = response.getActivePrimaryShards(); // <2>
int relocatingShards = response.getRelocatingShards(); // <3>
int initializingShards = response.getInitializingShards(); // <4>
int unassignedShards = response.getUnassignedShards(); // <5>
int delayedUnassignedShards = response.getDelayedUnassignedShards(); // <6>
double activeShardsPercent = response.getActiveShardsPercent(); // <7>
// end::health-response-shards
}
// tag::health-response-task
TimeValue taskMaxWaitingTime = response.getTaskMaxWaitingTime(); // <1>
int numberOfPendingTasks = response.getNumberOfPendingTasks(); // <2>
int numberOfInFlightFetch = response.getNumberOfInFlightFetch(); // <3>
// end::health-response-task
// tag::health-response-indices
Map<String, ClusterIndexHealth> indices = response.getIndices(); // <1>
// end::health-response-indices
{
// tag::health-response-index
ClusterIndexHealth index = indices.get("index"); // <1>
ClusterHealthStatus indexStatus = index.getStatus();
int numberOfShards = index.getNumberOfShards();
int numberOfReplicas = index.getNumberOfReplicas();
int activeShards = index.getActiveShards();
int activePrimaryShards = index.getActivePrimaryShards();
int initializingShards = index.getInitializingShards();
int relocatingShards = index.getRelocatingShards();
int unassignedShards = index.getUnassignedShards();
// end::health-response-index
// tag::health-response-shard-details
Map<Integer, ClusterShardHealth> shards = index.getShards(); // <1>
ClusterShardHealth shardHealth = shards.get(0);
int shardId = shardHealth.getShardId();
ClusterHealthStatus shardStatus = shardHealth.getStatus();
int active = shardHealth.getActiveShards();
int initializing = shardHealth.getInitializingShards();
int unassigned = shardHealth.getUnassignedShards();
int relocating = shardHealth.getRelocatingShards();
boolean primaryActive = shardHealth.isPrimaryActive();
// end::health-response-shard-details
}
}
public void testClusterHealthAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
ClusterHealthRequest request = new ClusterHealthRequest();
// tag::health-execute-listener
ActionListener<ClusterHealthResponse> listener =
new ActionListener<ClusterHealthResponse>() {
@Override
public void onResponse(ClusterHealthResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::health-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::health-execute-async
client.cluster().healthAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::health-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
} }

View File

@ -0,0 +1,205 @@
[[java-rest-high-cluster-health]]
=== Cluster Health API
The Cluster Health API allows getting cluster health.
[[java-rest-high-cluster-health-request]]
==== Cluster Health Request
A `ClusterHealthRequest`:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request]
--------------------------------------------------
There are no required parameters. By default, the client will check all indices and will not wait
for any events.
==== Indices
Indices which should be checked can be passed in the constructor:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-indices-ctr]
--------------------------------------------------
Or using the corresponding setter method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-indices-setter]
--------------------------------------------------
==== Other parameters
Other parameters can be passed only through setter methods:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-timeout]
--------------------------------------------------
<1> Timeout for the request as a `TimeValue`. Defaults to 30 seconds
<2> As a `String`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-master-timeout]
--------------------------------------------------
<1> Timeout to connect to the master node as a `TimeValue`. Defaults to the same as `timeout`
<2> As a `String`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-status]
--------------------------------------------------
<1> The status to wait (e.g. `green`, `yellow`, or `red`). Accepts a `ClusterHealthStatus` value.
<2> Using predefined method
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-events]
--------------------------------------------------
<1> The priority of the events to wait for. Accepts a `Priority` value.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level]
--------------------------------------------------
<1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-relocation]
--------------------------------------------------
<1> Wait for 0 relocating shards. Defaults to `false`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-initializing]
--------------------------------------------------
<1> Wait for 0 initializing shards. Defaults to `false`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-nodes]
--------------------------------------------------
<1> Wait for `N` nodes in the cluster. Defaults to `0`
<2> Using `>=N`, `<=N`, `>N` and `<N` notation
<3> Using `ge(N)`, `le(N)`, `gt(N)`, `lt(N)` notation
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wait-active]
--------------------------------------------------
<1> Wait for all shards to be active in the cluster
<2> Wait for `N` shards to be active in the cluster
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-local]
--------------------------------------------------
<1> Non-master node can be used for this request. Defaults to `false`
[[java-rest-high-cluster-health-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-execute]
--------------------------------------------------
[[java-rest-high-cluster-health-async]]
==== Asynchronous Execution
The asynchronous execution of a cluster health request requires both the
`ClusterHealthRequest` instance and an `ActionListener` instance to be
passed to the asynchronous method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-execute-async]
--------------------------------------------------
<1> The `ClusterHealthRequest` to execute and the `ActionListener` to use
when the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `ClusterHealthResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of a failure. The raised exception is provided as an argument
[[java-rest-high-cluster-health-response]]
==== Cluster Health Response
The returned `ClusterHealthResponse` contains the next information about the
cluster:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-general]
--------------------------------------------------
<1> Name of the cluster
<2> Cluster status (`green`, `yellow` or `red`)
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-request-status]
--------------------------------------------------
<1> Whether request was timed out while processing
<2> Status of the request (`OK` or `REQUEST_TIMEOUT`). Other errors will be thrown as exceptions
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-nodes]
--------------------------------------------------
<1> Number of nodes in the cluster
<2> Number of data nodes in the cluster
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-shards]
--------------------------------------------------
<1> Number of active shards
<2> Number of primary active shards
<3> Number of relocating shards
<4> Number of initializing shards
<5> Number of unassigned shards
<6> Number of unassigned shards that are currently being delayed
<7> Percent of active shards
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-task]
--------------------------------------------------
<1> Maximum wait time of all tasks in the queue
<2> Number of currently pending tasks
<3> Number of async fetches that are currently ongoing
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-indices]
--------------------------------------------------
<1> Detailed information about indices in the cluster
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-index]
--------------------------------------------------
<1> Detailed information about a specific index
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-response-shard-details]
--------------------------------------------------
<1> Detailed information about a specific shard

View File

@ -110,8 +110,10 @@ include::indices/get_templates.asciidoc[]
The Java High Level REST Client supports the following Cluster APIs: The Java High Level REST Client supports the following Cluster APIs:
* <<java-rest-high-cluster-put-settings>> * <<java-rest-high-cluster-put-settings>>
* <<java-rest-high-cluster-health>>
include::cluster/put_settings.asciidoc[] include::cluster/put_settings.asciidoc[]
include::cluster/health.asciidoc[]
== Ingest APIs == Ingest APIs
The Java High Level REST Client supports the following Ingest APIs: The Java High Level REST Client supports the following Ingest APIs:

View File

@ -104,10 +104,19 @@ The cluster health API accepts the following request parameters:
Alternatively, it is possible to use `ge(N)`, `le(N)`, `gt(N)` and Alternatively, it is possible to use `ge(N)`, `le(N)`, `gt(N)` and
`lt(N)` notation. `lt(N)` notation.
`wait_for_events`::
Can be one of `immediate`, `urgent`, `high`, `normal`, `low`, `languid`.
Wait until all currently queued events with the given priority are processed.
`timeout`:: `timeout`::
A time based parameter controlling how long to wait if one of A time based parameter controlling how long to wait if one of
the wait_for_XXX are provided. Defaults to `30s`. the wait_for_XXX are provided. Defaults to `30s`.
`master_timeout`::
A time based parameter controlling how long to wait if the master has not been
discovered yet or disconnected.
If not provided, uses the same value as `timeout`.
`local`:: `local`::
If `true` returns the local node information and does not provide If `true` returns the local node information and does not provide
the state from master node. Default: `false`. the state from master node. Default: `false`.

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthRequest> implements IndicesRequest.Replaceable { public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthRequest> implements IndicesRequest.Replaceable {
@ -45,6 +46,11 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = ""; private String waitForNodes = "";
private Priority waitForEvents = null; private Priority waitForEvents = null;
/**
* Only used by the high-level REST Client. Controls the details level of the health information returned.
* The default value is 'shards' so it is backward compatible with the transport client behaviour.
*/
private Level level = Level.SHARDS;
public ClusterHealthRequest() { public ClusterHealthRequest() {
} }
@ -242,6 +248,24 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
return this.waitForEvents; return this.waitForEvents;
} }
/**
* Set the level of detail for the health information to be returned.
* Only used by the high-level REST Client
* The default value is 'shards' so it is backward compatible with the transport client behaviour.
*/
public void level(Level level) {
this.level = Objects.requireNonNull(level, "level must not be null");
}
/**
* Get the level of detail for the health information to be returned.
* Only used by the high-level REST Client.
* The default value is 'shards' so it is backward compatible with the transport client behaviour.
*/
public Level level() {
return level;
}
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
return null; return null;
@ -251,4 +275,8 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
} }
public enum Level {
CLUSTER, INDICES, SHARDS
}
} }

View File

@ -24,19 +24,115 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class ClusterHealthResponse extends ActionResponse implements StatusToXContentObject { public class ClusterHealthResponse extends ActionResponse implements StatusToXContentObject {
private static final String CLUSTER_NAME = "cluster_name";
private static final String STATUS = "status";
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis";
private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number";
private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String INDICES = "indices";
private static final ConstructingObjectParser<ClusterHealthResponse, Void> PARSER =
new ConstructingObjectParser<>("cluster_health_response", true,
parsedObjects -> {
int i = 0;
// ClusterStateHealth fields
int numberOfNodes = (int) parsedObjects[i++];
int numberOfDataNodes = (int) parsedObjects[i++];
int activeShards = (int) parsedObjects[i++];
int relocatingShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
int initializingShards = (int) parsedObjects[i++];
int unassignedShards = (int) parsedObjects[i++];
double activeShardsPercent = (double) parsedObjects[i++];
String statusStr = (String) parsedObjects[i++];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
@SuppressWarnings("unchecked") List<ClusterIndexHealth> indexList = (List<ClusterIndexHealth>) parsedObjects[i++];
final Map<String, ClusterIndexHealth> indices;
if (indexList == null || indexList.isEmpty()) {
indices = emptyMap();
} else {
indices = new HashMap<>(indexList.size());
for (ClusterIndexHealth indexHealth : indexList) {
indices.put(indexHealth.getIndex(), indexHealth);
}
}
ClusterStateHealth stateHealth = new ClusterStateHealth(activePrimaryShards, activeShards, relocatingShards,
initializingShards, unassignedShards, numberOfNodes, numberOfDataNodes, activeShardsPercent, status,
indices);
// ClusterHealthResponse fields
String clusterName = (String) parsedObjects[i++];
int numberOfPendingTasks = (int) parsedObjects[i++];
int numberOfInFlightFetch = (int) parsedObjects[i++];
int delayedUnassignedShards = (int) parsedObjects[i++];
long taskMaxWaitingTimeMillis = (long) parsedObjects[i++];
boolean timedOut = (boolean) parsedObjects[i];
return new ClusterHealthResponse(clusterName, numberOfPendingTasks, numberOfInFlightFetch, delayedUnassignedShards,
TimeValue.timeValueMillis(taskMaxWaitingTimeMillis), timedOut, stateHealth);
});
private static final ObjectParser.NamedObjectParser<ClusterIndexHealth, Void> INDEX_PARSER =
(XContentParser parser, Void context, String index) -> ClusterIndexHealth.innerFromXContent(parser, index);
static {
// ClusterStateHealth fields
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_NODES));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_DATA_NODES));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareDouble(constructorArg(), new ParseField(ACTIVE_SHARDS_PERCENT_AS_NUMBER));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
// Can be absent if LEVEL == 'cluster'
PARSER.declareNamedObjects(optionalConstructorArg(), INDEX_PARSER, new ParseField(INDICES));
// ClusterHealthResponse fields
PARSER.declareString(constructorArg(), new ParseField(CLUSTER_NAME));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_PENDING_TASKS));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_IN_FLIGHT_FETCH));
PARSER.declareInt(constructorArg(), new ParseField(DELAYED_UNASSIGNED_SHARDS));
PARSER.declareLong(constructorArg(), new ParseField(TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS));
PARSER.declareBoolean(constructorArg(), new ParseField(TIMED_OUT));
}
private String clusterName; private String clusterName;
private int numberOfPendingTasks = 0; private int numberOfPendingTasks = 0;
private int numberOfInFlightFetch = 0; private int numberOfInFlightFetch = 0;
@ -60,11 +156,23 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
this.numberOfPendingTasks = numberOfPendingTasks; this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch; this.numberOfInFlightFetch = numberOfInFlightFetch;
this.delayedUnassignedShards = delayedUnassignedShards; this.delayedUnassignedShards = delayedUnassignedShards;
this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
this.clusterHealthStatus = clusterStateHealth.getStatus();
}
/**
* For XContent Parser and serialization tests
*/
ClusterHealthResponse(String clusterName, int numberOfPendingTasks, int numberOfInFlightFetch, int delayedUnassignedShards,
TimeValue taskMaxWaitingTime, boolean timedOut, ClusterStateHealth clusterStateHealth) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks; this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch; this.numberOfInFlightFetch = numberOfInFlightFetch;
this.delayedUnassignedShards = delayedUnassignedShards;
this.taskMaxWaitingTime = taskMaxWaitingTime; this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices); this.timedOut = timedOut;
this.clusterStateHealth = clusterStateHealth;
this.clusterHealthStatus = clusterStateHealth.getStatus(); this.clusterHealthStatus = clusterStateHealth.getStatus();
} }
@ -210,25 +318,6 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
return isTimedOut() ? RestStatus.REQUEST_TIMEOUT : RestStatus.OK; return isTimedOut() ? RestStatus.REQUEST_TIMEOUT : RestStatus.OK;
} }
private static final String CLUSTER_NAME = "cluster_name";
private static final String STATUS = "status";
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE = "task_max_waiting_in_queue";
private static final String TASK_MAX_WAIT_TIME_IN_QUEUE_IN_MILLIS = "task_max_waiting_in_queue_millis";
private static final String ACTIVE_SHARDS_PERCENT_AS_NUMBER = "active_shards_percent_as_number";
private static final String ACTIVE_SHARDS_PERCENT = "active_shards_percent";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String INDICES = "indices";
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -254,13 +343,36 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
if (outputIndices) { if (outputIndices) {
builder.startObject(INDICES); builder.startObject(INDICES);
for (ClusterIndexHealth indexHealth : clusterStateHealth.getIndices().values()) { for (ClusterIndexHealth indexHealth : clusterStateHealth.getIndices().values()) {
builder.startObject(indexHealth.getIndex());
indexHealth.toXContent(builder, params); indexHealth.toXContent(builder, params);
builder.endObject();
} }
builder.endObject(); builder.endObject();
} }
builder.endObject(); builder.endObject();
return builder; return builder;
} }
public static ClusterHealthResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterHealthResponse that = (ClusterHealthResponse) o;
return Objects.equals(clusterName, that.clusterName) &&
numberOfPendingTasks == that.numberOfPendingTasks &&
numberOfInFlightFetch == that.numberOfInFlightFetch &&
delayedUnassignedShards == that.delayedUnassignedShards &&
Objects.equals(taskMaxWaitingTime, that.taskMaxWaitingTime) &&
timedOut == that.timedOut &&
Objects.equals(clusterStateHealth, that.clusterStateHealth) &&
clusterHealthStatus == that.clusterHealthStatus;
}
@Override
public int hashCode() {
return Objects.hash(clusterName, numberOfPendingTasks, numberOfInFlightFetch, delayedUnassignedShards, taskMaxWaitingTime,
timedOut, clusterStateHealth, clusterHealthStatus);
}
} }

View File

@ -22,19 +22,82 @@ package org.elasticsearch.cluster.health;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Writeable, ToXContentFragment { public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Writeable, ToXContentFragment {
private static final String STATUS = "status";
private static final String NUMBER_OF_SHARDS = "number_of_shards";
private static final String NUMBER_OF_REPLICAS = "number_of_replicas";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String SHARDS = "shards";
private static final ConstructingObjectParser<ClusterIndexHealth, String> PARSER =
new ConstructingObjectParser<>("cluster_index_health", true,
(parsedObjects, index) -> {
int i = 0;
int numberOfShards = (int) parsedObjects[i++];
int numberOfReplicas = (int) parsedObjects[i++];
int activeShards = (int) parsedObjects[i++];
int relocatingShards = (int) parsedObjects[i++];
int initializingShards = (int) parsedObjects[i++];
int unassignedShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
String statusStr = (String) parsedObjects[i++];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
@SuppressWarnings("unchecked") List<ClusterShardHealth> shardList = (List<ClusterShardHealth>) parsedObjects[i];
final Map<Integer, ClusterShardHealth> shards;
if (shardList == null || shardList.isEmpty()) {
shards = emptyMap();
} else {
shards = new HashMap<>(shardList.size());
for (ClusterShardHealth shardHealth : shardList) {
shards.put(shardHealth.getShardId(), shardHealth);
}
}
return new ClusterIndexHealth(index, numberOfShards, numberOfReplicas, activeShards, relocatingShards,
initializingShards, unassignedShards, activePrimaryShards, status, shards);
});
public static final ObjectParser.NamedObjectParser<ClusterShardHealth, String> SHARD_PARSER =
(XContentParser p, String indexIgnored, String shardId) -> ClusterShardHealth.innerFromXContent(p, Integer.valueOf(shardId));
static {
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_REPLICAS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
// Can be absent if LEVEL == 'indices' or 'cluster'
PARSER.declareNamedObjects(optionalConstructorArg(), SHARD_PARSER, new ParseField(SHARDS));
}
private final String index; private final String index;
private final int numberOfShards; private final int numberOfShards;
@ -45,13 +108,14 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
private final int unassignedShards; private final int unassignedShards;
private final int activePrimaryShards; private final int activePrimaryShards;
private final ClusterHealthStatus status; private final ClusterHealthStatus status;
private final Map<Integer, ClusterShardHealth> shards = new HashMap<>(); private final Map<Integer, ClusterShardHealth> shards;
public ClusterIndexHealth(final IndexMetaData indexMetaData, final IndexRoutingTable indexRoutingTable) { public ClusterIndexHealth(final IndexMetaData indexMetaData, final IndexRoutingTable indexRoutingTable) {
this.index = indexMetaData.getIndex().getName(); this.index = indexMetaData.getIndex().getName();
this.numberOfShards = indexMetaData.getNumberOfShards(); this.numberOfShards = indexMetaData.getNumberOfShards();
this.numberOfReplicas = indexMetaData.getNumberOfReplicas(); this.numberOfReplicas = indexMetaData.getNumberOfReplicas();
shards = new HashMap<>();
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id(); int shardId = shardRoutingTable.shardId().id();
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable)); shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
@ -104,12 +168,31 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
status = ClusterHealthStatus.fromValue(in.readByte()); status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readVInt(); int size = in.readVInt();
shards = new HashMap<>(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
ClusterShardHealth shardHealth = new ClusterShardHealth(in); ClusterShardHealth shardHealth = new ClusterShardHealth(in);
shards.put(shardHealth.getId(), shardHealth); shards.put(shardHealth.getShardId(), shardHealth);
} }
} }
/**
* For XContent Parser and serialization tests
*/
ClusterIndexHealth(String index, int numberOfShards, int numberOfReplicas, int activeShards, int relocatingShards,
int initializingShards, int unassignedShards, int activePrimaryShards, ClusterHealthStatus status,
Map<Integer, ClusterShardHealth> shards) {
this.index = index;
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.activeShards = activeShards;
this.relocatingShards = relocatingShards;
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.activePrimaryShards = activePrimaryShards;
this.status = status;
this.shards = shards;
}
public String getIndex() { public String getIndex() {
return index; return index;
} }
@ -173,19 +256,9 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
} }
} }
private static final String STATUS = "status";
private static final String NUMBER_OF_SHARDS = "number_of_shards";
private static final String NUMBER_OF_REPLICAS = "number_of_replicas";
private static final String ACTIVE_PRIMARY_SHARDS = "active_primary_shards";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String SHARDS = "shards";
private static final String PRIMARY_ACTIVE = "primary_active";
@Override @Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject(getIndex());
builder.field(STATUS, getStatus().name().toLowerCase(Locale.ROOT)); builder.field(STATUS, getStatus().name().toLowerCase(Locale.ROOT));
builder.field(NUMBER_OF_SHARDS, getNumberOfShards()); builder.field(NUMBER_OF_SHARDS, getNumberOfShards());
builder.field(NUMBER_OF_REPLICAS, getNumberOfReplicas()); builder.field(NUMBER_OF_REPLICAS, getNumberOfReplicas());
@ -197,22 +270,65 @@ public final class ClusterIndexHealth implements Iterable<ClusterShardHealth>, W
if ("shards".equals(params.param("level", "indices"))) { if ("shards".equals(params.param("level", "indices"))) {
builder.startObject(SHARDS); builder.startObject(SHARDS);
for (ClusterShardHealth shardHealth : shards.values()) { for (ClusterShardHealth shardHealth : shards.values()) {
builder.startObject(Integer.toString(shardHealth.getId())); shardHealth.toXContent(builder, params);
}
builder.field(STATUS, shardHealth.getStatus().name().toLowerCase(Locale.ROOT));
builder.field(PRIMARY_ACTIVE, shardHealth.isPrimaryActive());
builder.field(ACTIVE_SHARDS, shardHealth.getActiveShards());
builder.field(RELOCATING_SHARDS, shardHealth.getRelocatingShards());
builder.field(INITIALIZING_SHARDS, shardHealth.getInitializingShards());
builder.field(UNASSIGNED_SHARDS, shardHealth.getUnassignedShards());
builder.endObject(); builder.endObject();
} }
builder.endObject(); builder.endObject();
}
return builder; return builder;
} }
public static ClusterIndexHealth innerFromXContent(XContentParser parser, String index) {
return PARSER.apply(parser, index);
}
public static ClusterIndexHealth fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
String index = parser.currentName();
ClusterIndexHealth parsed = innerFromXContent(parser, index);
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser::getTokenLocation);
return parsed;
}
@Override
public String toString() {
return "ClusterIndexHealth{" +
"index='" + index + '\'' +
", numberOfShards=" + numberOfShards +
", numberOfReplicas=" + numberOfReplicas +
", activeShards=" + activeShards +
", relocatingShards=" + relocatingShards +
", initializingShards=" + initializingShards +
", unassignedShards=" + unassignedShards +
", activePrimaryShards=" + activePrimaryShards +
", status=" + status +
", shards.size=" + (shards == null ? "null" : shards.size()) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterIndexHealth that = (ClusterIndexHealth) o;
return Objects.equals(index, that.index) &&
numberOfShards == that.numberOfShards &&
numberOfReplicas == that.numberOfReplicas &&
activeShards == that.activeShards &&
relocatingShards == that.relocatingShards &&
initializingShards == that.initializingShards &&
unassignedShards == that.unassignedShards &&
activePrimaryShards == that.activePrimaryShards &&
status == that.status &&
Objects.equals(shards, that.shards);
}
@Override
public int hashCode() {
return Objects.hash(index, numberOfShards, numberOfReplicas, activeShards, relocatingShards, initializingShards, unassignedShards,
activePrimaryShards, status, shards);
}
} }

View File

@ -24,13 +24,54 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
public final class ClusterShardHealth implements Writeable { import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
public final class ClusterShardHealth implements Writeable, ToXContentFragment {
private static final String STATUS = "status";
private static final String ACTIVE_SHARDS = "active_shards";
private static final String RELOCATING_SHARDS = "relocating_shards";
private static final String INITIALIZING_SHARDS = "initializing_shards";
private static final String UNASSIGNED_SHARDS = "unassigned_shards";
private static final String PRIMARY_ACTIVE = "primary_active";
public static final ConstructingObjectParser<ClusterShardHealth, Integer> PARSER =
new ConstructingObjectParser<>("cluster_shard_health", true,
(parsedObjects, shardId) -> {
int i = 0;
boolean primaryActive = (boolean) parsedObjects[i++];
int activeShards = (int) parsedObjects[i++];
int relocatingShards = (int) parsedObjects[i++];
int initializingShards = (int) parsedObjects[i++];
int unassignedShards = (int) parsedObjects[i++];
String statusStr = (String) parsedObjects[i];
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
return new ClusterShardHealth(shardId, status, activeShards, relocatingShards, initializingShards, unassignedShards,
primaryActive);
});
static {
PARSER.declareBoolean(constructorArg(), new ParseField(PRIMARY_ACTIVE));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(INITIALIZING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(UNASSIGNED_SHARDS));
PARSER.declareString(constructorArg(), new ParseField(STATUS));
}
private final int shardId; private final int shardId;
private final ClusterHealthStatus status; private final ClusterHealthStatus status;
@ -88,7 +129,21 @@ public final class ClusterShardHealth implements Writeable {
primaryActive = in.readBoolean(); primaryActive = in.readBoolean();
} }
public int getId() { /**
* For XContent Parser and serialization tests
*/
ClusterShardHealth(int shardId, ClusterHealthStatus status, int activeShards, int relocatingShards, int initializingShards,
int unassignedShards, boolean primaryActive) {
this.shardId = shardId;
this.status = status;
this.activeShards = activeShards;
this.relocatingShards = relocatingShards;
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.primaryActive = primaryActive;
}
public int getShardId() {
return shardId; return shardId;
} }
@ -155,4 +210,54 @@ public final class ClusterShardHealth implements Writeable {
} }
} }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(getShardId()));
builder.field(STATUS, getStatus().name().toLowerCase(Locale.ROOT));
builder.field(PRIMARY_ACTIVE, isPrimaryActive());
builder.field(ACTIVE_SHARDS, getActiveShards());
builder.field(RELOCATING_SHARDS, getRelocatingShards());
builder.field(INITIALIZING_SHARDS, getInitializingShards());
builder.field(UNASSIGNED_SHARDS, getUnassignedShards());
builder.endObject();
return builder;
}
static ClusterShardHealth innerFromXContent(XContentParser parser, Integer shardId) {
return PARSER.apply(parser, shardId);
}
public static ClusterShardHealth fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
String shardIdStr = parser.currentName();
ClusterShardHealth parsed = innerFromXContent(parser, Integer.valueOf(shardIdStr));
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser::getTokenLocation);
return parsed;
}
@Override
public String toString() {
return Strings.toString(this);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ClusterShardHealth)) return false;
ClusterShardHealth that = (ClusterShardHealth) o;
return shardId == that.shardId &&
activeShards == that.activeShards &&
relocatingShards == that.relocatingShards &&
initializingShards == that.initializingShards &&
unassignedShards == that.unassignedShards &&
primaryActive == that.primaryActive &&
status == that.status;
}
@Override
public int hashCode() {
return Objects.hash(shardId, status, activeShards, relocatingShards, initializingShards, unassignedShards, primaryActive);
}
} }

View File

@ -33,6 +33,8 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, Writeable { public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, Writeable {
@ -45,7 +47,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
private final int unassignedShards; private final int unassignedShards;
private final double activeShardsPercent; private final double activeShardsPercent;
private final ClusterHealthStatus status; private final ClusterHealthStatus status;
private final Map<String, ClusterIndexHealth> indices = new HashMap<>(); private final Map<String, ClusterIndexHealth> indices;
/** /**
* Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and all indices in the cluster. * Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and all indices in the cluster.
@ -65,7 +67,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) { public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) {
numberOfNodes = clusterState.nodes().getSize(); numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().getDataNodes().size(); numberOfDataNodes = clusterState.nodes().getDataNodes().size();
indices = new HashMap<>();
for (String index : concreteIndices) { for (String index : concreteIndices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
IndexMetaData indexMetaData = clusterState.metaData().index(index); IndexMetaData indexMetaData = clusterState.metaData().index(index);
@ -134,6 +136,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
numberOfDataNodes = in.readVInt(); numberOfDataNodes = in.readVInt();
status = ClusterHealthStatus.fromValue(in.readByte()); status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readVInt(); int size = in.readVInt();
indices = new HashMap<>(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
ClusterIndexHealth indexHealth = new ClusterIndexHealth(in); ClusterIndexHealth indexHealth = new ClusterIndexHealth(in);
indices.put(indexHealth.getIndex(), indexHealth); indices.put(indexHealth.getIndex(), indexHealth);
@ -141,6 +144,24 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
activeShardsPercent = in.readDouble(); activeShardsPercent = in.readDouble();
} }
/**
* For ClusterHealthResponse's XContent Parser
*/
public ClusterStateHealth(int activePrimaryShards, int activeShards, int relocatingShards, int initializingShards, int unassignedShards,
int numberOfNodes, int numberOfDataNodes, double activeShardsPercent, ClusterHealthStatus status,
Map<String, ClusterIndexHealth> indices) {
this.activePrimaryShards = activePrimaryShards;
this.activeShards = activeShards;
this.relocatingShards = relocatingShards;
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.numberOfNodes = numberOfNodes;
this.numberOfDataNodes = numberOfDataNodes;
this.activeShardsPercent = activeShardsPercent;
this.status = status;
this.indices = indices;
}
public int getActiveShards() { public int getActiveShards() {
return activeShards; return activeShards;
} }
@ -202,4 +223,43 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
} }
out.writeDouble(activeShardsPercent); out.writeDouble(activeShardsPercent);
} }
@Override
public String toString() {
return "ClusterStateHealth{" +
"numberOfNodes=" + numberOfNodes +
", numberOfDataNodes=" + numberOfDataNodes +
", activeShards=" + activeShards +
", relocatingShards=" + relocatingShards +
", activePrimaryShards=" + activePrimaryShards +
", initializingShards=" + initializingShards +
", unassignedShards=" + unassignedShards +
", activeShardsPercent=" + activeShardsPercent +
", status=" + status +
", indices.size=" + (indices == null ? "null" : indices.size()) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterStateHealth that = (ClusterStateHealth) o;
return numberOfNodes == that.numberOfNodes &&
numberOfDataNodes == that.numberOfDataNodes &&
activeShards == that.activeShards &&
relocatingShards == that.relocatingShards &&
activePrimaryShards == that.activePrimaryShards &&
initializingShards == that.initializingShards &&
unassignedShards == that.unassignedShards &&
Double.compare(that.activeShardsPercent, activeShardsPercent) == 0 &&
status == that.status &&
Objects.equals(indices, that.indices);
}
@Override
public int hashCode() {
return Objects.hash(numberOfNodes, numberOfDataNodes, activeShards, relocatingShards, activePrimaryShards, initializingShards,
unassignedShards, activeShardsPercent, status, indices);
}
} }

View File

@ -21,26 +21,38 @@ package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterIndexHealthTests;
import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ClusterHealthResponsesTests extends ESTestCase { public class ClusterHealthResponsesTests extends AbstractStreamableXContentTestCase<ClusterHealthResponse> {
private final ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.values());
public void testIsTimeout() throws IOException { public void testIsTimeout() {
ClusterHealthResponse res = new ClusterHealthResponse(); ClusterHealthResponse res = new ClusterHealthResponse();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
res.setTimedOut(randomBoolean()); res.setTimedOut(randomBoolean());
@ -89,4 +101,101 @@ public class ClusterHealthResponsesTests extends ESTestCase {
} }
return clusterHealth; return clusterHealth;
} }
@Override
protected ClusterHealthResponse doParseInstance(XContentParser parser) {
return ClusterHealthResponse.fromXContent(parser);
}
@Override
protected ClusterHealthResponse createBlankInstance() {
return new ClusterHealthResponse();
}
@Override
protected ClusterHealthResponse createTestInstance() {
int indicesSize = randomInt(20);
Map<String, ClusterIndexHealth> indices = new HashMap<>(indicesSize);
if ("indices".equals(level) || "shards".equals(level)) {
for (int i = 0; i < indicesSize; i++) {
String indexName = randomAlphaOfLengthBetween(1, 5) + i;
indices.put(indexName, ClusterIndexHealthTests.randomIndexHealth(indexName, level));
}
}
ClusterStateHealth stateHealth = new ClusterStateHealth(randomInt(100), randomInt(100), randomInt(100),
randomInt(100), randomInt(100), randomInt(100), randomInt(100),
randomDoubleBetween(0d, 100d, true), randomFrom(ClusterHealthStatus.values()), indices);
return new ClusterHealthResponse(randomAlphaOfLengthBetween(1, 10), randomInt(100), randomInt(100), randomInt(100),
TimeValue.timeValueMillis(randomInt(10000)), randomBoolean(), stateHealth);
}
@Override
protected ToXContent.Params getToXContentParams() {
return new ToXContent.MapParams(Collections.singletonMap("level", level.name().toLowerCase(Locale.ROOT)));
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
// Ignore all paths which looks like "indices.RANDOMINDEXNAME.shards"
private static final Pattern SHARDS_IN_XCONTENT = Pattern.compile("^indices\\.\\w+\\.shards$");
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> "indices".equals(field) || SHARDS_IN_XCONTENT.matcher(field).find();
}
@Override
protected ClusterHealthResponse mutateInstance(ClusterHealthResponse instance) {
String mutate = randomFrom("clusterName", "numberOfPendingTasks","numberOfInFlightFetch", "delayedUnassignedShards",
"taskMaxWaitingTime", "timedOut", "clusterStateHealth");
switch (mutate) {
case "clusterName":
return new ClusterHealthResponse(instance.getClusterName() + randomAlphaOfLengthBetween(2, 5),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(),
instance.isTimedOut(), instance.getClusterStateHealth());
case "numberOfPendingTasks":
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks() + between(1, 10), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(),
instance.isTimedOut(), instance.getClusterStateHealth());
case "numberOfInFlightFetch":
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch() + between(1, 10),
instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(),
instance.isTimedOut(), instance.getClusterStateHealth());
case "delayedUnassignedShards":
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards() + between(1, 10), instance.getTaskMaxWaitingTime(),
instance.isTimedOut(), instance.getClusterStateHealth());
case "taskMaxWaitingTime":
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards(), new TimeValue(instance.getTaskMaxWaitingTime().millis() + between(1, 10)),
instance.isTimedOut(), instance.getClusterStateHealth());
case "timedOut":
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(),
instance.isTimedOut() == false, instance.getClusterStateHealth());
case "clusterStateHealth":
ClusterStateHealth state = instance.getClusterStateHealth();
ClusterStateHealth newState = new ClusterStateHealth(state.getActivePrimaryShards() + between(1, 10),
state.getActiveShards(), state.getRelocatingShards(), state.getInitializingShards(), state.getUnassignedShards(),
state.getNumberOfNodes(), state.getNumberOfDataNodes(), state.getActiveShardsPercent(), state.getStatus(),
state.getIndices());
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(),
instance.isTimedOut(), newState);
default:
throw new UnsupportedOperationException();
}
}
} }

View File

@ -19,29 +19,45 @@
package org.elasticsearch.cluster.health; package org.elasticsearch.cluster.health;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTableGenerator; import org.elasticsearch.cluster.routing.RoutingTableGenerator;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
public class ClusterIndexHealthTests extends ESTestCase { public class ClusterIndexHealthTests extends AbstractSerializingTestCase<ClusterIndexHealth> {
private final ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.SHARDS, ClusterHealthRequest.Level.INDICES);
public void testClusterIndexHealth() { public void testClusterIndexHealth() {
RoutingTableGenerator routingTableGenerator = new RoutingTableGenerator(); RoutingTableGenerator routingTableGenerator = new RoutingTableGenerator();
int numberOfShards = randomInt(3) + 1; int numberOfShards = randomInt(3) + 1;
int numberOfReplicas = randomInt(4); int numberOfReplicas = randomInt(4);
IndexMetaData indexMetaData = IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas).build(); IndexMetaData indexMetaData = IndexMetaData.builder("test1").settings(settings(Version.CURRENT))
.numberOfShards(numberOfShards).numberOfReplicas(numberOfReplicas).build();
RoutingTableGenerator.ShardCounter counter = new RoutingTableGenerator.ShardCounter(); RoutingTableGenerator.ShardCounter counter = new RoutingTableGenerator.ShardCounter();
IndexRoutingTable indexRoutingTable = routingTableGenerator.genIndexRoutingTable(indexMetaData, counter); IndexRoutingTable indexRoutingTable = routingTableGenerator.genIndexRoutingTable(indexMetaData, counter);
ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetaData, indexRoutingTable); ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetaData, indexRoutingTable);
logger.info("index status: {}, expected {}", indexHealth.getStatus(), counter.status());
assertIndexHealth(indexHealth, counter, indexMetaData); assertIndexHealth(indexHealth, counter, indexMetaData);
} }
private void assertIndexHealth(ClusterIndexHealth indexHealth, RoutingTableGenerator.ShardCounter counter,
private void assertIndexHealth(ClusterIndexHealth indexHealth, RoutingTableGenerator.ShardCounter counter, IndexMetaData indexMetaData) { IndexMetaData indexMetaData) {
assertThat(indexHealth.getStatus(), equalTo(counter.status())); assertThat(indexHealth.getStatus(), equalTo(counter.status()));
assertThat(indexHealth.getNumberOfShards(), equalTo(indexMetaData.getNumberOfShards())); assertThat(indexHealth.getNumberOfShards(), equalTo(indexMetaData.getNumberOfShards()));
assertThat(indexHealth.getNumberOfReplicas(), equalTo(indexMetaData.getNumberOfReplicas())); assertThat(indexHealth.getNumberOfReplicas(), equalTo(indexMetaData.getNumberOfReplicas()));
@ -57,4 +73,119 @@ public class ClusterIndexHealthTests extends ESTestCase {
assertThat(totalShards, equalTo(indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas()))); assertThat(totalShards, equalTo(indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas())));
} }
@Override
protected ClusterIndexHealth createTestInstance() {
return randomIndexHealth(randomAlphaOfLengthBetween(1, 10), level);
}
public static ClusterIndexHealth randomIndexHealth(String indexName, ClusterHealthRequest.Level level) {
Map<Integer, ClusterShardHealth> shards = new HashMap<>();
if (level == ClusterHealthRequest.Level.SHARDS) {
for (int i = 0; i < randomInt(5); i++) {
shards.put(i, ClusterShardHealthTests.randomShardHealth(i));
}
}
return new ClusterIndexHealth(indexName, randomInt(1000), randomInt(1000), randomInt(1000), randomInt(1000),
randomInt(1000), randomInt(1000), randomInt(1000), randomFrom(ClusterHealthStatus.values()), shards);
}
@Override
protected Writeable.Reader<ClusterIndexHealth> instanceReader() {
return ClusterIndexHealth::new;
}
@Override
protected ClusterIndexHealth doParseInstance(XContentParser parser) throws IOException {
return ClusterIndexHealth.fromXContent(parser);
}
@Override
protected ToXContent.Params getToXContentParams() {
return new ToXContent.MapParams(Collections.singletonMap("level", level.name().toLowerCase(Locale.ROOT)));
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
// Ignore all paths which looks like "RANDOMINDEXNAME.shards"
private static final Pattern SHARDS_IN_XCONTENT = Pattern.compile("^\\w+\\.shards$");
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> "".equals(field) || SHARDS_IN_XCONTENT.matcher(field).find();
}
@Override
protected ClusterIndexHealth mutateInstance(ClusterIndexHealth instance) throws IOException {
String mutate = randomFrom("index", "numberOfShards", "numberOfReplicas", "activeShards", "relocatingShards",
"initializingShards", "unassignedShards", "activePrimaryShards", "status", "shards");
switch (mutate) {
case "index":
return new ClusterIndexHealth(instance.getIndex() + randomAlphaOfLengthBetween(2, 5), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "numberOfShards":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards() + between(1, 10),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "numberOfReplicas":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas() + between(1, 10), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "activeShards":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards() + between(1, 10), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "relocatingShards":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards() + between(1, 10),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "initializingShards":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards() + between(1, 10), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "unassignedShards":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards() + between(1, 10),
instance.getActivePrimaryShards(), instance.getStatus(), instance.getShards());
case "activePrimaryShards":
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards() + between(1, 10), instance.getStatus(), instance.getShards());
case "status":
ClusterHealthStatus status = randomFrom(
Arrays.stream(ClusterHealthStatus.values()).filter(
value -> !value.equals(instance.getStatus())
).collect(Collectors.toList())
);
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), status, instance.getShards());
case "shards":
Map<Integer, ClusterShardHealth> map;
if (instance.getShards().isEmpty()) {
map = Collections.singletonMap(0, ClusterShardHealthTests.randomShardHealth(0));
} else {
map = new HashMap<>(instance.getShards());
map.remove(map.keySet().iterator().next());
}
return new ClusterIndexHealth(instance.getIndex(), instance.getNumberOfShards(),
instance.getNumberOfReplicas(), instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.getActivePrimaryShards(), instance.getStatus(), map);
default:
throw new UnsupportedOperationException();
}
}
} }

View File

@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.health;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class ClusterShardHealthTests extends AbstractSerializingTestCase<ClusterShardHealth> {
@Override
protected ClusterShardHealth doParseInstance(XContentParser parser) throws IOException {
return ClusterShardHealth.fromXContent(parser);
}
@Override
protected ClusterShardHealth createTestInstance() {
return randomShardHealth(randomInt(1000));
}
static ClusterShardHealth randomShardHealth(int id) {
return new ClusterShardHealth(id, randomFrom(ClusterHealthStatus.values()), randomInt(1000), randomInt(1000),
randomInt(1000), randomInt(1000), randomBoolean());
}
@Override
protected Writeable.Reader<ClusterShardHealth> instanceReader() {
return ClusterShardHealth::new;
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
//don't inject random fields at the root, which contains arbitrary shard ids
return ""::equals;
}
@Override
protected ClusterShardHealth mutateInstance(final ClusterShardHealth instance) {
String mutate = randomFrom("shardId", "status", "activeShards", "relocatingShards", "initializingShards",
"unassignedShards", "primaryActive");
switch (mutate) {
case "shardId":
return new ClusterShardHealth(instance.getShardId() + between(1, 10), instance.getStatus(),
instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.isPrimaryActive());
case "status":
ClusterHealthStatus status = randomFrom(
Arrays.stream(ClusterHealthStatus.values()).filter(
value -> !value.equals(instance.getStatus())
).collect(Collectors.toList())
);
return new ClusterShardHealth(instance.getShardId(), status,
instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.isPrimaryActive());
case "activeShards":
return new ClusterShardHealth(instance.getShardId(), instance.getStatus(),
instance.getActiveShards() + between(1, 10), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.isPrimaryActive());
case "relocatingShards":
return new ClusterShardHealth(instance.getShardId(), instance.getStatus(),
instance.getActiveShards(), instance.getRelocatingShards() + between(1, 10),
instance.getInitializingShards(), instance.getUnassignedShards(), instance.isPrimaryActive());
case "initializingShards":
return new ClusterShardHealth(instance.getShardId(), instance.getStatus(),
instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards() + between(1, 10), instance.getUnassignedShards(),
instance.isPrimaryActive());
case "unassignedShards":
return new ClusterShardHealth(instance.getShardId(), instance.getStatus(),
instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards() + between(1, 10),
instance.isPrimaryActive());
case "primaryActive":
return new ClusterShardHealth(instance.getShardId(), instance.getStatus(),
instance.getActiveShards(), instance.getRelocatingShards(),
instance.getInitializingShards(), instance.getUnassignedShards(),
instance.isPrimaryActive() == false);
default:
throw new UnsupportedOperationException();
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractXContentTestCase;
@ -96,7 +97,7 @@ public class CancelTasksResponseTests extends AbstractXContentTestCase<CancelTas
boolean assertToXContentEquivalence = false; boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence); this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
} }
private static CancelTasksResponse createTestInstanceWithFailures() { private static CancelTasksResponse createTestInstanceWithFailures() {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractXContentTestCase;
@ -156,7 +157,7 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
boolean assertToXContentEquivalence = false; boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence); this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
} }
private static ListTasksResponse createTestInstanceWithFailures() { private static ListTasksResponse createTestInstanceWithFailures() {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.test;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
@ -35,7 +36,7 @@ public abstract class AbstractSerializingTestCase<T extends ToXContent & Writeab
public final void testFromXContent() throws IOException { public final void testFromXContent() throws IOException {
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(), AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(),
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, true); this::assertEqualInstances, true, getToXContentParams());
} }
/** /**
@ -64,4 +65,11 @@ public abstract class AbstractSerializingTestCase<T extends ToXContent & Writeab
protected String[] getShuffleFieldsExceptions() { protected String[] getShuffleFieldsExceptions() {
return Strings.EMPTY_ARRAY; return Strings.EMPTY_ARRAY;
} }
/**
* Params that have to be provided when calling calling {@link ToXContent#toXContent(XContentBuilder, ToXContent.Params)}
*/
protected ToXContent.Params getToXContentParams() {
return ToXContent.EMPTY_PARAMS;
}
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.test;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
@ -35,7 +36,7 @@ public abstract class AbstractStreamableXContentTestCase<T extends ToXContent &
public final void testFromXContent() throws IOException { public final void testFromXContent() throws IOException {
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(), AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(),
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, true); this::assertEqualInstances, true, getToXContentParams());
} }
/** /**
@ -64,4 +65,11 @@ public abstract class AbstractStreamableXContentTestCase<T extends ToXContent &
protected String[] getShuffleFieldsExceptions() { protected String[] getShuffleFieldsExceptions() {
return Strings.EMPTY_ARRAY; return Strings.EMPTY_ARRAY;
} }
/**
* Params that have to be provided when calling calling {@link ToXContent#toXContent(XContentBuilder, ToXContent.Params)}
*/
protected ToXContent.Params getToXContentParams() {
return ToXContent.EMPTY_PARAMS;
}
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -48,12 +49,13 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
createParserFunction, createParserFunction,
CheckedFunction<XContentParser, T, IOException> parseFunction, CheckedFunction<XContentParser, T, IOException> parseFunction,
BiConsumer<T, T> assertEqualsConsumer, BiConsumer<T, T> assertEqualsConsumer,
boolean assertToXContentEquivalence) throws IOException { boolean assertToXContentEquivalence,
ToXContent.Params toXContentParams) throws IOException {
for (int runs = 0; runs < numberOfTestRuns; runs++) { for (int runs = 0; runs < numberOfTestRuns; runs++) {
T testInstance = instanceSupplier.get(); T testInstance = instanceSupplier.get();
XContentType xContentType = randomFrom(XContentType.values()); XContentType xContentType = randomFrom(XContentType.values());
BytesReference shuffled = toShuffledXContent(testInstance, xContentType, ToXContent.EMPTY_PARAMS, false, createParserFunction, BytesReference shuffled = toShuffledXContent(testInstance, xContentType, toXContentParams,false,
shuffleFieldsExceptions); createParserFunction, shuffleFieldsExceptions);
BytesReference withRandomFields; BytesReference withRandomFields;
if (supportsUnknownFields) { if (supportsUnknownFields) {
// we add a few random fields to check that parser is lenient on new fields // we add a few random fields to check that parser is lenient on new fields
@ -65,7 +67,8 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
T parsed = parseFunction.apply(parser); T parsed = parseFunction.apply(parser);
assertEqualsConsumer.accept(testInstance, parsed); assertEqualsConsumer.accept(testInstance, parsed);
if (assertToXContentEquivalence) { if (assertToXContentEquivalence) {
assertToXContentEquivalent(shuffled, XContentHelper.toXContent(parsed, xContentType, false), xContentType); assertToXContentEquivalent(shuffled, XContentHelper.toXContent(parsed, xContentType, toXContentParams, false),
xContentType);
} }
} }
} }
@ -77,7 +80,7 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
public final void testFromXContent() throws IOException { public final void testFromXContent() throws IOException {
testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(), getShuffleFieldsExceptions(), testFromXContent(NUMBER_OF_TEST_RUNS, this::createTestInstance, supportsUnknownFields(), getShuffleFieldsExceptions(),
getRandomFieldsExcludeFilter(), this::createParser, this::parseInstance, this::assertEqualInstances, getRandomFieldsExcludeFilter(), this::createParser, this::parseInstance, this::assertEqualInstances,
assertToXContentEquivalence()); assertToXContentEquivalence(), getToXContentParams());
} }
/** /**
@ -127,4 +130,11 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
protected String[] getShuffleFieldsExceptions() { protected String[] getShuffleFieldsExceptions() {
return Strings.EMPTY_ARRAY; return Strings.EMPTY_ARRAY;
} }
/**
* Params that have to be provided when calling calling {@link ToXContent#toXContent(XContentBuilder, ToXContent.Params)}
*/
protected ToXContent.Params getToXContentParams() {
return ToXContent.EMPTY_PARAMS;
}
} }