Update node attribute check to version update (1.2) check for shard indexing pressure serialization. (#1398)

This is required to not mandate test have the cluster service initialized while asserting node attributes.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>

Co-authored-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
Saurabh Singh 2021-10-21 18:50:52 +05:30 committed by GitHub
parent e7753becf4
commit b792750b73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 6 additions and 26 deletions

View File

@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.node.stats; package org.opensearch.action.admin.cluster.node.stats;
import org.opensearch.LegacyESVersion; import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodeRole;
@ -43,7 +44,6 @@ import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats; import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats; import org.opensearch.http.HttpStats;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.indices.NodeIndicesStats; import org.opensearch.indices.NodeIndicesStats;
@ -152,7 +152,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
} else { } else {
indexingPressureStats = null; indexingPressureStats = null;
} }
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) { if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new); shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);
} else { } else {
shardIndexingPressureStats = null; shardIndexingPressureStats = null;
@ -337,7 +337,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_9_0)) { if (out.getVersion().onOrAfter(LegacyESVersion.V_7_9_0)) {
out.writeOptionalWriteable(indexingPressureStats); out.writeOptionalWriteable(indexingPressureStats);
} }
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) { if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeOptionalWriteable(shardIndexingPressureStats); out.writeOptionalWriteable(shardIndexingPressureStats);
} }
} }

View File

@ -33,10 +33,10 @@
package org.opensearch.action.admin.indices.stats; package org.opensearch.action.admin.indices.stats;
import org.opensearch.LegacyESVersion; import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.ShardIndexingPressureSettings;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
@ -83,7 +83,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) { if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean(); includeUnloadedSegments = in.readBoolean();
} }
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) { if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
includeAllShardIndexingPressureTrackers = in.readBoolean(); includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean(); includeOnlyTopIndexingPressureMetrics = in.readBoolean();
} }
@ -105,7 +105,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) { if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
out.writeBoolean(includeUnloadedSegments); out.writeBoolean(includeUnloadedSegments);
} }
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) { if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeBoolean(includeAllShardIndexingPressureTrackers); out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics); out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
} }

View File

@ -5,15 +5,11 @@
package org.opensearch.index; package org.opensearch.index;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import java.util.Iterator;
import java.util.Objects;
/** /**
* This class contains all the settings which are required and owned by {TODO link ShardIndexingPressure}. These will be * This class contains all the settings which are required and owned by {TODO link ShardIndexingPressure}. These will be
* referenced/used in ShardIndexingPressure, as well as its dependent components, i.e. * referenced/used in ShardIndexingPressure, as well as its dependent components, i.e.
@ -69,10 +65,8 @@ public final class ShardIndexingPressureSettings {
private volatile int requestSizeWindow; private volatile int requestSizeWindow;
private volatile double shardMinLimit; private volatile double shardMinLimit;
private final long primaryAndCoordinatingNodeLimits; private final long primaryAndCoordinatingNodeLimits;
private static ClusterService clusterService;
public ShardIndexingPressureSettings(ClusterService clusterService, Settings settings, long primaryAndCoordinatingLimits) { public ShardIndexingPressureSettings(ClusterService clusterService, Settings settings, long primaryAndCoordinatingLimits) {
ShardIndexingPressureSettings.clusterService = clusterService;
ClusterSettings clusterSettings = clusterService.getClusterSettings(); ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.shardIndexingPressureEnabled = SHARD_INDEXING_PRESSURE_ENABLED.get(settings); this.shardIndexingPressureEnabled = SHARD_INDEXING_PRESSURE_ENABLED.get(settings);
@ -92,20 +86,6 @@ public final class ShardIndexingPressureSettings {
clusterSettings.addSettingsUpdateConsumer(SHARD_MIN_LIMIT, this::setShardMinLimit); clusterSettings.addSettingsUpdateConsumer(SHARD_MIN_LIMIT, this::setShardMinLimit);
} }
public static boolean isShardIndexingPressureAttributeEnabled() {
// Null check is required only for bwc tests which start node without initializing cluster service.
// In actual run time, clusterService will never be null.
if (Objects.nonNull(clusterService) && clusterService.getClusterApplierService().isInitialClusterStateSet()) {
Iterator<DiscoveryNode> nodes = clusterService.state().getNodes().getNodes().valuesIt();
while (nodes.hasNext()) {
if (Boolean.parseBoolean(nodes.next().getAttributes().get(SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY)) == false) {
return false;
}
}
}
return true;
}
private void setShardIndexingPressureEnabled(Boolean shardIndexingPressureEnableValue) { private void setShardIndexingPressureEnabled(Boolean shardIndexingPressureEnableValue) {
this.shardIndexingPressureEnabled = shardIndexingPressureEnableValue; this.shardIndexingPressureEnabled = shardIndexingPressureEnableValue;
} }