Apply fix for health API response to distinguish no master (#656)

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>
This commit is contained in:
Mohit Godwani 2021-05-24 22:35:55 +05:30 committed by GitHub
parent 658dc18b5f
commit 4e8c92f2aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 135 additions and 11 deletions

View File

@ -12,6 +12,7 @@
status .+ \n
node.total .+ \n
node.data .+ \n
discovered_master .+ \n
shards .+ \n
pri .+ \n
relo .+ \n
@ -39,6 +40,7 @@
\w+ \s+ # status
\d+ \s+ # node.total
\d+ \s+ # node.data
\w+ \s+ # discovered_master
\d+ \s+ # shards
\d+ \s+ # pri
\d+ \s+ # relo
@ -66,6 +68,7 @@
\w+ \s+ # status
\d+ \s+ # node.total
\d+ \s+ # node.data
\w+ \s+ # discovered_master
\d+ \s+ # shards
\d+ \s+ # pri
\d+ \s+ # relo

View File

@ -66,6 +66,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
private static final String TIMED_OUT = "timed_out";
private static final String NUMBER_OF_NODES = "number_of_nodes";
private static final String NUMBER_OF_DATA_NODES = "number_of_data_nodes";
private static final String DISCOVERED_MASTER = "discovered_master";
private static final String NUMBER_OF_PENDING_TASKS = "number_of_pending_tasks";
private static final String NUMBER_OF_IN_FLIGHT_FETCH = "number_of_in_flight_fetch";
private static final String DELAYED_UNASSIGNED_SHARDS = "delayed_unassigned_shards";
@ -87,6 +88,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
// ClusterStateHealth fields
int numberOfNodes = (int) parsedObjects[i++];
int numberOfDataNodes = (int) parsedObjects[i++];
boolean hasDiscoveredMaster = (boolean) parsedObjects[i++];
int activeShards = (int) parsedObjects[i++];
int relocatingShards = (int) parsedObjects[i++];
int activePrimaryShards = (int) parsedObjects[i++];
@ -106,9 +108,8 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
}
}
ClusterStateHealth stateHealth = new ClusterStateHealth(activePrimaryShards, activeShards, relocatingShards,
initializingShards, unassignedShards, numberOfNodes, numberOfDataNodes, activeShardsPercent, status,
indices);
initializingShards, unassignedShards, numberOfNodes, numberOfDataNodes, hasDiscoveredMaster,
activeShardsPercent, status, indices);
// ClusterHealthResponse fields
String clusterName = (String) parsedObjects[i++];
int numberOfPendingTasks = (int) parsedObjects[i++];
@ -127,6 +128,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
// ClusterStateHealth fields
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_NODES));
PARSER.declareInt(constructorArg(), new ParseField(NUMBER_OF_DATA_NODES));
PARSER.declareBoolean(constructorArg(), new ParseField(DISCOVERED_MASTER));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(RELOCATING_SHARDS));
PARSER.declareInt(constructorArg(), new ParseField(ACTIVE_PRIMARY_SHARDS));
@ -237,6 +239,10 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
return clusterStateHealth.getNumberOfDataNodes();
}
public boolean hasDiscoveredMaster() {
return clusterStateHealth.hasDiscoveredMaster();
}
public int getNumberOfPendingTasks() {
return this.numberOfPendingTasks;
}
@ -334,6 +340,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
builder.field(TIMED_OUT, isTimedOut());
builder.field(NUMBER_OF_NODES, getNumberOfNodes());
builder.field(NUMBER_OF_DATA_NODES, getNumberOfDataNodes());
builder.field(DISCOVERED_MASTER, hasDiscoveredMaster());
builder.field(ACTIVE_PRIMARY_SHARDS, getActivePrimaryShards());
builder.field(ACTIVE_SHARDS, getActiveShards());
builder.field(RELOCATING_SHARDS, getRelocatingShards());

View File

