Add Shard Indexing Pressure Memory Manager (#478) (#945)

This commit introduces a Memory Manager for Shard Indexing Pressure. It is responsible for increasing and decreasing the allocated shard limit based on incoming requests, and for validating the current values against the configured thresholds.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Saurabh Singh 2021-07-20 05:58:05 +05:30 committed by Rabi Panda
parent 5bbbad34d2
commit 4d16faee5f
4 changed files with 1047 additions and 8 deletions
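
As a rough illustration (not part of this commit), the dynamic thresholds registered below could be supplied as node settings. The keys and values are taken from the defaults declared in ShardIndexingPressureMemoryManager; the wrapper class and method name exist only for this sketch.

import org.opensearch.common.settings.Settings;

public class ShardIndexingPressureSettingsSketch {
    // Builds node settings carrying the memory manager thresholds at their default values.
    public static Settings defaultThresholds() {
        return Settings.builder()
            .put("shard_indexing_pressure.operating_factor.lower", 0.75d)
            .put("shard_indexing_pressure.operating_factor.optimal", 0.85d)
            .put("shard_indexing_pressure.operating_factor.upper", 0.95d)
            .put("shard_indexing_pressure.primary_parameter.node.soft_limit", 0.7d)
            .put("shard_indexing_pressure.secondary_parameter.throughput.degradation_factor", 5.0d)
            .put("shard_indexing_pressure.secondary_parameter.successful_request.elapsed_timeout", "300s")
            .put("shard_indexing_pressure.secondary_parameter.successful_request.max_outstanding_requests", 100)
            .build();
    }
}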

org/opensearch/common/settings/ClusterSettings.java

@@ -34,6 +34,12 @@ package org.opensearch.common.settings;
import org.apache.logging.log4j.LogManager;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
@@ -101,11 +107,6 @@ import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
@@ -592,7 +593,14 @@ public final class ClusterSettings extends AbstractScopedSettings {
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,
ShardIndexingPressureSettings.SHARD_MIN_LIMIT,
ShardIndexingPressureStore.MAX_COLD_STORE_SIZE)));
ShardIndexingPressureStore.MAX_COLD_STORE_SIZE,
ShardIndexingPressureMemoryManager.LOWER_OPERATING_FACTOR,
ShardIndexingPressureMemoryManager.OPTIMAL_OPERATING_FACTOR,
ShardIndexingPressureMemoryManager.UPPER_OPERATING_FACTOR,
ShardIndexingPressureMemoryManager.NODE_SOFT_LIMIT,
ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS,
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS)));
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,

org/opensearch/index/ShardIndexingPressureMemoryManager.java

@@ -0,0 +1,457 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.ShardIndexingPressureTracker.OperationTracker;
import org.opensearch.index.ShardIndexingPressureTracker.PerformanceTracker;
import org.opensearch.index.ShardIndexingPressureTracker.RejectionTracker;
import org.opensearch.index.ShardIndexingPressureTracker.StatsTracker;
import org.opensearch.index.shard.ShardId;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
/**
 * The Shard Indexing Pressure Memory Manager is the construct responsible for increasing and decreasing the allocated shard limit
 * based on incoming requests. A shard limit defines the maximum memory that a shard can occupy in the heap for request objects.
 *
 * Based on the overall memory utilization on the node and the current traffic needs, shard limits will be modified:
 *
 * 1. If the limits assigned to a shard are breached (Primary Parameter) while the node level overall occupancy across all shards
 * is not greater than primary_parameter.node.soft_limit, the MemoryManager will increase the shard limits without any deeper evaluation.
 * 2. If the limits assigned to the shard are breached (Primary Parameter) and the node level overall occupancy across all shards
 * is greater than primary_parameter.node.soft_limit, then the MemoryManager will evaluate deeper parameters for the shard to identify
 * issues such as throughput degradation (Secondary Parameter - 1) and time since the last request was successful (Secondary Parameter - 2).
 * This helps detect any duress state of a shard that is requesting more memory.
 *
 * Secondary Parameters covered above:
 * 1. ThroughputDegradationLimitsBreached - When the historical throughput average exceeds the moving window throughput average by a
 * factor greater than the degradation limit threshold, this parameter is considered to be breached.
 * 2. LastSuccessfulRequestDurationLimitsBreached - When the time since the last successful request completed is greater than the max
 * timeout threshold value, while the number of outstanding requests is greater than the max outstanding request count, this parameter
 * is considered to be breached.
 *
 * The MemoryManager attempts to increase or decrease the shard limits when the shard utilization goes above operating_factor.upper or
 * below operating_factor.lower of the current shard limits. The MemoryManager updates the shard limit such that the new value keeps
 * the current shard utilization within the operating_factor.optimal range.
 *
*/
public class ShardIndexingPressureMemoryManager {
private static final Logger logger = LogManager.getLogger(ShardIndexingPressureMemoryManager.class);
/**
 * The shard operating factor is evaluated as currentShardBytes/shardLimits. The outcome of this expression is categorized into the
 * lower, optimal and upper boundaries, and appropriate action is taken once the below defined threshold values are breached.
*/
public static final Setting<Double> LOWER_OPERATING_FACTOR =
Setting.doubleSetting("shard_indexing_pressure.operating_factor.lower", 0.75d, 0.0d,
Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<Double> OPTIMAL_OPERATING_FACTOR =
Setting.doubleSetting("shard_indexing_pressure.operating_factor.optimal", 0.85d, 0.0d,
Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<Double> UPPER_OPERATING_FACTOR =
Setting.doubleSetting("shard_indexing_pressure.operating_factor.upper", 0.95d, 0.0d,
Setting.Property.NodeScope, Setting.Property.Dynamic);
/**
 * This determines the max time elapsed since any request was processed successfully. Appropriate action is taken
 * once the below defined threshold value is breached.
*/
public static final Setting<TimeValue> SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT =
Setting.positiveTimeSetting("shard_indexing_pressure.secondary_parameter.successful_request.elapsed_timeout",
TimeValue.timeValueMillis(300000), Setting.Property.NodeScope, Setting.Property.Dynamic);
/**
 * This determines the max number of outstanding requests that are yet to be processed successfully. Appropriate
* action is taken once the below defined threshold value is breached.
*/
public static final Setting<Integer> MAX_OUTSTANDING_REQUESTS =
Setting.intSetting("shard_indexing_pressure.secondary_parameter.successful_request.max_outstanding_requests",
100, Setting.Property.NodeScope, Setting.Property.Dynamic);
/**
 * Degradation for a shard can be evaluated as the lifetime average throughput divided by the average throughput of the last N requests,
 * where N is {@link ShardIndexingPressureSettings#REQUEST_SIZE_WINDOW}.
 * Appropriate action is taken once the outcome of the above expression breaches the below defined threshold value.
*/
public static final Setting<Double> THROUGHPUT_DEGRADATION_LIMITS =
Setting.doubleSetting("shard_indexing_pressure.secondary_parameter.throughput.degradation_factor", 5.0d, 1.0d,
Setting.Property.NodeScope, Setting.Property.Dynamic);
/**
 * The node level soft limit determines when the secondary parameters for a shard are to be evaluated for degradation.
*/
public static final Setting<Double> NODE_SOFT_LIMIT =
Setting.doubleSetting("shard_indexing_pressure.primary_parameter.node.soft_limit", 0.7d, 0.0d,
Setting.Property.NodeScope, Setting.Property.Dynamic);
private final AtomicLong totalNodeLimitsBreachedRejections = new AtomicLong();
private final AtomicLong totalLastSuccessfulRequestLimitsBreachedRejections = new AtomicLong();
private final AtomicLong totalThroughputDegradationLimitsBreachedRejections = new AtomicLong();
private final ShardIndexingPressureSettings shardIndexingPressureSettings;
private final ShardIndexingPressureStore shardIndexingPressureStore;
private volatile double lowerOperatingFactor;
private volatile double optimalOperatingFactor;
private volatile double upperOperatingFactor;
private volatile TimeValue successfulRequestElapsedTimeout;
private volatile int maxOutstandingRequests;
private volatile double primaryAndCoordinatingThroughputDegradationLimits;
private volatile double replicaThroughputDegradationLimits;
private volatile double nodeSoftLimit;
public ShardIndexingPressureMemoryManager(ShardIndexingPressureSettings shardIndexingPressureSettings,
ClusterSettings clusterSettings, Settings settings) {
this.shardIndexingPressureSettings = shardIndexingPressureSettings;
this.shardIndexingPressureStore = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings);
this.lowerOperatingFactor = LOWER_OPERATING_FACTOR.get(settings).doubleValue();
clusterSettings.addSettingsUpdateConsumer(LOWER_OPERATING_FACTOR, this::setLowerOperatingFactor);
this.optimalOperatingFactor = OPTIMAL_OPERATING_FACTOR.get(settings).doubleValue();
clusterSettings.addSettingsUpdateConsumer(OPTIMAL_OPERATING_FACTOR, this::setOptimalOperatingFactor);
this.upperOperatingFactor = UPPER_OPERATING_FACTOR.get(settings).doubleValue();
clusterSettings.addSettingsUpdateConsumer(UPPER_OPERATING_FACTOR, this::setUpperOperatingFactor);
this.successfulRequestElapsedTimeout = SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.get(settings);
clusterSettings.addSettingsUpdateConsumer(SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, this::setSuccessfulRequestElapsedTimeout);
this.maxOutstandingRequests = MAX_OUTSTANDING_REQUESTS.get(settings).intValue();
clusterSettings.addSettingsUpdateConsumer(MAX_OUTSTANDING_REQUESTS, this::setMaxOutstandingRequests);
this.primaryAndCoordinatingThroughputDegradationLimits = THROUGHPUT_DEGRADATION_LIMITS.get(settings).doubleValue();
this.replicaThroughputDegradationLimits = this.primaryAndCoordinatingThroughputDegradationLimits * 1.5;
clusterSettings.addSettingsUpdateConsumer(THROUGHPUT_DEGRADATION_LIMITS, this::setThroughputDegradationLimits);
this.nodeSoftLimit = NODE_SOFT_LIMIT.get(settings).doubleValue();
clusterSettings.addSettingsUpdateConsumer(NODE_SOFT_LIMIT, this::setNodeSoftLimit);
}
/**
* Checks if the node level memory threshold is breached for coordinating operations.
*/
boolean isCoordinatingNodeLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes) {
if(nodeTotalBytes > this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) {
logger.debug("Node limits breached for coordinating operation [node_total_bytes={} , " +
"node_primary_and_coordinating_limits={}]", nodeTotalBytes,
this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits());
incrementNodeLimitBreachedRejectionCount(tracker.getCoordinatingOperationTracker().getRejectionTracker());
return true;
}
return false;
}
/**
* Checks if the shard level memory threshold is breached for coordinating operations.
*/
boolean isCoordinatingShardLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes, long requestStartTime) {
// Shard memory limits is breached when the current utilization is greater than operating_factor.upper limit.
long shardCombinedBytes = tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes();
long shardPrimaryAndCoordinatingLimits = tracker.getPrimaryAndCoordinatingLimits();
boolean shardMemoryLimitsBreached = ((double)shardCombinedBytes / shardPrimaryAndCoordinatingLimits) > this.upperOperatingFactor;
if(shardMemoryLimitsBreached) {
BooleanSupplier increaseShardLimitSupplier = () -> increaseShardLimits(tracker.getShardId(),
this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(),
() -> tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(),
tracker::getPrimaryAndCoordinatingLimits,
ShardIndexingPressureTracker::getPrimaryAndCoordinatingLimits,
tracker::compareAndSetPrimaryAndCoordinatingLimits);
return onShardLimitBreached(nodeTotalBytes, this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(),
requestStartTime, tracker.getCoordinatingOperationTracker(), increaseShardLimitSupplier);
} else {
return false;
}
}
/**
* Checks if the node level memory threshold is breached for primary operations.
*/
boolean isPrimaryNodeLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes) {
if(nodeTotalBytes > this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits()) {
logger.debug("Node limits breached for primary operation [node_total_bytes={}, " +
"node_primary_and_coordinating_limits={}]", nodeTotalBytes,
this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits());
incrementNodeLimitBreachedRejectionCount(tracker.getPrimaryOperationTracker().getRejectionTracker());
return true;
}
return false;
}
/**
* Checks if the shard level memory threshold is breached for primary operations.
*/
boolean isPrimaryShardLimitBreached(ShardIndexingPressureTracker tracker, long nodeTotalBytes, long requestStartTime) {
// Shard memory limits is breached when the current utilization is greater than operating_factor.upper limit.
long shardCombinedBytes = tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes();
long shardPrimaryAndCoordinatingLimits = tracker.getPrimaryAndCoordinatingLimits();
boolean shardMemoryLimitsBreached = ((double)shardCombinedBytes / shardPrimaryAndCoordinatingLimits) > this.upperOperatingFactor;
if(shardMemoryLimitsBreached) {
BooleanSupplier increaseShardLimitSupplier = () -> increaseShardLimits(tracker.getShardId(),
this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(),
() -> tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(),
tracker::getPrimaryAndCoordinatingLimits,
ShardIndexingPressureTracker::getPrimaryAndCoordinatingLimits,
tracker::compareAndSetPrimaryAndCoordinatingLimits);
return onShardLimitBreached(nodeTotalBytes, this.shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits(),
requestStartTime, tracker.getPrimaryOperationTracker(), increaseShardLimitSupplier);
} else {
return false;
}
}
/**
* Checks if the node level memory threshold is breached for replica operations.
*/
boolean isReplicaNodeLimitBreached(ShardIndexingPressureTracker tracker, long nodeReplicaBytes) {
if(nodeReplicaBytes > this.shardIndexingPressureSettings.getNodeReplicaLimits()) {
logger.debug("Node limits breached for replica operation [node_replica_bytes={} , " +
"node_replica_limits={}]", nodeReplicaBytes, this.shardIndexingPressureSettings.getNodeReplicaLimits());
incrementNodeLimitBreachedRejectionCount(tracker.getReplicaOperationTracker().getRejectionTracker());
return true;
}
return false;
}
/**
* Checks if the shard level memory threshold is breached for replica operations.
*/
boolean isReplicaShardLimitBreached(ShardIndexingPressureTracker tracker, long nodeReplicaBytes, long requestStartTime) {
// Shard memory limits is breached when the current utilization is greater than operating_factor.upper limit.
long shardReplicaBytes = tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes();
long shardReplicaLimits = tracker.getReplicaLimits();
final boolean shardMemoryLimitsBreached = ((double)shardReplicaBytes / shardReplicaLimits) > this.upperOperatingFactor;
if(shardMemoryLimitsBreached) {
BooleanSupplier increaseShardLimitSupplier = () -> increaseShardLimits(tracker.getShardId(),
this.shardIndexingPressureSettings.getNodeReplicaLimits(),
() -> tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes(),
tracker::getReplicaLimits,
ShardIndexingPressureTracker::getReplicaLimits,
tracker::compareAndSetReplicaLimits);
return onShardLimitBreached(nodeReplicaBytes, this.shardIndexingPressureSettings.getNodeReplicaLimits(),
requestStartTime, tracker.getReplicaOperationTracker(), increaseShardLimitSupplier);
} else {
return false;
}
}
void decreaseShardPrimaryAndCoordinatingLimits(ShardIndexingPressureTracker tracker) {
decreaseShardLimits(tracker.getShardId(),
() -> tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(),
tracker::getPrimaryAndCoordinatingLimits,
tracker::compareAndSetPrimaryAndCoordinatingLimits,
shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits());
}
void decreaseShardReplicaLimits(ShardIndexingPressureTracker tracker) {
decreaseShardLimits(tracker.getShardId(),
() -> tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes(),
tracker::getReplicaLimits,
tracker::compareAndSetReplicaLimits,
shardIndexingPressureSettings.getShardReplicaBaseLimits());
}
ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) {
return shardIndexingPressureStore.getShardIndexingPressureTracker(shardId);
}
long getTotalNodeLimitsBreachedRejections() {
return totalNodeLimitsBreachedRejections.get();
}
long getTotalLastSuccessfulRequestLimitsBreachedRejections() {
return totalLastSuccessfulRequestLimitsBreachedRejections.get();
}
long getTotalThroughputDegradationLimitsBreachedRejections() {
return totalThroughputDegradationLimitsBreachedRejections.get();
}
/**
 * Verifies and returns true if the shard limit is hard-breached, i.e. the shard limit cannot be increased any further. Otherwise
 * it increases the shard limit and returns false.
*/
private boolean onShardLimitBreached(long nodeTotalBytes, long nodeLimit, long requestStartTime, OperationTracker operationTracker,
BooleanSupplier increaseShardLimitSupplier) {
// Secondary parameters (i.e. LastSuccessfulRequestDuration and Throughput) are taken into consideration when
// the current node utilization is greater than primary_parameter.node.soft_limit of the total node limits.
if(((double)nodeTotalBytes / nodeLimit) < this.nodeSoftLimit) {
boolean isShardLimitsIncreased = increaseShardLimitSupplier.getAsBoolean();
if (isShardLimitsIncreased == false) {
incrementNodeLimitBreachedRejectionCount(operationTracker.getRejectionTracker());
}
return !isShardLimitsIncreased;
} else {
boolean shardLastSuccessfulRequestDurationLimitsBreached =
evaluateLastSuccessfulRequestDurationLimitsBreached(operationTracker.getPerformanceTracker(), requestStartTime);
if (shardLastSuccessfulRequestDurationLimitsBreached) {
operationTracker.getRejectionTracker().incrementLastSuccessfulRequestLimitsBreachedRejections();
this.totalLastSuccessfulRequestLimitsBreachedRejections.incrementAndGet();
return true;
}
boolean shardThroughputDegradationLimitsBreached =
evaluateThroughputDegradationLimitsBreached(operationTracker.getPerformanceTracker(),
operationTracker.getStatsTracker(), primaryAndCoordinatingThroughputDegradationLimits);
if (shardThroughputDegradationLimitsBreached) {
operationTracker.getRejectionTracker().incrementThroughputDegradationLimitsBreachedRejections();
this.totalThroughputDegradationLimitsBreachedRejections.incrementAndGet();
return true;
}
boolean isShardLimitsIncreased = increaseShardLimitSupplier.getAsBoolean();
if (isShardLimitsIncreased == false) {
incrementNodeLimitBreachedRejectionCount(operationTracker.getRejectionTracker());
}
return !isShardLimitsIncreased;
}
}
private boolean increaseShardLimits(ShardId shardId, long nodeLimit,
LongSupplier shardCurrentBytesSupplier, LongSupplier shardLimitSupplier,
ToLongFunction<ShardIndexingPressureTracker> getShardLimitFunction,
BiPredicate<Long, Long> updateShardLimitPredicate) {
long currentShardLimit;
long newShardLimit;
do {
currentShardLimit = shardLimitSupplier.getAsLong();
long shardCurrentBytes = shardCurrentBytesSupplier.getAsLong();
if(((double)shardCurrentBytes / currentShardLimit) > this.upperOperatingFactor) {
newShardLimit = (long)(shardCurrentBytes / this.optimalOperatingFactor);
long totalShardLimitsExceptCurrentShard = this.shardIndexingPressureStore.getShardIndexingPressureHotStore()
.entrySet().stream()
.filter(entry -> (shardId != entry.getKey()))
.map(Map.Entry::getValue)
.mapToLong(getShardLimitFunction).sum();
if (totalShardLimitsExceptCurrentShard + newShardLimit > nodeLimit) {
logger.debug("Failed To Increase Shard Limit [shard_detail=[{}][{}}], " +
"shard_current_limit_bytes={}, " + "total_shard_limits_bytes_except_current_shard={}, " +
"expected_shard_limits_bytes={}]",
shardId.getIndexName(), shardId.id(), currentShardLimit, totalShardLimitsExceptCurrentShard, newShardLimit);
return false;
}
} else {
return true;
}
} while(!updateShardLimitPredicate.test(currentShardLimit, newShardLimit));
logger.debug("Increased Shard Limit [" +
"shard_detail=[{}][{}], old_shard_limit_bytes={}, " + "new_shard_limit_bytes={}]",
shardId.getIndexName(), shardId.id(), currentShardLimit, newShardLimit);
return true;
}
private void decreaseShardLimits(ShardId shardId, LongSupplier shardCurrentBytesSupplier, LongSupplier shardLimitSupplier,
BiPredicate<Long, Long> updateShardLimitPredicate, long shardBaseLimit) {
long currentShardLimit;
long newShardLimit;
do {
currentShardLimit = shardLimitSupplier.getAsLong();
long shardCurrentBytes = shardCurrentBytesSupplier.getAsLong();
newShardLimit = Math.max((long) (shardCurrentBytes / this.optimalOperatingFactor), shardBaseLimit);
if (((double)shardCurrentBytes / currentShardLimit) > this.lowerOperatingFactor) {
logger.debug("Shard Limits Already Decreased [" +
"shard_detail=[{}][{}], " + "current_shard_limit_bytes={}, " +
"expected_shard_limit_bytes={}]",
shardId.getIndexName(), shardId.id(), currentShardLimit, newShardLimit);
return;
}
} while(!updateShardLimitPredicate.test(currentShardLimit,newShardLimit));
logger.debug("Decreased Shard Limit [shard_detail=[{}][{}], " +
"old_shard_limit_bytes={}, new_shard_limit_bytes={}]",
shardId.getIndexName(), shardId.id(), currentShardLimit, newShardLimit);
}
/**
 * This evaluation returns true if the lifetime average throughput divided by the moving average throughput of the last N requests
 * is greater than the degradation limit threshold.
*/
private boolean evaluateThroughputDegradationLimitsBreached(PerformanceTracker performanceTracker, StatsTracker statsTracker,
double degradationLimits) {
double throughputMovingAverage = Double.longBitsToDouble(performanceTracker.getThroughputMovingAverage());
long throughputMovingQueueSize = performanceTracker.getThroughputMovingQueueSize();
double throughputHistoricalAverage = (double)statsTracker.getTotalBytes() / performanceTracker.getLatencyInMillis();
return throughputMovingAverage > 0 && throughputMovingQueueSize >= this.shardIndexingPressureSettings.getRequestSizeWindow() &&
throughputHistoricalAverage / throughputMovingAverage > degradationLimits;
}
/**
* This evaluation returns true if the difference in the current timestamp and last successful request timestamp is greater than
* the successful request elapsed-timeout threshold, and the total number of outstanding requests is greater than
* the maximum outstanding request-count threshold.
*/
private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) {
return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) &&
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.millis() &&
performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests;
}
private void setLowerOperatingFactor(double lowerOperatingFactor) {
this.lowerOperatingFactor = lowerOperatingFactor;
}
private void setOptimalOperatingFactor(double optimalOperatingFactor) {
this.optimalOperatingFactor = optimalOperatingFactor;
}
private void setUpperOperatingFactor(double upperOperatingFactor) {
this.upperOperatingFactor = upperOperatingFactor;
}
private void setSuccessfulRequestElapsedTimeout(TimeValue successfulRequestElapsedTimeout) {
this.successfulRequestElapsedTimeout = successfulRequestElapsedTimeout;
}
private void setMaxOutstandingRequests(int maxOutstandingRequests) {
this.maxOutstandingRequests = maxOutstandingRequests;
}
private void setThroughputDegradationLimits(double throughputDegradationLimits) {
this.primaryAndCoordinatingThroughputDegradationLimits = throughputDegradationLimits;
this.replicaThroughputDegradationLimits = this.primaryAndCoordinatingThroughputDegradationLimits * 1.5;
}
private void setNodeSoftLimit(double nodeSoftLimit) {
this.nodeSoftLimit = nodeSoftLimit;
}
private void incrementNodeLimitBreachedRejectionCount(RejectionTracker rejectionTracker) {
rejectionTracker.incrementNodeLimitsBreachedRejections();
this.totalNodeLimitsBreachedRejections.incrementAndGet();
}
}
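
The increaseShardLimits and decreaseShardLimits logic above always re-derives the limit from the current shard bytes and the optimal operating factor. A minimal standalone sketch of that arithmetic, assuming the default factors (lower 0.75, optimal 0.85, upper 0.95): a shard holding 3,900 bytes against a 4,096-byte limit sits above the upper factor, so the limit is recomputed as 3,900 / 0.85, roughly 4,588 bytes, which brings utilization back to the optimal point. The class and method names below are hypothetical.

public final class OperatingFactorSketch {
    // Re-derives a shard limit from current usage, mirroring (but not reproducing) the logic above.
    static long rescaleLimit(long currentBytes, long currentLimit, long baseLimit,
                             double lower, double optimal, double upper) {
        double utilization = (double) currentBytes / currentLimit;
        if (utilization > upper) {
            return (long) (currentBytes / optimal);                        // grow, subject to the node-level limit check
        } else if (utilization < lower) {
            return Math.max((long) (currentBytes / optimal), baseLimit);   // shrink, but never below the base limit
        }
        return currentLimit;                                               // already within the operating range
    }

    public static void main(String[] args) {
        System.out.println(rescaleLimit(3900, 4096, 10, 0.75d, 0.85d, 0.95d)); // prints 4588
        System.out.println(rescaleLimit(512, 4096, 10, 0.75d, 0.85d, 0.95d));  // prints 602
    }
}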

org/opensearch/index/ShardIndexingPressureTracker.java

@@ -57,10 +57,18 @@ public class ShardIndexingPressureTracker {
return primaryAndCoordinatingLimits.get();
}
public boolean compareAndSetPrimaryAndCoordinatingLimits(long expectedValue, long newValue) {
return primaryAndCoordinatingLimits.compareAndSet(expectedValue, newValue);
}
public long getReplicaLimits() {
return replicaLimits.get();
}
public boolean compareAndSetReplicaLimits(long expectedValue, long newValue) {
return replicaLimits.compareAndSet(expectedValue, newValue);
}
public OperationTracker getCoordinatingOperationTracker() {
return coordinatingOperationTracker;
}
@@ -116,10 +124,18 @@ public class ShardIndexingPressureTracker {
return currentBytes.get();
}
public long incrementCurrentBytes(long bytes) {
return currentBytes.addAndGet(bytes);
}
public long getTotalBytes() {
return totalBytes.get();
}
public long incrementTotalBytes(long bytes) {
return totalBytes.addAndGet(bytes);
}
public long getRequestCount() {
return requestCount.get();
}
@@ -149,13 +165,25 @@ public class ShardIndexingPressureTracker {
return nodeLimitsBreachedRejections.get();
}
public long incrementNodeLimitsBreachedRejections() {
return nodeLimitsBreachedRejections.incrementAndGet();
}
public long getLastSuccessfulRequestLimitsBreachedRejections() {
return lastSuccessfulRequestLimitsBreachedRejections.get();
}
public long incrementLastSuccessfulRequestLimitsBreachedRejections() {
return lastSuccessfulRequestLimitsBreachedRejections.incrementAndGet();
}
public long getThroughputDegradationLimitsBreachedRejections() {
return throughputDegradationLimitsBreachedRejections.get();
}
public long incrementThroughputDegradationLimitsBreachedRejections() {
return throughputDegradationLimitsBreachedRejections.incrementAndGet();
}
}
/**
@@ -170,7 +198,7 @@ public class ShardIndexingPressureTracker {
*/
public static class PerformanceTracker {
private final AtomicLong latencyInMillis = new AtomicLong();
private final AtomicLong lastSuccessfulRequestTimestamp = new AtomicLong();
private volatile long lastSuccessfulRequestTimestamp = 0;
private final AtomicLong totalOutstandingRequests = new AtomicLong();
/**
* Shard Window Throughput Tracker.
@@ -184,18 +212,34 @@ public class ShardIndexingPressureTracker {
return latencyInMillis.get();
}
public long addLatencyInMillis(long latency) {
return latencyInMillis.addAndGet(latency);
}
public long getLastSuccessfulRequestTimestamp() {
return lastSuccessfulRequestTimestamp.get();
return lastSuccessfulRequestTimestamp;
}
public void updateLastSuccessfulRequestTimestamp(long timeStamp) {
lastSuccessfulRequestTimestamp = timeStamp;
}
public long getTotalOutstandingRequests() {
return totalOutstandingRequests.get();
}
public long incrementTotalOutstandingRequests() {
return totalOutstandingRequests.incrementAndGet();
}
public long getThroughputMovingAverage() {
return throughputMovingAverage.get();
}
public long updateThroughputMovingAverage(long newAvg) {
return throughputMovingAverage.getAndSet(newAvg);
}
public boolean addNewThroughout(Double newThroughput) {
return throughputMovingQueue.offer(newThroughput);
}
@@ -224,6 +268,10 @@ public class ShardIndexingPressureTracker {
return currentCombinedCoordinatingAndPrimaryBytes.get();
}
public long incrementCurrentCombinedCoordinatingAndPrimaryBytes(long bytes) {
return currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
}
public long getTotalCombinedCoordinatingAndPrimaryBytes() {
return totalCombinedCoordinatingAndPrimaryBytes.get();
}
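
One detail worth noting in the tracker diff above: the throughput moving average is a double value packed into an AtomicLong, which is why ShardIndexingPressureMemoryManager reads it through Double.longBitsToDouble(...) and the tests write it through Double.doubleToLongBits(...). The tiny sketch below only illustrates that round trip; the class and method names are hypothetical.

import java.util.concurrent.atomic.AtomicLong;

public final class MovingAverageBitsSketch {
    // The double is stored as its raw long bit pattern so it can live in an AtomicLong.
    private final AtomicLong movingAverageBits = new AtomicLong(Double.doubleToLongBits(0.0d));

    void update(double newAverage) {
        movingAverageBits.set(Double.doubleToLongBits(newAverage)); // pack the double into long bits
    }

    double read() {
        return Double.longBitsToDouble(movingAverageBits.get());    // unpack back into a double
    }

    public static void main(String[] args) {
        MovingAverageBitsSketch sketch = new MovingAverageBitsSketch();
        sketch.update(1.5d);
        System.out.println(sketch.read()); // prints 1.5
    }
}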

org/opensearch/index/ShardIndexingPressureMemoryManagerTests.java

@@ -0,0 +1,526 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;
public class ShardIndexingPressureMemoryManagerTests extends OpenSearchTestCase {
private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), 20)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 2)
.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ShardIndexingPressureSettings shardIndexingPressureSettings =
new ShardIndexingPressureSettings(new ClusterService(settings, clusterSettings, null), settings,
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());
private final Index index = new Index("IndexName", "UUID");
private final ShardId shardId1 = new ShardId(index, 0);
private final ShardId shardId2 = new ShardId(index, 1);
private final ShardIndexingPressureMemoryManager memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings,
clusterSettings, settings);
public void testCoordinatingNodeLevelBreach() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
assertFalse(memoryManager.isCoordinatingNodeLimitBreached(tracker, 1 * 1024));
assertTrue(memoryManager.isCoordinatingNodeLimitBreached(tracker, 11 * 1024));
}
public void testPrimaryNodeLevelBreach() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
assertFalse(memoryManager.isPrimaryNodeLimitBreached(tracker, 1 * 1024));
assertTrue(memoryManager.isPrimaryNodeLimitBreached(tracker, 11 * 1024));
}
public void testReplicaNodeLevelBreach() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
assertFalse(memoryManager.isReplicaNodeLimitBreached(tracker, 1 * 1024));
assertTrue(memoryManager.isReplicaNodeLimitBreached(tracker, 16 * 1024));
}
public void testCoordinatingPrimaryShardLimitsNotBreached() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(1);
long requestStartTime = System.nanoTime();
assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, 1 * 1024, requestStartTime));
assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, 1 * 1024, requestStartTime));
}
public void testReplicaShardLimitsNotBreached() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(1);
long requestStartTime = System.nanoTime();
assertFalse(memoryManager.isReplicaShardLimitBreached(tracker, 1 * 1024, requestStartTime));
}
public void testCoordinatingPrimaryShardLimitsIncreasedAndSoftLimitNotBreached() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(10);
long baseLimit = tracker.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker, 1 * 1024, requestStartTime));
assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker, 1 * 1024, requestStartTime));
assertTrue(tracker.getPrimaryAndCoordinatingLimits() > baseLimit);
assertEquals(tracker.getPrimaryAndCoordinatingLimits(), (long)(baseLimit/0.85));
}
public void testReplicaShardLimitsIncreasedAndSoftLimitNotBreached() {
ShardIndexingPressureTracker tracker = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(15);
long baseLimit = tracker.getReplicaLimits();
long requestStartTime = System.nanoTime();
assertFalse(memoryManager.isReplicaShardLimitBreached(tracker, 1 * 1024, requestStartTime));
assertTrue(tracker.getReplicaLimits() > baseLimit);
assertEquals(tracker.getReplicaLimits(), (long)(baseLimit/0.85));
}
public void testCoordinatingPrimarySoftLimitNotBreachedAndNodeLevelRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(2, memoryManager.getTotalNodeLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitNotBreachedAndNodeLevelRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 10 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 10 * 1024, requestStartTime));
assertEquals(limit1, tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(1, memoryManager.getTotalNodeLimitsBreachedRejections());
}
public void testCoordinatingPrimarySoftLimitBreachedAndNodeLevelRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(2, memoryManager.getTotalNodeLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitBreachedAndNodeLevelRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime));
assertEquals(limit1, tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(1, memoryManager.getTotalNodeLimitsBreachedRejections());
}
public void testCoordinatingPrimarySoftLimitBreachedAndLastSuccessfulRequestLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(2, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitBreachedAndLastSuccessfulRequestLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
tracker1.getReplicaOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100);
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime));
assertEquals(limit1, tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(1, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections());
}
public void testCoordinatingPrimarySoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(1 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(0, tracker1.getCoordinatingOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(0, tracker1.getPrimaryOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertTrue(tracker1.getPrimaryAndCoordinatingLimits() > limit1);
assertEquals((long)(1 * 1024/0.85), tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(0, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitBreachedAndLessOutstandingRequestsAndNoLastSuccessfulRequestLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(2 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
tracker1.getReplicaOperationTracker().getPerformanceTracker().updateLastSuccessfulRequestTimestamp(requestStartTime - 100);
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
assertFalse(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime));
assertTrue(tracker1.getReplicaLimits() > limit1);
assertEquals((long)(2 * 1024/0.85), tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(0, tracker1.getReplicaOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker()
.getLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(0, memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections());
}
public void testCoordinatingPrimarySoftLimitBreachedAndThroughputDegradationLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getCoordinatingOperationTracker().getStatsTracker().incrementTotalBytes(60);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(1d);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(2d);
assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(60);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(1d);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(2d);
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(2, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitBreachedAndThroughputDegradationLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
tracker1.getReplicaOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getReplicaOperationTracker().getStatsTracker().incrementTotalBytes(80);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(1d);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(2d);
assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime));
assertEquals(limit1, tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(1, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections());
}
public void testCoordinatingPrimarySoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNoThroughputDegradationLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(1 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateThroughputMovingAverage
(Double.doubleToLongBits(1d));
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getCoordinatingOperationTracker().getStatsTracker().incrementTotalBytes(60);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(1d);
assertFalse(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(0, tracker1.getCoordinatingOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(60);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(1d);
assertFalse(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(0, tracker1.getPrimaryOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertTrue(tracker1.getPrimaryAndCoordinatingLimits() > limit1);
assertEquals((long)(1 * 1024/0.85), tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(0, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitBreachedAndMovingAverageQueueNotBuildUpAndNoThroughputDegradationLimitRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(2 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
tracker1.getReplicaOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getReplicaOperationTracker().getStatsTracker().incrementTotalBytes(80);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(1d);
assertFalse(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime));
assertTrue(tracker1.getReplicaLimits() > limit1);
assertEquals((long)(2 * 1024/0.85), tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(0, tracker1.getReplicaOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker()
.getThroughputDegradationLimitsBreachedRejections());
assertEquals(0, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, memoryManager.getTotalThroughputDegradationLimitsBreachedRejections());
}
public void testCoordinatingPrimarySoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(4 * 1024);
tracker2.compareAndSetPrimaryAndCoordinatingLimits(tracker2.getPrimaryAndCoordinatingLimits(), 6 * 1024);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
long limit2 = tracker2.getPrimaryAndCoordinatingLimits();
long requestStartTime = System.nanoTime();
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getCoordinatingOperationTracker().getStatsTracker().incrementTotalBytes(60);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getCoordinatingOperationTracker().getPerformanceTracker().addNewThroughout(1d);
assertTrue(memoryManager.isCoordinatingShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getCoordinatingOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
tracker1.getPrimaryOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getPrimaryOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(60);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getPrimaryOperationTracker().getPerformanceTracker().addNewThroughout(1d);
assertTrue(memoryManager.isPrimaryShardLimitBreached(tracker1, 8 * 1024, requestStartTime));
assertEquals(1, tracker1.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getPrimaryOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(limit1, tracker1.getPrimaryAndCoordinatingLimits());
assertEquals(limit2, tracker2.getPrimaryAndCoordinatingLimits());
assertEquals(2, memoryManager.getTotalNodeLimitsBreachedRejections());
}
public void testReplicaShardLimitsSoftLimitBreachedAndNoSecondaryParameterBreachedAndNodeLevelRejections() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
ShardIndexingPressureTracker tracker2 = memoryManager.getShardIndexingPressureTracker(shardId2);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(5 * 1024);
tracker2.compareAndSetReplicaLimits(tracker2.getReplicaLimits(), 12 * 1024);
long limit1 = tracker1.getReplicaLimits();
long limit2 = tracker2.getReplicaLimits();
long requestStartTime = System.nanoTime();
tracker1.getReplicaOperationTracker().getPerformanceTracker().updateThroughputMovingAverage(Double.doubleToLongBits(1d));
tracker1.getReplicaOperationTracker().getPerformanceTracker().incrementTotalOutstandingRequests();
tracker1.getReplicaOperationTracker().getStatsTracker().incrementTotalBytes(80);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addLatencyInMillis(10);
tracker1.getReplicaOperationTracker().getPerformanceTracker().addNewThroughout(1d);
assertTrue(memoryManager.isReplicaShardLimitBreached(tracker1, 12 * 1024, requestStartTime));
assertEquals(limit1, tracker1.getReplicaLimits());
assertEquals(limit2, tracker2.getReplicaLimits());
assertEquals(1, tracker1.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(0, tracker2.getReplicaOperationTracker().getRejectionTracker().getNodeLimitsBreachedRejections());
assertEquals(1, memoryManager.getTotalNodeLimitsBreachedRejections());
}
public void testDecreaseShardPrimaryAndCoordinatingLimitsToBaseLimit() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker1.compareAndSetPrimaryAndCoordinatingLimits(tracker1.getPrimaryAndCoordinatingLimits(), 1 * 1024);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(0);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker1);
assertTrue(tracker1.getPrimaryAndCoordinatingLimits() < limit1);
assertEquals(10, tracker1.getPrimaryAndCoordinatingLimits());
}
public void testDecreaseShardReplicaLimitsToBaseLimit() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker1.compareAndSetReplicaLimits(tracker1.getReplicaLimits(), 1 * 1024);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(0);
long limit1 = tracker1.getReplicaLimits();
memoryManager.decreaseShardReplicaLimits(tracker1);
assertTrue(tracker1.getReplicaLimits() < limit1);
assertEquals(15, tracker1.getReplicaLimits());
}
public void testDecreaseShardPrimaryAndCoordinatingLimits() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker1.compareAndSetPrimaryAndCoordinatingLimits(tracker1.getPrimaryAndCoordinatingLimits(), 1 * 1024);
tracker1.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(512);
long limit1 = tracker1.getPrimaryAndCoordinatingLimits();
memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker1);
assertTrue(tracker1.getPrimaryAndCoordinatingLimits() < limit1);
assertEquals((long)(512/0.85), tracker1.getPrimaryAndCoordinatingLimits());
}
public void testDecreaseShardReplicaLimits() {
ShardIndexingPressureTracker tracker1 = memoryManager.getShardIndexingPressureTracker(shardId1);
tracker1.compareAndSetReplicaLimits(tracker1.getReplicaLimits(), 1 * 1024);
tracker1.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(512);
long limit1 = tracker1.getReplicaLimits();
memoryManager.decreaseShardReplicaLimits(tracker1);
assertTrue(tracker1.getReplicaLimits() < limit1);
assertEquals((long)(512/0.85), tracker1.getReplicaLimits());
}
}
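
The throughput-degradation tests above rely on simple arithmetic: with totalBytes of 60 and 10 ms of latency the lifetime throughput is 6 bytes/ms, the moving average was seeded at 1 byte/ms, and 6 / 1 exceeds the default degradation factor of 5, so a rejection is counted once the moving window (sized 2 via REQUEST_SIZE_WINDOW in these tests) is full. The standalone sketch below reproduces just that check; the class and method names are hypothetical.

public final class DegradationCheckSketch {
    // Mirrors the shape of the degradation evaluation: lifetime average over moving average, gated on a full window.
    static boolean degraded(long totalBytes, long latencyMillis, double movingAverage,
                            long windowSize, long requestSizeWindow, double degradationFactor) {
        double lifetimeAverage = (double) totalBytes / latencyMillis;
        return movingAverage > 0
            && windowSize >= requestSizeWindow
            && lifetimeAverage / movingAverage > degradationFactor;
    }

    public static void main(String[] args) {
        System.out.println(degraded(60, 10, 1.0d, 2, 2, 5.0d)); // true  -> degradation rejection counted
        System.out.println(degraded(60, 10, 1.0d, 1, 2, 5.0d)); // false -> moving window not yet full
    }
}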