mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-12 07:55:24 +00:00
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
parent
457c1cd6ec
commit
84e0f1ea79
@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Copyright OpenSearch Contributors.
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package org.opensearch.index;
|
||||
|
||||
import org.opensearch.cluster.node.DiscoveryNode;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Setting;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* This class contains all the settings which are required and owned by {TODO link ShardIndexingPressure}. These will be
|
||||
* referenced/used in ShardIndexingPressure, as well as its dependent components, i.e.
|
||||
* {TODO link ShardIndexingPressureMemoryManager} and {TODO link ShardIndexingPressureStore}
|
||||
*/
|
||||
public final class ShardIndexingPressureSettings {
|
||||
|
||||
public static final String SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY = "shard_indexing_pressure_enabled";
|
||||
|
||||
public static final Setting<Boolean> SHARD_INDEXING_PRESSURE_ENABLED =
|
||||
Setting.boolSetting("shard_indexing_pressure.enabled", false, Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
|
||||
/**
|
||||
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set to true, shard level
|
||||
* rejection will be performed, otherwise only rejection metrics will be populated.
|
||||
*/
|
||||
public static final Setting<Boolean> SHARD_INDEXING_PRESSURE_ENFORCED =
|
||||
Setting.boolSetting("shard_indexing_pressure.enforced", false, Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
|
||||
/**
|
||||
* This represents the window size of last N request sampled and considered for throughput evaluation.
|
||||
*/
|
||||
public static final Setting<Integer> REQUEST_SIZE_WINDOW =
|
||||
Setting.intSetting("shard_indexing_pressure.secondary_parameter.throughput.request_size_window", 2000,
|
||||
Setting.Property.NodeScope, Setting.Property.Dynamic);
|
||||
|
||||
/**
|
||||
* This represents the base limit set for the utilization of every shard. Will be initilized as 1/1000th bytes of node limits.
|
||||
*/
|
||||
public static final Setting<Double> SHARD_MIN_LIMIT =
|
||||
Setting.doubleSetting("shard_indexing_pressure.primary_parameter.shard.min_limit", 0.001d, 0.0d,
|
||||
Setting.Property.NodeScope, Setting.Property.Dynamic);
|
||||
|
||||
private volatile boolean shardIndexingPressureEnabled;
|
||||
private volatile boolean shardIndexingPressureEnforced;
|
||||
private volatile long shardPrimaryAndCoordinatingBaseLimits;
|
||||
private volatile long shardReplicaBaseLimits;
|
||||
private volatile int requestSizeWindow;
|
||||
private volatile double shardMinLimit;
|
||||
private final long primaryAndCoordinatingNodeLimits;
|
||||
private static ClusterService clusterService;
|
||||
|
||||
public ShardIndexingPressureSettings(ClusterService clusterService, Settings settings, long primaryAndCoordinatingLimits) {
|
||||
ShardIndexingPressureSettings.clusterService = clusterService;
|
||||
ClusterSettings clusterSettings = clusterService.getClusterSettings();
|
||||
|
||||
this.shardIndexingPressureEnabled = SHARD_INDEXING_PRESSURE_ENABLED.get(settings);
|
||||
clusterSettings.addSettingsUpdateConsumer(SHARD_INDEXING_PRESSURE_ENABLED, this::setShardIndexingPressureEnabled);
|
||||
|
||||
this.shardIndexingPressureEnforced = SHARD_INDEXING_PRESSURE_ENFORCED.get(settings);
|
||||
clusterSettings.addSettingsUpdateConsumer(SHARD_INDEXING_PRESSURE_ENFORCED, this::setShardIndexingPressureEnforced);
|
||||
|
||||
this.requestSizeWindow = REQUEST_SIZE_WINDOW.get(settings).intValue();
|
||||
clusterSettings.addSettingsUpdateConsumer(REQUEST_SIZE_WINDOW, this::setRequestSizeWindow);
|
||||
|
||||
this.primaryAndCoordinatingNodeLimits = primaryAndCoordinatingLimits;
|
||||
|
||||
this.shardMinLimit = SHARD_MIN_LIMIT.get(settings).floatValue();
|
||||
this.shardPrimaryAndCoordinatingBaseLimits = (long) (primaryAndCoordinatingLimits * shardMinLimit);
|
||||
this.shardReplicaBaseLimits = (long) (shardPrimaryAndCoordinatingBaseLimits * 1.5);
|
||||
clusterSettings.addSettingsUpdateConsumer(SHARD_MIN_LIMIT, this::setShardMinLimit);
|
||||
}
|
||||
|
||||
public static boolean isShardIndexingPressureAttributeEnabled() {
|
||||
Iterator<DiscoveryNode> nodes = clusterService.state().getNodes().getNodes().valuesIt();
|
||||
while (nodes.hasNext()) {
|
||||
if (Boolean.parseBoolean(nodes.next().getAttributes().get(SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY)) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void setShardIndexingPressureEnabled(Boolean shardIndexingPressureEnableValue) {
|
||||
this.shardIndexingPressureEnabled = shardIndexingPressureEnableValue;
|
||||
}
|
||||
|
||||
private void setShardIndexingPressureEnforced(Boolean shardIndexingPressureEnforcedValue) {
|
||||
this.shardIndexingPressureEnforced = shardIndexingPressureEnforcedValue;
|
||||
}
|
||||
|
||||
private void setRequestSizeWindow(int requestSizeWindow) {
|
||||
this.requestSizeWindow = requestSizeWindow;
|
||||
}
|
||||
|
||||
private void setShardMinLimit(double shardMinLimit) {
|
||||
this.shardMinLimit = shardMinLimit;
|
||||
|
||||
//Updating the dependent value once when the dynamic settings update
|
||||
this.setShardPrimaryAndCoordinatingBaseLimits();
|
||||
this.setShardReplicaBaseLimits();
|
||||
}
|
||||
|
||||
private void setShardPrimaryAndCoordinatingBaseLimits() {
|
||||
shardPrimaryAndCoordinatingBaseLimits = (long) (primaryAndCoordinatingNodeLimits * shardMinLimit);
|
||||
}
|
||||
|
||||
private void setShardReplicaBaseLimits() {
|
||||
shardReplicaBaseLimits = (long) (shardPrimaryAndCoordinatingBaseLimits * 1.5);
|
||||
}
|
||||
|
||||
public boolean isShardIndexingPressureEnabled() {
|
||||
return shardIndexingPressureEnabled;
|
||||
}
|
||||
|
||||
public boolean isShardIndexingPressureEnforced() {
|
||||
return shardIndexingPressureEnforced;
|
||||
}
|
||||
|
||||
public int getRequestSizeWindow() {
|
||||
return requestSizeWindow;
|
||||
}
|
||||
|
||||
public long getShardPrimaryAndCoordinatingBaseLimits() {
|
||||
return shardPrimaryAndCoordinatingBaseLimits;
|
||||
}
|
||||
|
||||
public long getShardReplicaBaseLimits() {
|
||||
return shardReplicaBaseLimits;
|
||||
}
|
||||
|
||||
public long getNodePrimaryAndCoordinatingLimits() {
|
||||
return primaryAndCoordinatingNodeLimits;
|
||||
}
|
||||
|
||||
public long getNodeReplicaLimits() {
|
||||
return (long) (primaryAndCoordinatingNodeLimits * 1.5);
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.index;
|
||||
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
|
||||
public class ShardIndexingPressureSettingsTests extends OpenSearchTestCase {
|
||||
|
||||
private final Settings settings = Settings.builder()
|
||||
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10MB")
|
||||
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
|
||||
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
|
||||
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 2000)
|
||||
.put(ShardIndexingPressureSettings.SHARD_MIN_LIMIT.getKey(), 0.001d)
|
||||
.build();
|
||||
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
final ClusterService clusterService = new ClusterService(settings, clusterSettings, null);
|
||||
|
||||
public void testFromSettings() {
|
||||
ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings(clusterService, settings,
|
||||
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());
|
||||
|
||||
assertTrue(shardIndexingPressureSettings.isShardIndexingPressureEnabled());
|
||||
assertTrue(shardIndexingPressureSettings.isShardIndexingPressureEnforced());
|
||||
assertEquals(2000, shardIndexingPressureSettings.getRequestSizeWindow());
|
||||
|
||||
// Node level limits
|
||||
long nodePrimaryAndCoordinatingLimits = shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits();
|
||||
long tenMB = 10 * 1024 * 1024;
|
||||
assertEquals(tenMB, nodePrimaryAndCoordinatingLimits);
|
||||
assertEquals((long)(tenMB * 1.5), shardIndexingPressureSettings.getNodeReplicaLimits());
|
||||
|
||||
// Shard Level Limits
|
||||
long shardPrimaryAndCoordinatingBaseLimits = (long) (nodePrimaryAndCoordinatingLimits * 0.001d);
|
||||
assertEquals(shardPrimaryAndCoordinatingBaseLimits, shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits());
|
||||
assertEquals((long)(shardPrimaryAndCoordinatingBaseLimits * 1.5),
|
||||
shardIndexingPressureSettings.getShardReplicaBaseLimits());
|
||||
}
|
||||
|
||||
public void testUpdateSettings() {
|
||||
ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings(clusterService, settings,
|
||||
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());
|
||||
|
||||
Settings.Builder updated = Settings.builder();
|
||||
clusterSettings.updateDynamicSettings(Settings.builder()
|
||||
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false)
|
||||
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false)
|
||||
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 4000)
|
||||
.put(ShardIndexingPressureSettings.SHARD_MIN_LIMIT.getKey(), 0.003d)
|
||||
.build(),
|
||||
Settings.builder().put(settings), updated, getTestClass().getName());
|
||||
clusterSettings.applySettings(updated.build());
|
||||
|
||||
assertFalse(shardIndexingPressureSettings.isShardIndexingPressureEnabled());
|
||||
assertFalse(shardIndexingPressureSettings.isShardIndexingPressureEnforced());
|
||||
assertEquals(4000, shardIndexingPressureSettings.getRequestSizeWindow());
|
||||
|
||||
// Node level limits
|
||||
long nodePrimaryAndCoordinatingLimits = shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits();
|
||||
long tenMB = 10 * 1024 * 1024;
|
||||
assertEquals(tenMB, nodePrimaryAndCoordinatingLimits);
|
||||
assertEquals((long)(tenMB * 1.5), shardIndexingPressureSettings.getNodeReplicaLimits());
|
||||
|
||||
// Shard Level Limits
|
||||
long shardPrimaryAndCoordinatingBaseLimits = (long) (nodePrimaryAndCoordinatingLimits * 0.003d);
|
||||
assertEquals(shardPrimaryAndCoordinatingBaseLimits, shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits());
|
||||
assertEquals((long)(shardPrimaryAndCoordinatingBaseLimits * 1.5),
|
||||
shardIndexingPressureSettings.getShardReplicaBaseLimits());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user