@ -31,6 +31,7 @@
package org.opensearch.cluster.health;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
@ -53,6 +54,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
private final int numberOfNodes;
private final int numberOfDataNodes;
private final boolean hasDiscoveredMaster;
private final int activeShards;
private final int relocatingShards;
private final int activePrimaryShards;
@ -80,6 +82,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices) {
numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().getDataNodes().size();
hasDiscoveredMaster = clusterState.nodes().getMasterNodeId() != null;
indices = new HashMap<>();
for (String index : concreteIndices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
@ -147,6 +150,11 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
unassignedShards = in.readVInt();
numberOfNodes = in.readVInt();
numberOfDataNodes = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_1_0_0)) {
hasDiscoveredMaster = in.readBoolean();
} else {
hasDiscoveredMaster = true;
}
status = ClusterHealthStatus.fromValue(in.readByte());
int size = in.readVInt();
indices = new HashMap<>(size);
@ -161,7 +169,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
* For ClusterHealthResponse's XContent Parser
*/
public ClusterStateHealth(int activePrimaryShards, int activeShards, int relocatingShards, int initializingShards, int unassignedShards,
int numberOfNodes, int numberOfDataNodes, double activeShardsPercent, ClusterHealthStatus status,
int numberOfNodes, int numberOfDataNodes, boolean hasDiscoveredMaster, double activeShardsPercent, ClusterHealthStatus status,
Map<String, ClusterIndexHealth> indices) {
this.activePrimaryShards = activePrimaryShards;
this.activeShards = activeShards;
@ -170,6 +178,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
this.unassignedShards = unassignedShards;
this.numberOfNodes = numberOfNodes;
this.numberOfDataNodes = numberOfDataNodes;
this.hasDiscoveredMaster = hasDiscoveredMaster;
this.activeShardsPercent = activeShardsPercent;
this.status = status;
this.indices = indices;
@ -215,6 +224,10 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
return activeShardsPercent;
}
public boolean hasDiscoveredMaster() {
return hasDiscoveredMaster;
}
@Override
public Iterator<ClusterIndexHealth> iterator() {
return indices.values().iterator();
@ -229,6 +242,9 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
out.writeVInt(unassignedShards);
out.writeVInt(numberOfNodes);
out.writeVInt(numberOfDataNodes);
if (out.getVersion().onOrAfter(Version.V_1_0_0)) {
out.writeBoolean(hasDiscoveredMaster);
}
out.writeByte(status.value());
out.writeVInt(indices.size());
for (ClusterIndexHealth indexHealth : this) {
@ -242,6 +258,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
return "ClusterStateHealth{" +
"numberOfNodes=" + numberOfNodes +
", numberOfDataNodes=" + numberOfDataNodes +
", hasDiscoveredMaster=" + hasDiscoveredMaster +
", activeShards=" + activeShards +
", relocatingShards=" + relocatingShards +
", activePrimaryShards=" + activePrimaryShards +
@ -260,6 +277,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
ClusterStateHealth that = (ClusterStateHealth) o;
return numberOfNodes == that.numberOfNodes &&
numberOfDataNodes == that.numberOfDataNodes &&
hasDiscoveredMaster == that.hasDiscoveredMaster &&
activeShards == that.activeShards &&
relocatingShards == that.relocatingShards &&
activePrimaryShards == that.activePrimaryShards &&
@ -272,7 +290,7 @@ public final class ClusterStateHealth implements Iterable<ClusterIndexHealth>, W
@Override
public int hashCode() {
return Objects.hash(numberOfNodes, numberOfDataNodes, activeShards, relocatingShards, activePrimaryShards, initializingShards,
unassignedShards, activeShardsPercent, status, indices);
return Objects.hash(numberOfNodes, numberOfDataNodes, hasDiscoveredMaster, activeShards, relocatingShards,
activePrimaryShards, initializingShards, unassignedShards, activeShardsPercent, status, indices);
}
}

View File

@ -89,6 +89,7 @@ public class RestHealthAction extends AbstractCatAction {
t.addCell("status", "alias:st;desc:health status");
t.addCell("node.total", "alias:nt,nodeTotal;text-align:right;desc:total number of nodes");
t.addCell("node.data", "alias:nd,nodeData;text-align:right;desc:number of nodes that can store data");
t.addCell("discovered_master", "alias:dm;text-align:right;desc:discovered master");
t.addCell("shards", "alias:t,sh,shards.total,shardsTotal;text-align:right;desc:total number of shards");
t.addCell("pri", "alias:p,shards.primary,shardsPrimary;text-align:right;desc:number of primary shards");
t.addCell("relo", "alias:r,shards.relocating,shardsRelocating;text-align:right;desc:number of relocating nodes");
@ -109,6 +110,7 @@ public class RestHealthAction extends AbstractCatAction {
t.addCell(health.getStatus().name().toLowerCase(Locale.ROOT));
t.addCell(health.getNumberOfNodes());
t.addCell(health.getNumberOfDataNodes());
t.addCell(health.hasDiscoveredMaster());
t.addCell(health.getActiveShards());
t.addCell(health.getActivePrimaryShards());
t.addCell(health.getRelocatingShards());

View File

@ -32,6 +32,8 @@
package org.opensearch.action.admin.cluster.health;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
@ -39,18 +41,24 @@ import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.health.ClusterIndexHealthTests;
import org.opensearch.cluster.health.ClusterStateHealth;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.AbstractSerializingTestCase;
import org.opensearch.test.OpenSearchTestCase;
import org.hamcrest.Matchers;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.test.VersionUtils;
import java.io.IOException;
import java.util.Collections;
@ -97,6 +105,26 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
assertThat(clusterHealth.getActiveShardsPercent(), is(allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0))));
}
public void testClusterHealthVerifyMasterNodeDiscovery() throws IOException {
DiscoveryNode localNode = new DiscoveryNode("node", OpenSearchTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
//set the node information to verify master_node discovery in ClusterHealthResponse
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(DiscoveryNodes.builder()
.add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId()))
.build();
int pendingTasks = randomIntBetween(0, 200);
int inFlight = randomIntBetween(0, 200);
int delayedUnassigned = randomIntBetween(0, 200);
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", new String[] {Metadata.ALL},
clusterState, pendingTasks, inFlight, delayedUnassigned, pendingTaskInQueueTime);
clusterHealth = maybeSerialize(clusterHealth);
assertThat(clusterHealth.getClusterStateHealth().hasDiscoveredMaster(), Matchers.equalTo(true));
assertClusterHealth(clusterHealth);
}
private void assertClusterHealth(ClusterHealthResponse clusterHealth) {
ClusterStateHealth clusterStateHealth = clusterHealth.getClusterStateHealth();
@ -107,6 +135,53 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
assertThat(clusterHealth.getUnassignedShards(), Matchers.equalTo(clusterStateHealth.getUnassignedShards()));
assertThat(clusterHealth.getNumberOfNodes(), Matchers.equalTo(clusterStateHealth.getNumberOfNodes()));
assertThat(clusterHealth.getNumberOfDataNodes(), Matchers.equalTo(clusterStateHealth.getNumberOfDataNodes()));
assertThat(clusterHealth.hasDiscoveredMaster(), Matchers.equalTo(clusterStateHealth.hasDiscoveredMaster()));
}
public void testVersionCompatibleSerialization() throws IOException {
boolean hasDiscoveredMaster = false;
int indicesSize = randomInt(20);
Map<String, ClusterIndexHealth> indices = new HashMap<>(indicesSize);
if ("indices".equals(level) || "shards".equals(level)) {
for (int i = 0; i < indicesSize; i++) {
String indexName = randomAlphaOfLengthBetween(1, 5) + i;
indices.put(indexName, ClusterIndexHealthTests.randomIndexHealth(indexName, level));
}
}
ClusterStateHealth stateHealth = new ClusterStateHealth(randomInt(100), randomInt(100), randomInt(100),
randomInt(100), randomInt(100), randomInt(100), randomInt(100), hasDiscoveredMaster,
randomDoubleBetween(0d, 100d, true), randomFrom(ClusterHealthStatus.values()), indices);
//Create the Cluster Health Response object with discovered master as false,
//to verify serialization puts default value for the field
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("test-cluster", randomInt(100), randomInt(100),
randomInt(100), TimeValue.timeValueMillis(randomInt(10000)), randomBoolean(), stateHealth);
BytesStreamOutput out_lt_1_0 = new BytesStreamOutput();
Version old_version = VersionUtils.randomVersionBetween(random(), LegacyESVersion.V_6_0_0, LegacyESVersion.V_6_8_0);
out_lt_1_0.setVersion(old_version);
clusterHealth.writeTo(out_lt_1_0);
BytesStreamOutput out_gt_1_0 = new BytesStreamOutput();
Version new_version = VersionUtils.randomVersionBetween(random(), Version.V_1_0_0, Version.CURRENT);
out_gt_1_0.setVersion(new_version);
clusterHealth.writeTo(out_gt_1_0);
//The serialized output byte stream will not be same; and different by a boolean field "discovered_master"
assertNotEquals(out_lt_1_0.size(), out_gt_1_0.size());
assertThat(out_gt_1_0.size() - out_lt_1_0.size(), Matchers.equalTo(1));
//Input stream constructed from Version 6_8 or less will not have field "discovered_master";
//hence fallback to default as no value retained
StreamInput in_lt_6_8 = out_lt_1_0.bytes().streamInput();
in_lt_6_8.setVersion(old_version);
clusterHealth = ClusterHealthResponse.readResponseFrom(in_lt_6_8);
assertThat(clusterHealth.hasDiscoveredMaster(), Matchers.equalTo(true));
//Input stream constructed from Version 7_0 and above will have field "discovered_master"; hence value will be retained
StreamInput in_gt_7_0 = out_gt_1_0.bytes().streamInput();
in_gt_7_0.setVersion(new_version);
clusterHealth = ClusterHealthResponse.readResponseFrom(in_gt_7_0);
assertThat(clusterHealth.hasDiscoveredMaster(), Matchers.equalTo(hasDiscoveredMaster));
}
ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException {
@ -119,6 +194,23 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
return clusterHealth;
}
public void testParseFromXContentWithDiscoveredMasterField() throws IOException {
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, "{\"cluster_name\":\"535799904437:7-1-3-node\",\"status\":\"green\"," +
"\"timed_out\":false,\"number_of_nodes\":6,\"number_of_data_nodes\":3,\"discovered_master\":false," +
"\"active_primary_shards\":4,\"active_shards\":5,\"relocating_shards\":0,\"initializing_shards\":0," +
"\"unassigned_shards\":0,\"delayed_unassigned_shards\":0,\"number_of_pending_tasks\":0," +
"\"number_of_in_flight_fetch\":0,\"task_max_waiting_in_queue_millis\":0," +
"\"active_shards_percent_as_number\":100}")) {
ClusterHealthResponse clusterHealth = ClusterHealthResponse.fromXContent(parser);
assertNotNull(clusterHealth);
assertThat(clusterHealth.getClusterName(), Matchers.equalTo("535799904437:7-1-3-node"));
assertThat(clusterHealth.getNumberOfNodes(), Matchers.equalTo(6));
assertThat(clusterHealth.hasDiscoveredMaster(), Matchers.equalTo(false));
}
}
@Override
protected ClusterHealthResponse doParseInstance(XContentParser parser) {
return ClusterHealthResponse.fromXContent(parser);
@ -135,7 +227,7 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
}
}
ClusterStateHealth stateHealth = new ClusterStateHealth(randomInt(100), randomInt(100), randomInt(100),
randomInt(100), randomInt(100), randomInt(100), randomInt(100),
randomInt(100), randomInt(100), randomInt(100), randomInt(100), randomBoolean(),
randomDoubleBetween(0d, 100d, true), randomFrom(ClusterHealthStatus.values()), indices);
return new ClusterHealthResponse(randomAlphaOfLengthBetween(1, 10), randomInt(100), randomInt(100), randomInt(100),
@ -205,8 +297,8 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
ClusterStateHealth state = instance.getClusterStateHealth();
ClusterStateHealth newState = new ClusterStateHealth(state.getActivePrimaryShards() + between(1, 10),
state.getActiveShards(), state.getRelocatingShards(), state.getInitializingShards(), state.getUnassignedShards(),
state.getNumberOfNodes(), state.getNumberOfDataNodes(), state.getActiveShardsPercent(), state.getStatus(),
state.getIndices());
state.getNumberOfNodes(), state.getNumberOfDataNodes(), state.hasDiscoveredMaster(), state.getActiveShardsPercent(),
state.getStatus(), state.getIndices());
return new ClusterHealthResponse(instance.getClusterName(),
instance.getNumberOfPendingTasks(), instance.getNumberOfInFlightFetch(),
instance.getDelayedUnassignedShards(), instance.getTaskMaxWaitingTime(),

View File

@ -190,6 +190,7 @@ public class ClusterStateHealthTests extends OpenSearchTestCase {
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable.build())
.nodes(clusterService.state().nodes())
.build();
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(
clusterState, IndicesOptions.strictExpand(), (String[]) null
@ -197,6 +198,7 @@ public class ClusterStateHealthTests extends OpenSearchTestCase {
ClusterStateHealth clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
logger.info("cluster status: {}, expected {}", clusterStateHealth.getStatus(), counter.status());
clusterStateHealth = maybeSerialize(clusterStateHealth);
assertThat(clusterStateHealth.hasDiscoveredMaster(), equalTo(clusterService.state().nodes().getMasterNodeId() != null));
assertClusterHealth(clusterStateHealth, counter);
}