Update node attribute check to version update (1.2) check for shard indexing pressure serialization. (#1395)

This commit adds an explicit version check for shard indexing pressure 
serialization. This is required to not mandate test have the cluster service 
initialized while asserting node attributes.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
Saurabh Singh 2021-10-20 22:48:46 +05:30 committed by GitHub
parent 574e42c31b
commit 284968bb85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 6 additions and 26 deletions

View File

@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.node.stats;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
@ -43,7 +44,6 @@ import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.indices.NodeIndicesStats;
@ -152,7 +152,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
} else {
indexingPressureStats = null;
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);
} else {
shardIndexingPressureStats = null;
@ -337,7 +337,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_9_0)) {
out.writeOptionalWriteable(indexingPressureStats);
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeOptionalWriteable(shardIndexingPressureStats);
}
}

View File

@ -33,10 +33,10 @@
package org.opensearch.action.admin.indices.stats;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.ShardIndexingPressureSettings;
import java.io.IOException;
import java.util.Collections;
@ -83,7 +83,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
}
@ -105,7 +105,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
out.writeBoolean(includeUnloadedSegments);
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
}

View File

@ -5,15 +5,11 @@
package org.opensearch.index;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import java.util.Iterator;
import java.util.Objects;
/**
* This class contains all the settings which are required and owned by {TODO link ShardIndexingPressure}. These will be
* referenced/used in ShardIndexingPressure, as well as its dependent components, i.e.
@ -69,10 +65,8 @@ public final class ShardIndexingPressureSettings {
private volatile int requestSizeWindow;
private volatile double shardMinLimit;
private final long primaryAndCoordinatingNodeLimits;
private static ClusterService clusterService;
public ShardIndexingPressureSettings(ClusterService clusterService, Settings settings, long primaryAndCoordinatingLimits) {
ShardIndexingPressureSettings.clusterService = clusterService;
ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.shardIndexingPressureEnabled = SHARD_INDEXING_PRESSURE_ENABLED.get(settings);
@ -92,20 +86,6 @@ public final class ShardIndexingPressureSettings {
clusterSettings.addSettingsUpdateConsumer(SHARD_MIN_LIMIT, this::setShardMinLimit);
}
public static boolean isShardIndexingPressureAttributeEnabled() {
// Null check is required only for bwc tests which start node without initializing cluster service.
// In actual run time, clusterService will never be null.
if (Objects.nonNull(clusterService) && clusterService.getClusterApplierService().isInitialClusterStateSet()) {
Iterator<DiscoveryNode> nodes = clusterService.state().getNodes().getNodes().valuesIt();
while (nodes.hasNext()) {
if (Boolean.parseBoolean(nodes.next().getAttributes().get(SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY)) == false) {
return false;
}
}
}
return true;
}
private void setShardIndexingPressureEnabled(Boolean shardIndexingPressureEnableValue) {
this.shardIndexingPressureEnabled = shardIndexingPressureEnableValue;
}