Add ShardIndexingPressure framework level construct and Stats (#1015)

* Add ShardIndexingPressure framework level construct and related Stats artefacts.
* Test and code refactoring for shard indexing pressure.
* Moved the average calculation logic to common memory manager util.
* Add wrapper for releasable in ShardIndexingPressure operations.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
Saurabh Singh 2021-08-12 09:51:01 +05:30 committed by Rabi Panda
parent 4d16faee5f
commit 7fbeb87f95
8 changed files with 2618 additions and 1 deletion

View File

@@ -36,6 +36,7 @@ import org.opensearch.LegacyESVersion;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.ShardIndexingPressureSettings;
import java.io.IOException;
import java.util.Collections;
@@ -53,6 +54,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
private String[] completionDataFields = null;
private boolean includeSegmentFileSizes = false;
private boolean includeUnloadedSegments = false;
private boolean includeAllShardIndexingPressureTrackers = false;
private boolean includeOnlyTopIndexingPressureMetrics = false;
/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
@@ -80,6 +83,10 @@ public class CommonStatsFlags implements Writeable, Cloneable {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
includeUnloadedSegments = in.readBoolean();
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
includeAllShardIndexingPressureTrackers = in.readBoolean();
includeOnlyTopIndexingPressureMetrics = in.readBoolean();
}
}
@Override
@@ -98,6 +105,10 @@ public class CommonStatsFlags implements Writeable, Cloneable {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) {
out.writeBoolean(includeUnloadedSegments);
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
out.writeBoolean(includeAllShardIndexingPressureTrackers);
out.writeBoolean(includeOnlyTopIndexingPressureMetrics);
}
}
/**
@@ -111,6 +122,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}
@@ -125,6 +138,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
completionDataFields = null;
includeSegmentFileSizes = false;
includeUnloadedSegments = false;
includeAllShardIndexingPressureTrackers = false;
includeOnlyTopIndexingPressureMetrics = false;
return this;
}
@@ -198,10 +213,28 @@ public class CommonStatsFlags implements Writeable, Cloneable {
return this;
}
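/**
 * When set, shard indexing pressure stats also merge in cold-store trackers, in addition to
 * the hot-store trackers returned by default (see ShardIndexingPressure#shardStats).
 */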
public CommonStatsFlags includeAllShardIndexingPressureTrackers(boolean includeAllShardPressureTrackers) {
this.includeAllShardIndexingPressureTrackers = includeAllShardPressureTrackers;
return this;
}
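/**
 * When set, only node-level shard indexing pressure metrics are returned and the per-shard
 * tracker map is left empty (see ShardIndexingPressure#topStats).
 */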
public CommonStatsFlags includeOnlyTopIndexingPressureMetrics(boolean includeOnlyTopIndexingPressureMetrics) {
this.includeOnlyTopIndexingPressureMetrics = includeOnlyTopIndexingPressureMetrics;
return this;
}
public boolean includeUnloadedSegments() {
return this.includeUnloadedSegments;
}
public boolean includeAllShardIndexingPressureTrackers() {
return this.includeAllShardIndexingPressureTrackers;
}
public boolean includeOnlyTopIndexingPressureMetrics() {
return this.includeOnlyTopIndexingPressureMetrics;
}
public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

View File

@@ -0,0 +1,350 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.index;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.ShardIndexingPressureTracker.CommonOperationTracker;
import org.opensearch.index.ShardIndexingPressureTracker.OperationTracker;
import org.opensearch.index.ShardIndexingPressureTracker.PerformanceTracker;
import org.opensearch.index.ShardIndexingPressureTracker.RejectionTracker;
import org.opensearch.index.ShardIndexingPressureTracker.StatsTracker;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.stats.IndexingPressurePerShardStats;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Shard Indexing Pressure is a framework-level artefact built on top of IndexingPressure to track incoming indexing requests per shard.
* The interfaces provided by this class are used by the Transport Action layers to start accounting for an incoming request.
* Each interface returns a Releasable which, when triggered, releases the acquired accounting tokens and also
* performs necessary actions, such as throughput evaluation, once the request completes.
* Consumers of these interfaces are expected to reliably close the Releasable, for consistency.
*
* Overall, ShardIndexingPressure provides:
* 1. Memory accounting at the shard level. This can be enabled/disabled via a dynamic setting.
* 2. Memory accounting at the node level. Tracking is done using the IndexingPressure artefacts to support seamless feature toggling.
* 3. Interfaces to access the statistics for shard trackers.
*/
public class ShardIndexingPressure extends IndexingPressure {
private static final Logger logger = LogManager.getLogger(ShardIndexingPressure.class);
private final ShardIndexingPressureSettings shardIndexingPressureSettings;
private final ShardIndexingPressureMemoryManager memoryManager;
ShardIndexingPressure(Settings settings, ClusterService clusterService) {
super(settings);
shardIndexingPressureSettings = new ShardIndexingPressureSettings(clusterService, settings, primaryAndCoordinatingLimits);
ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, clusterSettings, settings);
}
public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if(0 == bytes) { return () -> {}; }
long requestStartTime = System.nanoTime();
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
long nodeCombinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long nodeReplicaBytes = currentReplicaBytes.get();
long nodeTotalBytes = nodeCombinedBytes + nodeReplicaBytes;
long shardCombinedBytes = tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(bytes);
boolean shardLevelLimitBreached = false;
if (forceExecution == false) {
boolean nodeLevelLimitBreached = memoryManager.isCoordinatingNodeLimitBreached(tracker, nodeTotalBytes);
if (nodeLevelLimitBreached == false) {
shardLevelLimitBreached = memoryManager.isCoordinatingShardLimitBreached(tracker, nodeTotalBytes, requestStartTime);
}
if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
coordinatingRejections.getAndIncrement();
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes);
rejectShardRequest(tracker, bytes, nodeTotalBytes, shardCombinedBytes,
tracker.getCoordinatingOperationTracker().getRejectionTracker(), "coordinating");
}
}
currentCoordinatingBytes.addAndGet(bytes);
totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
totalCoordinatingBytes.addAndGet(bytes);
StatsTracker statsTracker = tracker.getCoordinatingOperationTracker().getStatsTracker();
statsTracker.incrementCurrentBytes(bytes);
markShardOperationStarted(statsTracker, tracker.getCoordinatingOperationTracker().getPerformanceTracker());
boolean isShadowModeBreach = shardLevelLimitBreached;
return wrapReleasable(() -> {
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
currentCoordinatingBytes.addAndGet(-bytes);
markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getCoordinatingOperationTracker(),
tracker.getCommonOperationTracker());
memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker);
tryReleaseTracker(tracker);
});
}
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) {
if(bytes == 0) { return () -> {}; }
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
currentPrimaryBytes.addAndGet(bytes);
totalPrimaryBytes.addAndGet(bytes);
tracker.getPrimaryOperationTracker().getStatsTracker().incrementCurrentBytes(bytes);
tracker.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(bytes);
return wrapReleasable(() -> {
currentPrimaryBytes.addAndGet(-bytes);
tracker.getPrimaryOperationTracker().getStatsTracker().incrementCurrentBytes(-bytes);
});
}
public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if(0 == bytes) { return () -> {}; }
long requestStartTime = System.nanoTime();
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
long nodeCombinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long nodeReplicaBytes = currentReplicaBytes.get();
long nodeTotalBytes = nodeCombinedBytes + nodeReplicaBytes;
long shardCombinedBytes = tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(bytes);
boolean shardLevelLimitBreached = false;
if (forceExecution == false) {
boolean nodeLevelLimitBreached = memoryManager.isPrimaryNodeLimitBreached(tracker, nodeTotalBytes);
if (nodeLevelLimitBreached == false) {
shardLevelLimitBreached = memoryManager.isPrimaryShardLimitBreached(tracker, nodeTotalBytes, requestStartTime);
}
if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
primaryRejections.getAndIncrement();
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes);
rejectShardRequest(tracker, bytes, nodeTotalBytes, shardCombinedBytes,
tracker.getPrimaryOperationTracker().getRejectionTracker(), "primary");
}
}
currentPrimaryBytes.addAndGet(bytes);
totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
totalPrimaryBytes.addAndGet(bytes);
StatsTracker statsTracker = tracker.getPrimaryOperationTracker().getStatsTracker();
statsTracker.incrementCurrentBytes(bytes);
markShardOperationStarted(statsTracker, tracker.getPrimaryOperationTracker().getPerformanceTracker());
boolean isShadowModeBreach = shardLevelLimitBreached;
return wrapReleasable(() -> {
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
currentPrimaryBytes.addAndGet(-bytes);
markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getPrimaryOperationTracker(),
tracker.getCommonOperationTracker());
memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker);
tryReleaseTracker(tracker);
});
}
public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if(0 == bytes) { return () -> {}; }
long requestStartTime = System.nanoTime();
ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId);
long nodeReplicaBytes = currentReplicaBytes.addAndGet(bytes);
long shardReplicaBytes = tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(bytes);
boolean shardLevelLimitBreached = false;
if (forceExecution == false) {
boolean nodeLevelLimitBreached = memoryManager.isReplicaNodeLimitBreached(tracker, nodeReplicaBytes);
if (nodeLevelLimitBreached == false) {
shardLevelLimitBreached = memoryManager.isReplicaShardLimitBreached(tracker, nodeReplicaBytes, requestStartTime);
}
if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
replicaRejections.getAndIncrement();
currentReplicaBytes.addAndGet(-bytes);
tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(-bytes);
rejectShardRequest(tracker, bytes, nodeReplicaBytes, shardReplicaBytes,
tracker.getReplicaOperationTracker().getRejectionTracker(), "replica");
}
}
totalReplicaBytes.addAndGet(bytes);
StatsTracker statsTracker = tracker.getReplicaOperationTracker().getStatsTracker();
markShardOperationStarted(statsTracker, tracker.getReplicaOperationTracker().getPerformanceTracker());
boolean isShadowModeBreach = shardLevelLimitBreached;
return wrapReleasable(() -> {
currentReplicaBytes.addAndGet(-bytes);
markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getReplicaOperationTracker());
memoryManager.decreaseShardReplicaLimits(tracker);
tryReleaseTracker(tracker);
});
}
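/**
 * Wraps a Releasable so the underlying release logic runs at most once; a second close is
 * logged and trips an assertion rather than double-releasing the accounted bytes.
 */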
private static Releasable wrapReleasable(Releasable releasable) {
final AtomicBoolean called = new AtomicBoolean();
return () -> {
if (called.compareAndSet(false, true)) {
releasable.close();
} else {
logger.error("ShardIndexingPressure Release is called twice", new IllegalStateException("Releasable is called twice"));
assert false : "ShardIndexingPressure Release is called twice";
}
};
}
private boolean shouldRejectRequest(boolean nodeLevelLimitBreached, boolean shardLevelLimitBreached) {
return nodeLevelLimitBreached ||
(shardLevelLimitBreached && shardIndexingPressureSettings.isShardIndexingPressureEnforced());
}
private void markShardOperationStarted(StatsTracker statsTracker, PerformanceTracker performanceTracker) {
statsTracker.incrementRequestCount();
performanceTracker.incrementTotalOutstandingRequests();
}
private void adjustPerformanceUponCompletion(long bytes, long requestStartTime, StatsTracker statsTracker,
PerformanceTracker performanceTracker) {
long requestEndTime = System.nanoTime();
long requestLatency = TimeUnit.NANOSECONDS.toMillis(requestEndTime - requestStartTime);
performanceTracker.addLatencyInMillis(requestLatency);
performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime);
performanceTracker.resetTotalOutstandingRequests();
if(requestLatency > 0) {
calculateRequestThroughput(bytes, requestLatency, performanceTracker, statsTracker);
}
}
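/*
 * Request throughput is computed as bytes per millisecond of latency. Once the recorded
 * throughput samples exceed the configured request size window, the oldest sample is folded
 * out of the moving average via the memory manager helper; until then the average is derived
 * from total bytes over cumulative latency.
 */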
private void calculateRequestThroughput(long bytes, long requestLatency, PerformanceTracker performanceTracker,
StatsTracker statsTracker) {
double requestThroughput = (double) bytes / requestLatency;
performanceTracker.addNewThroughout(requestThroughput);
if (performanceTracker.getThroughputMovingQueueSize() > shardIndexingPressureSettings.getRequestSizeWindow()) {
double front = performanceTracker.getFirstThroughput();
double movingAverage = memoryManager.calculateMovingAverage(performanceTracker.getThroughputMovingAverage(), front,
requestThroughput, shardIndexingPressureSettings.getRequestSizeWindow());
performanceTracker.updateThroughputMovingAverage(Double.doubleToLongBits(movingAverage));
} else {
double movingAverage = (double) statsTracker.getTotalBytes() / performanceTracker.getLatencyInMillis();
performanceTracker.updateThroughputMovingAverage(Double.doubleToLongBits(movingAverage));
}
}
private void markShardOperationComplete(long bytes, long requestStartTime, boolean isShadowModeBreach,
OperationTracker operationTracker, CommonOperationTracker commonOperationTracker) {
commonOperationTracker.incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes);
commonOperationTracker.incrementTotalCombinedCoordinatingAndPrimaryBytes(bytes);
markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, operationTracker);
}
private void markShardOperationComplete(long bytes, long requestStartTime, boolean isShadowModeBreach,
OperationTracker operationTracker) {
StatsTracker statsTracker = operationTracker.getStatsTracker();
statsTracker.incrementCurrentBytes(-bytes);
statsTracker.incrementTotalBytes(bytes);
// In shadow mode, if the request would have been rejected, it is not accounted towards the dynamic rejection parameters
if (isShadowModeBreach == false) {
adjustPerformanceUponCompletion(bytes, requestStartTime, statsTracker, operationTracker.getPerformanceTracker());
}
}
private void tryReleaseTracker(ShardIndexingPressureTracker tracker) {
memoryManager.tryTrackerCleanupFromHotStore(tracker,
() -> (tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes() == 0 &&
tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes() == 0));
}
private void rejectShardRequest(ShardIndexingPressureTracker tracker, long bytes, long nodeTotalBytes, long shardTotalBytes,
RejectionTracker rejectionTracker, String operationType) {
long nodeBytesWithoutOperation = nodeTotalBytes - bytes;
long shardBytesWithoutOperation = shardTotalBytes - bytes;
ShardId shardId = tracker.getShardId();
rejectionTracker.incrementTotalRejections();
throw new OpenSearchRejectedExecutionException("rejected execution of " + operationType + " operation [" +
"shard_detail=[" + shardId.getIndexName() + "][" + shardId.id() + "], " +
"shard_total_bytes=" + shardBytesWithoutOperation + ", " +
"shard_operation_bytes=" + bytes + ", " +
"shard_max_coordinating_and_primary_bytes=" + tracker.getPrimaryAndCoordinatingLimits() + ", " +
"shard_max_replica_bytes=" + tracker.getReplicaLimits() + "] OR [" +
"node_total_bytes=" + nodeBytesWithoutOperation + ", " +
"node_operation_bytes=" + bytes + ", " +
"node_max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + ", " +
"node_max_replica_bytes=" + replicaLimits + "]", false);
}
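/*
 * Stats view selection: topStats() exposes only node-level breach counters with an empty
 * per-shard map, shardStats() reports the trackers currently in the hot store, and coldStats()
 * is merged in additionally when all shard indexing pressure trackers are requested.
 */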
public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) {
if (statsFlags.includeOnlyTopIndexingPressureMetrics()) {
return topStats();
} else {
ShardIndexingPressureStats allStats = shardStats();
if (statsFlags.includeAllShardIndexingPressureTrackers()) {
allStats.addAll(coldStats());
}
return allStats;
}
}
ShardIndexingPressureStats shardStats() {
Map<ShardId, IndexingPressurePerShardStats> statsPerShard = new HashMap<>();
boolean isEnforcedMode = shardIndexingPressureSettings.isShardIndexingPressureEnforced();
for (Map.Entry<ShardId, ShardIndexingPressureTracker> shardEntry :
memoryManager.getShardIndexingPressureHotStore().entrySet()) {
IndexingPressurePerShardStats shardStats = new IndexingPressurePerShardStats(shardEntry.getValue(), isEnforcedMode);
statsPerShard.put(shardEntry.getKey(), shardStats);
}
return new ShardIndexingPressureStats(statsPerShard, memoryManager.getTotalNodeLimitsBreachedRejections(),
memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections(),
memoryManager.getTotalThroughputDegradationLimitsBreachedRejections(),
shardIndexingPressureSettings.isShardIndexingPressureEnabled(), isEnforcedMode);
}
ShardIndexingPressureStats coldStats() {
Map<ShardId, IndexingPressurePerShardStats> statsPerShard = new HashMap<>();
boolean isEnforcedMode = shardIndexingPressureSettings.isShardIndexingPressureEnforced();
for (Map.Entry<ShardId, ShardIndexingPressureTracker> shardEntry :
memoryManager.getShardIndexingPressureColdStore().entrySet()) {
IndexingPressurePerShardStats shardStats = new IndexingPressurePerShardStats(shardEntry.getValue(), isEnforcedMode);
statsPerShard.put(shardEntry.getKey(), shardStats);
}
return new ShardIndexingPressureStats(statsPerShard, memoryManager.getTotalNodeLimitsBreachedRejections(),
memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections(),
memoryManager.getTotalThroughputDegradationLimitsBreachedRejections(),
shardIndexingPressureSettings.isShardIndexingPressureEnabled(), isEnforcedMode);
}
ShardIndexingPressureStats topStats() {
return new ShardIndexingPressureStats(Collections.emptyMap(), memoryManager.getTotalNodeLimitsBreachedRejections(),
memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections(),
memoryManager.getTotalThroughputDegradationLimitsBreachedRejections(),
shardIndexingPressureSettings.isShardIndexingPressureEnabled(),
shardIndexingPressureSettings.isShardIndexingPressureEnforced());
}
ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) {
return memoryManager.getShardIndexingPressureTracker(shardId);
}
public boolean isShardIndexingPressureEnabled() {
return shardIndexingPressureSettings.isShardIndexingPressureEnabled();
}
}
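
For illustration only, a minimal consumer sketch: the handler class and its wiring below are hypothetical and not part of this commit; only the ShardIndexingPressure, Releasable, and ShardId APIs shown above are real. It demonstrates the contract from the class Javadoc of acquiring accounting for an operation and reliably closing the returned Releasable when the request completes (a rejected operation surfaces as an OpenSearchRejectedExecutionException).

import org.opensearch.common.lease.Releasable;
import org.opensearch.index.ShardIndexingPressure;
import org.opensearch.index.shard.ShardId;

class CoordinatingOperationHandler { // hypothetical consumer, for illustration only
    private final ShardIndexingPressure shardIndexingPressure;

    CoordinatingOperationHandler(ShardIndexingPressure shardIndexingPressure) {
        this.shardIndexingPressure = shardIndexingPressure;
    }

    void run(ShardId shardId, long operationBytes, Runnable indexingWork) {
        // Throws OpenSearchRejectedExecutionException when node or shard limits are breached
        // and shard indexing pressure is enforced; otherwise the bytes are accounted for the shard.
        try (Releasable ignored =
                 shardIndexingPressure.markCoordinatingOperationStarted(shardId, operationBytes, false)) {
            indexingWork.run();
        } // closing the Releasable frees the accounted bytes and updates the performance trackers
    }
}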

View File

@@ -282,6 +282,26 @@ public class ShardIndexingPressureMemoryManager {
return shardIndexingPressureStore.getShardIndexingPressureTracker(shardId);
}
Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureHotStore() {
return shardIndexingPressureStore.getShardIndexingPressureHotStore();
}
Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureColdStore() {
return shardIndexingPressureStore.getShardIndexingPressureColdStore();
}
void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, BooleanSupplier condition) {
shardIndexingPressureStore.tryTrackerCleanupFromHotStore(tracker, condition);
}
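/**
 * Incremental sliding-window average: drops the oldest sample (frontValue), adds the newest
 * (currentValue) and keeps the window size (count) fixed. currentAverage carries the previous
 * average encoded via Double.doubleToLongBits; with an empty window the newest value is
 * returned as-is.
 */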
double calculateMovingAverage(long currentAverage, double frontValue, double currentValue, int count) {
if(count > 0) {
return ((Double.longBitsToDouble(currentAverage) * count) + currentValue - frontValue) / count;
} else {
return currentValue;
}
}
long getTotalNodeLimitsBreachedRejections() {
return totalNodeLimitsBreachedRejections.get();
}
@@ -417,7 +437,7 @@
*/
private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) {
return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) &&
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.millis() &&
(requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.nanos() &&
performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests;
}

View File

@@ -139,6 +139,10 @@ public class ShardIndexingPressureTracker {
public long getRequestCount() {
return requestCount.get();
}
public long incrementRequestCount() {
return requestCount.incrementAndGet();
}
}
/**
@@ -161,6 +165,10 @@ public class ShardIndexingPressureTracker {
return totalRejections.get();
}
public long incrementTotalRejections() {
return totalRejections.incrementAndGet();
}
public long getNodeLimitsBreachedRejections() {
return nodeLimitsBreachedRejections.get();
}
@@ -232,6 +240,10 @@ public class ShardIndexingPressureTracker {
return totalOutstandingRequests.incrementAndGet();
}
public void resetTotalOutstandingRequests() {
totalOutstandingRequests.set(0L);
}
public long getThroughputMovingAverage() {
return throughputMovingAverage.get();
}
@@ -275,5 +287,9 @@ public class ShardIndexingPressureTracker {
public long getTotalCombinedCoordinatingAndPrimaryBytes() {
return totalCombinedCoordinatingAndPrimaryBytes.get();
}
public long incrementTotalCombinedCoordinatingAndPrimaryBytes(long bytes) {
return totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
}
}
}

View File

@@ -0,0 +1,423 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.stats;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.ShardIndexingPressureTracker;
import java.io.IOException;
public class IndexingPressurePerShardStats implements Writeable, ToXContentFragment {
private final String shardId;
private final long totalCombinedCoordinatingAndPrimaryBytes;
private final long totalCoordinatingBytes;
private final long totalPrimaryBytes;
private final long totalReplicaBytes;
private final long currentCombinedCoordinatingAndPrimaryBytes;
private final long currentCoordinatingBytes;
private final long currentPrimaryBytes;
private final long currentReplicaBytes;
private final long totalCoordinatingCount;
private final long totalPrimaryCount;
private final long totalReplicaCount;
private final long coordinatingRejections;
private final long coordinatingNodeLimitsBreachedRejections;
private final long coordinatingLastSuccessfulRequestLimitsBreachedRejections;
private final long coordinatingThroughputDegradationLimitsBreachedRejections;
private final long primaryRejections;
private final long primaryNodeLimitsBreachedRejections;
private final long primaryLastSuccessfulRequestLimitsBreachedRejections;
private final long primaryThroughputDegradationLimitsBreachedRejections;
private final long replicaRejections;
private final long replicaNodeLimitsBreachedRejections;
private final long replicaLastSuccessfulRequestLimitsBreachedRejections;
private final long replicaThroughputDegradationLimitsBreachedRejections;
private final long coordinatingTimeInMillis;
private final long primaryTimeInMillis;
private final long replicaTimeInMillis;
private final long coordinatingLastSuccessfulRequestTimestampInMillis;
private final long primaryLastSuccessfulRequestTimestampInMillis;
private final long replicaLastSuccessfulRequestTimestampInMillis;
private final long currentPrimaryAndCoordinatingLimits;
private final long currentReplicaLimits;
private final boolean shardIndexingPressureEnforced;
public IndexingPressurePerShardStats(StreamInput in) throws IOException {
shardId = in.readString();
shardIndexingPressureEnforced = in.readBoolean();
totalCombinedCoordinatingAndPrimaryBytes = in.readVLong();
totalCoordinatingBytes = in.readVLong();
totalPrimaryBytes = in.readVLong();
totalReplicaBytes = in.readVLong();
currentCombinedCoordinatingAndPrimaryBytes = in.readVLong();
currentCoordinatingBytes = in.readVLong();
currentPrimaryBytes = in.readVLong();
currentReplicaBytes = in.readVLong();
totalCoordinatingCount = in.readVLong();
totalPrimaryCount = in.readVLong();
totalReplicaCount = in.readVLong();
coordinatingRejections = in.readVLong();
coordinatingNodeLimitsBreachedRejections = in.readVLong();
coordinatingLastSuccessfulRequestLimitsBreachedRejections = in.readVLong();
coordinatingThroughputDegradationLimitsBreachedRejections = in.readVLong();
primaryRejections = in.readVLong();
primaryNodeLimitsBreachedRejections = in.readVLong();
primaryLastSuccessfulRequestLimitsBreachedRejections = in.readVLong();
primaryThroughputDegradationLimitsBreachedRejections = in.readVLong();
replicaRejections = in.readVLong();
replicaNodeLimitsBreachedRejections = in.readVLong();
replicaLastSuccessfulRequestLimitsBreachedRejections = in.readVLong();
replicaThroughputDegradationLimitsBreachedRejections = in.readVLong();
coordinatingTimeInMillis = in.readVLong();
primaryTimeInMillis = in.readVLong();
replicaTimeInMillis = in.readVLong();
coordinatingLastSuccessfulRequestTimestampInMillis = in.readVLong();
primaryLastSuccessfulRequestTimestampInMillis = in.readVLong();
replicaLastSuccessfulRequestTimestampInMillis = in.readVLong();
currentPrimaryAndCoordinatingLimits = in.readVLong();
currentReplicaLimits = in.readVLong();
}
public IndexingPressurePerShardStats(ShardIndexingPressureTracker shardIndexingPressureTracker, boolean shardIndexingPressureEnforced) {
shardId = shardIndexingPressureTracker.getShardId().toString();
this.shardIndexingPressureEnforced = shardIndexingPressureEnforced;
totalCombinedCoordinatingAndPrimaryBytes =
shardIndexingPressureTracker.getCommonOperationTracker().getTotalCombinedCoordinatingAndPrimaryBytes();
totalCoordinatingBytes = shardIndexingPressureTracker.getCoordinatingOperationTracker().getStatsTracker().getTotalBytes();
totalPrimaryBytes = shardIndexingPressureTracker.getPrimaryOperationTracker().getStatsTracker().getTotalBytes();
totalReplicaBytes = shardIndexingPressureTracker.getReplicaOperationTracker().getStatsTracker().getTotalBytes();
currentCombinedCoordinatingAndPrimaryBytes =
shardIndexingPressureTracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes();
currentCoordinatingBytes = shardIndexingPressureTracker.getCoordinatingOperationTracker().getStatsTracker().getCurrentBytes();
currentPrimaryBytes = shardIndexingPressureTracker.getPrimaryOperationTracker().getStatsTracker().getCurrentBytes();
currentReplicaBytes = shardIndexingPressureTracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes();
totalCoordinatingCount = shardIndexingPressureTracker.getCoordinatingOperationTracker().getStatsTracker().getRequestCount();
totalPrimaryCount = shardIndexingPressureTracker.getPrimaryOperationTracker().getStatsTracker().getRequestCount();
totalReplicaCount = shardIndexingPressureTracker.getReplicaOperationTracker().getStatsTracker().getRequestCount();
coordinatingRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections();
coordinatingNodeLimitsBreachedRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker()
.getRejectionTracker().getNodeLimitsBreachedRejections();
coordinatingLastSuccessfulRequestLimitsBreachedRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker()
.getRejectionTracker().getLastSuccessfulRequestLimitsBreachedRejections();
coordinatingThroughputDegradationLimitsBreachedRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker()
.getRejectionTracker().getThroughputDegradationLimitsBreachedRejections();
primaryRejections = shardIndexingPressureTracker.getPrimaryOperationTracker().getRejectionTracker().getTotalRejections();
primaryNodeLimitsBreachedRejections = shardIndexingPressureTracker.getPrimaryOperationTracker()
.getRejectionTracker().getNodeLimitsBreachedRejections();
primaryLastSuccessfulRequestLimitsBreachedRejections = shardIndexingPressureTracker.getPrimaryOperationTracker()
.getRejectionTracker().getLastSuccessfulRequestLimitsBreachedRejections();
primaryThroughputDegradationLimitsBreachedRejections = shardIndexingPressureTracker.getPrimaryOperationTracker()
.getRejectionTracker().getThroughputDegradationLimitsBreachedRejections();
replicaRejections = shardIndexingPressureTracker.getReplicaOperationTracker().getRejectionTracker().getTotalRejections();
replicaNodeLimitsBreachedRejections = shardIndexingPressureTracker.getReplicaOperationTracker()
.getRejectionTracker().getNodeLimitsBreachedRejections();
replicaLastSuccessfulRequestLimitsBreachedRejections = shardIndexingPressureTracker.getReplicaOperationTracker()
.getRejectionTracker().getLastSuccessfulRequestLimitsBreachedRejections();
replicaThroughputDegradationLimitsBreachedRejections = shardIndexingPressureTracker.getReplicaOperationTracker()
.getRejectionTracker().getThroughputDegradationLimitsBreachedRejections();
coordinatingTimeInMillis = shardIndexingPressureTracker.getCoordinatingOperationTracker().getPerformanceTracker()
.getLatencyInMillis();
primaryTimeInMillis = shardIndexingPressureTracker.getPrimaryOperationTracker().getPerformanceTracker()
.getLatencyInMillis();
replicaTimeInMillis = shardIndexingPressureTracker.getReplicaOperationTracker().getPerformanceTracker()
.getLatencyInMillis();
coordinatingLastSuccessfulRequestTimestampInMillis = shardIndexingPressureTracker.getCoordinatingOperationTracker()
.getPerformanceTracker().getLastSuccessfulRequestTimestamp();
primaryLastSuccessfulRequestTimestampInMillis = shardIndexingPressureTracker.getPrimaryOperationTracker()
.getPerformanceTracker().getLastSuccessfulRequestTimestamp();
replicaLastSuccessfulRequestTimestampInMillis = shardIndexingPressureTracker.getReplicaOperationTracker()
.getPerformanceTracker().getLastSuccessfulRequestTimestamp();
currentPrimaryAndCoordinatingLimits = shardIndexingPressureTracker.getPrimaryAndCoordinatingLimits();
currentReplicaLimits = shardIndexingPressureTracker.getReplicaLimits();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(shardId);
out.writeBoolean(shardIndexingPressureEnforced);
out.writeVLong(totalCombinedCoordinatingAndPrimaryBytes);
out.writeVLong(totalCoordinatingBytes);
out.writeVLong(totalPrimaryBytes);
out.writeVLong(totalReplicaBytes);
out.writeVLong(currentCombinedCoordinatingAndPrimaryBytes);
out.writeVLong(currentCoordinatingBytes);
out.writeVLong(currentPrimaryBytes);
out.writeVLong(currentReplicaBytes);
out.writeVLong(totalCoordinatingCount);
out.writeVLong(totalPrimaryCount);
out.writeVLong(totalReplicaCount);
out.writeVLong(coordinatingRejections);
out.writeVLong(coordinatingNodeLimitsBreachedRejections);
out.writeVLong(coordinatingLastSuccessfulRequestLimitsBreachedRejections);
out.writeVLong(coordinatingThroughputDegradationLimitsBreachedRejections);
out.writeVLong(primaryRejections);
out.writeVLong(primaryNodeLimitsBreachedRejections);
out.writeVLong(primaryLastSuccessfulRequestLimitsBreachedRejections);
out.writeVLong(primaryThroughputDegradationLimitsBreachedRejections);
out.writeVLong(replicaRejections);
out.writeVLong(replicaNodeLimitsBreachedRejections);
out.writeVLong(replicaLastSuccessfulRequestLimitsBreachedRejections);
out.writeVLong(replicaThroughputDegradationLimitsBreachedRejections);
out.writeVLong(coordinatingTimeInMillis);
out.writeVLong(primaryTimeInMillis);
out.writeVLong(replicaTimeInMillis);
out.writeVLong(coordinatingLastSuccessfulRequestTimestampInMillis);
out.writeVLong(primaryLastSuccessfulRequestTimestampInMillis);
out.writeVLong(replicaLastSuccessfulRequestTimestampInMillis);
out.writeVLong(currentPrimaryAndCoordinatingLimits);
out.writeVLong(currentReplicaLimits);
}
public long getTotalCombinedCoordinatingAndPrimaryBytes() {
return totalCombinedCoordinatingAndPrimaryBytes;
}
public long getTotalCoordinatingBytes() {
return totalCoordinatingBytes;
}
public long getTotalPrimaryBytes() {
return totalPrimaryBytes;
}
public long getTotalReplicaBytes() {
return totalReplicaBytes;
}
public long getCurrentCombinedCoordinatingAndPrimaryBytes() {
return currentCombinedCoordinatingAndPrimaryBytes;
}
public long getCurrentCoordinatingBytes() {
return currentCoordinatingBytes;
}
public long getCurrentPrimaryBytes() {
return currentPrimaryBytes;
}
public long getCurrentReplicaBytes() {
return currentReplicaBytes;
}
public long getCoordinatingRejections() {
return coordinatingRejections;
}
public long getCoordinatingNodeLimitsBreachedRejections() {
return coordinatingNodeLimitsBreachedRejections;
}
public long getCoordinatingLastSuccessfulRequestLimitsBreachedRejections() {
return coordinatingLastSuccessfulRequestLimitsBreachedRejections;
}
public long getCoordinatingThroughputDegradationLimitsBreachedRejections() {
return coordinatingThroughputDegradationLimitsBreachedRejections;
}
public long getPrimaryRejections() {
return primaryRejections;
}
public long getPrimaryNodeLimitsBreachedRejections() {
return primaryNodeLimitsBreachedRejections;
}
public long getPrimaryLastSuccessfulRequestLimitsBreachedRejections() {
return primaryLastSuccessfulRequestLimitsBreachedRejections;
}
public long getPrimaryThroughputDegradationLimitsBreachedRejections() {
return primaryThroughputDegradationLimitsBreachedRejections;
}
public long getReplicaRejections() {
return replicaRejections;
}
public long getReplicaNodeLimitsBreachedRejections() {
return replicaNodeLimitsBreachedRejections;
}
public long getReplicaLastSuccessfulRequestLimitsBreachedRejections() {
return replicaLastSuccessfulRequestLimitsBreachedRejections;
}
public long getReplicaThroughputDegradationLimitsBreachedRejections() {
return replicaThroughputDegradationLimitsBreachedRejections;
}
public long getCurrentPrimaryAndCoordinatingLimits() {
return currentPrimaryAndCoordinatingLimits;
}
public long getCurrentReplicaLimits() {
return currentReplicaLimits;
}
private static final String COORDINATING = "coordinating";
private static final String COORDINATING_IN_BYTES = "coordinating_in_bytes";
private static final String COORDINATING_COUNT = "coordinating_count";
private static final String PRIMARY = "primary";
private static final String PRIMARY_IN_BYTES = "primary_in_bytes";
private static final String PRIMARY_COUNT = "primary_count";
private static final String REPLICA = "replica";
private static final String REPLICA_IN_BYTES = "replica_in_bytes";
private static final String REPLICA_COUNT = "replica_count";
private static final String COORDINATING_REJECTIONS = "coordinating_rejections";
private static final String PRIMARY_REJECTIONS = "primary_rejections";
private static final String REPLICA_REJECTIONS = "replica_rejections";
private static final String BREAKUP_NODE_LIMITS = "node_limits";
private static final String BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS = "no_successful_request_limits";
private static final String BREAKUP_THROUGHPUT_DEGRADATION_LIMIT = "throughput_degradation_limits";
private static final String COORDINATING_TIME_IN_MILLIS = "coordinating_time_in_millis";
private static final String PRIMARY_TIME_IN_MILLIS = "primary_time_in_millis";
private static final String REPLICA_TIME_IN_MILLIS = "replica_time_in_millis";
private static final String COORDINATING_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS =
"coordinating_last_successful_request_timestamp_in_millis";
private static final String PRIMARY_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS =
"primary_last_successful_request_timestamp_in_millis";
private static final String REPLICA_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS = "replica_last_successful_request_timestamp_in_millis";
private static final String CURRENT_COORDINATING_AND_PRIMARY_LIMITS_IN_BYTES = "current_coordinating_and_primary_limits_in_bytes";
private static final String CURRENT_REPLICA_LIMITS_IN_BYTES = "current_replica_limits_in_bytes";
private static final String CURRENT_COORDINATING_AND_PRIMARY_IN_BYTES = "current_coordinating_and_primary_bytes";
private static final String CURRENT_REPLICA_IN_BYTES = "current_replica_bytes";
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(shardId);
builder.startObject("memory");
builder.startObject("current");
builder.humanReadableField(COORDINATING_IN_BYTES, COORDINATING, new ByteSizeValue(currentCoordinatingBytes));
builder.humanReadableField(PRIMARY_IN_BYTES, PRIMARY, new ByteSizeValue(currentPrimaryBytes));
builder.humanReadableField(REPLICA_IN_BYTES, REPLICA, new ByteSizeValue(currentReplicaBytes));
builder.endObject();
builder.startObject("total");
builder.humanReadableField(COORDINATING_IN_BYTES, COORDINATING, new ByteSizeValue(totalCoordinatingBytes));
builder.humanReadableField(PRIMARY_IN_BYTES, PRIMARY, new ByteSizeValue(totalPrimaryBytes));
builder.humanReadableField(REPLICA_IN_BYTES, REPLICA, new ByteSizeValue(totalReplicaBytes));
builder.endObject();
builder.endObject();
builder.startObject("rejection");
builder.startObject("coordinating");
builder.field(COORDINATING_REJECTIONS, coordinatingRejections);
if (shardIndexingPressureEnforced) {
builder.startObject("breakup");
} else {
builder.startObject("breakup_shadow_mode");
}
builder.field(BREAKUP_NODE_LIMITS, coordinatingNodeLimitsBreachedRejections);
builder.field(BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS, coordinatingLastSuccessfulRequestLimitsBreachedRejections);
builder.field(BREAKUP_THROUGHPUT_DEGRADATION_LIMIT, coordinatingThroughputDegradationLimitsBreachedRejections);
builder.endObject();
builder.endObject();
builder.startObject("primary");
builder.field(PRIMARY_REJECTIONS, primaryRejections);
if (shardIndexingPressureEnforced) {
builder.startObject("breakup");
} else {
builder.startObject("breakup_shadow_mode");
}
builder.field(BREAKUP_NODE_LIMITS, primaryNodeLimitsBreachedRejections);
builder.field(BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS, primaryLastSuccessfulRequestLimitsBreachedRejections);
builder.field(BREAKUP_THROUGHPUT_DEGRADATION_LIMIT, primaryThroughputDegradationLimitsBreachedRejections);
builder.endObject();
builder.endObject();
builder.startObject("replica");
builder.field(REPLICA_REJECTIONS, replicaRejections);
if (shardIndexingPressureEnforced) {
builder.startObject("breakup");
} else {
builder.startObject("breakup_shadow_mode");
}
builder.field(BREAKUP_NODE_LIMITS, replicaNodeLimitsBreachedRejections);
builder.field(BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS, replicaLastSuccessfulRequestLimitsBreachedRejections);
builder.field(BREAKUP_THROUGHPUT_DEGRADATION_LIMIT, replicaThroughputDegradationLimitsBreachedRejections);
builder.endObject();
builder.endObject();
builder.endObject();
builder.startObject("last_successful_timestamp");
builder.field(COORDINATING_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS, coordinatingLastSuccessfulRequestTimestampInMillis);
builder.field(PRIMARY_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS, primaryLastSuccessfulRequestTimestampInMillis);
builder.field(REPLICA_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS, replicaLastSuccessfulRequestTimestampInMillis);
builder.endObject();
builder.startObject("indexing");
builder.field(COORDINATING_TIME_IN_MILLIS, coordinatingTimeInMillis);
builder.field(COORDINATING_COUNT, totalCoordinatingCount);
builder.field(PRIMARY_TIME_IN_MILLIS, primaryTimeInMillis);
builder.field(PRIMARY_COUNT, totalPrimaryCount);
builder.field(REPLICA_TIME_IN_MILLIS, replicaTimeInMillis);
builder.field(REPLICA_COUNT, totalReplicaCount);
builder.endObject();
builder.startObject("memory_allocation");
builder.startObject("current");
builder.field(CURRENT_COORDINATING_AND_PRIMARY_IN_BYTES, currentCombinedCoordinatingAndPrimaryBytes);
builder.field(CURRENT_REPLICA_IN_BYTES, currentReplicaBytes);
builder.endObject();
builder.startObject("limit");
builder.field(CURRENT_COORDINATING_AND_PRIMARY_LIMITS_IN_BYTES, currentPrimaryAndCoordinatingLimits);
builder.field(CURRENT_REPLICA_LIMITS_IN_BYTES, currentReplicaLimits);
builder.endObject();
builder.endObject();
return builder.endObject();
}
}

View File

@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.stats;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class ShardIndexingPressureStats implements Writeable, ToXContentFragment {
private final Map<ShardId, IndexingPressurePerShardStats> shardIndexingPressureStore;
private final long totalNodeLimitsBreachedRejections;
private final long totalLastSuccessfulRequestLimitsBreachedRejections;
private final long totalThroughputDegradationLimitsBreachedRejections;
private final boolean shardIndexingPressureEnabled;
private final boolean shardIndexingPressureEnforced;
public ShardIndexingPressureStats(StreamInput in) throws IOException {
int shardEntries = in.readInt();
shardIndexingPressureStore = new HashMap<>();
for (int i = 0; i < shardEntries; i++) {
ShardId shardId = new ShardId(in);
IndexingPressurePerShardStats shardStats = new IndexingPressurePerShardStats(in);
shardIndexingPressureStore.put(shardId, shardStats);
}
totalNodeLimitsBreachedRejections = in.readVLong();
totalLastSuccessfulRequestLimitsBreachedRejections = in.readVLong();
totalThroughputDegradationLimitsBreachedRejections = in.readVLong();
shardIndexingPressureEnabled = in.readBoolean();
shardIndexingPressureEnforced = in.readBoolean();
}
public ShardIndexingPressureStats(Map<ShardId, IndexingPressurePerShardStats> shardIndexingPressureStore,
long totalNodeLimitsBreachedRejections,
long totalLastSuccessfulRequestLimitsBreachedRejections,
long totalThroughputDegradationLimitsBreachedRejections,
boolean shardIndexingPressureEnabled,
boolean shardIndexingPressureEnforced) {
this.shardIndexingPressureStore = shardIndexingPressureStore;
this.totalNodeLimitsBreachedRejections = totalNodeLimitsBreachedRejections;
this.totalLastSuccessfulRequestLimitsBreachedRejections = totalLastSuccessfulRequestLimitsBreachedRejections;
this.totalThroughputDegradationLimitsBreachedRejections = totalThroughputDegradationLimitsBreachedRejections;
this.shardIndexingPressureEnabled = shardIndexingPressureEnabled;
this.shardIndexingPressureEnforced = shardIndexingPressureEnforced;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(shardIndexingPressureStore.size());
for (Map.Entry<ShardId, IndexingPressurePerShardStats> entry : shardIndexingPressureStore.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
out.writeVLong(totalNodeLimitsBreachedRejections);
out.writeVLong(totalLastSuccessfulRequestLimitsBreachedRejections);
out.writeVLong(totalThroughputDegradationLimitsBreachedRejections);
out.writeBoolean(shardIndexingPressureEnabled);
out.writeBoolean(shardIndexingPressureEnforced);
}
public IndexingPressurePerShardStats getIndexingPressureShardStats(ShardId shardId) {
return shardIndexingPressureStore.get(shardId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("shard_indexing_pressure");
builder.startObject("stats");
for (Map.Entry<ShardId, IndexingPressurePerShardStats> entry : shardIndexingPressureStore.entrySet()) {
entry.getValue().toXContent(builder, params);
}
builder.endObject();
if (shardIndexingPressureEnforced) {
builder.startObject("total_rejections_breakup");
} else {
builder.startObject("total_rejections_breakup_shadow_mode");
}
builder.field("node_limits", totalNodeLimitsBreachedRejections);
builder.field("no_successful_request_limits", totalLastSuccessfulRequestLimitsBreachedRejections);
builder.field("throughput_degradation_limits", totalThroughputDegradationLimitsBreachedRejections);
builder.endObject();
builder.field("enabled", shardIndexingPressureEnabled);
builder.field("enforced", shardIndexingPressureEnforced);
return builder.endObject();
}
public void addAll(ShardIndexingPressureStats shardIndexingPressureStats) {
if (this.shardIndexingPressureStore != null) {
this.shardIndexingPressureStore.putAll(shardIndexingPressureStats.shardIndexingPressureStore);
}
}
}
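
As a hedged illustration of how the new stats flags and stats classes compose (the helper class below is hypothetical; the flag setters, shardStats(CommonStatsFlags), and the per-shard getters are the ones added in this commit):

import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.index.ShardIndexingPressure;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressurePerShardStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;

final class ShardPressureStatsExample { // hypothetical helper, for illustration only
    static long currentCoordinatingBytes(ShardIndexingPressure shardIndexingPressure, ShardId shardId) {
        // Request per-shard stats and also merge in cold-store trackers.
        CommonStatsFlags flags = new CommonStatsFlags()
            .includeAllShardIndexingPressureTrackers(true)
            .includeOnlyTopIndexingPressureMetrics(false);
        ShardIndexingPressureStats stats = shardIndexingPressure.shardStats(flags);
        IndexingPressurePerShardStats perShard = stats.getIndexingPressureShardStats(shardId);
        return perShard == null ? 0 : perShard.getCurrentCoordinatingBytes();
    }
}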

View File

@@ -0,0 +1,849 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index;
import org.hamcrest.Matchers;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressurePerShardStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.test.OpenSearchTestCase;
import java.util.concurrent.atomic.AtomicInteger;
public class ShardIndexingPressureConcurrentExecutionTests extends OpenSearchTestCase {
private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100)
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterService clusterService = new ClusterService(settings, clusterSettings, null);
public enum OperationType { COORDINATING, PRIMARY, REPLICA }
public void testCoordinatingPrimaryThreadedUpdateToShardLimits() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
Releasable[] releasable;
if (randomBoolean) {
releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.COORDINATING);
} else {
releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.PRIMARY);
}
if(randomBoolean) {
assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
} else {
assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
}
assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertTrue((double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits() < 0.95);
assertTrue((double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits() > 0.75);
for (int i = 0; i < NUM_THREADS; i++) {
releasable[i].close();
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
if(randomBoolean) {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
} else {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedUpdateToShardLimits() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
Releasable[] releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.REPLICA);
assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentReplicaBytes());
assertTrue((double)(NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentReplicaLimits() < 0.95);
assertTrue((double)(NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentReplicaLimits() > 0.75);
for (int i = 0; i < NUM_THREADS; i++) {
releasable[i].close();
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
}
public void testCoordinatingPrimaryThreadedSimultaneousUpdateToShardLimits() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
if (randomBoolean) {
fireAndCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 100, OperationType.COORDINATING);
} else {
fireAndCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 100, OperationType.PRIMARY);
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
if(randomBoolean) {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
} else {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedSimultaneousUpdateToShardLimits() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
fireAndCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 100, OperationType.REPLICA);
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
}
public void testCoordinatingPrimaryThreadedUpdateToShardLimitsWithRandomBytes() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 400);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
if (randomBoolean) {
fireAllThenCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.COORDINATING);
} else {
fireAllThenCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.PRIMARY);
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
if(randomBoolean) {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
} else {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedUpdateToShardLimitsWithRandomBytes() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 400);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
fireAllThenCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.REPLICA);
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
}
public void testCoordinatingPrimaryThreadedUpdateToShardLimitsAndRejections() throws Exception {
final int NUM_THREADS = 100;
final Thread[] threads = new Thread[NUM_THREADS];
final Releasable[] releasables = new Releasable[NUM_THREADS];
AtomicInteger rejectionCount = new AtomicInteger();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
for (int i = 0; i < NUM_THREADS; i++) {
int counter = i;
threads[i] = new Thread(() -> {
try {
if(randomBoolean) {
releasables[counter] = shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 200, false);
} else {
releasables[counter] = shardIndexingPressure.markPrimaryOperationStarted(shardId1, 200, false);
}
} catch (OpenSearchRejectedExecutionException e) {
rejectionCount.addAndGet(1);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
ShardIndexingPressureStats shardStats = shardIndexingPressure.shardStats();
if(randomBoolean) {
assertEquals(rejectionCount.get(), nodeStats.getCoordinatingRejections());
assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentCoordinatingBytes() < 50 * 200);
} else {
assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentPrimaryBytes() < 50 * 200);
assertEquals(rejectionCount.get(), nodeStats.getPrimaryRejections());
}
assertTrue(nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes() < 50 * 200);
assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentCombinedCoordinatingAndPrimaryBytes() < 50 * 200);
for (int i = 0; i < NUM_THREADS - rejectionCount.get(); i++) {
releasables[i].close();
}
nodeStats = shardIndexingPressure.stats();
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
shardStats = shardIndexingPressure.coldStats();
if(randomBoolean) {
assertEquals(rejectionCount.get(), nodeStats.getCoordinatingRejections());
assertEquals(rejectionCount.get(), shardStats.getIndexingPressureShardStats(shardId1)
.getCoordinatingNodeLimitsBreachedRejections());
assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentCoordinatingBytes());
} else {
assertEquals(rejectionCount.get(), nodeStats.getPrimaryRejections());
assertEquals(rejectionCount.get(), shardStats.getIndexingPressureShardStats(shardId1)
.getPrimaryNodeLimitsBreachedRejections());
assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentPrimaryBytes());
}
assertEquals(0, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getIndexingPressureShardStats(shardId1).getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedUpdateToShardLimitsAndRejections() throws Exception {
final int NUM_THREADS = 100;
final Thread[] threads = new Thread[NUM_THREADS];
final Releasable[] releasables = new Releasable[NUM_THREADS];
AtomicInteger rejectionCount = new AtomicInteger();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
for (int i = 0; i < NUM_THREADS; i++) {
int counter = i;
threads[i] = new Thread(() -> {
try {
releasables[counter] = shardIndexingPressure.markReplicaOperationStarted(shardId1, 300, false);
} catch (OpenSearchRejectedExecutionException e) {
rejectionCount.addAndGet(1);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(rejectionCount.get(), nodeStats.getReplicaRejections());
assertTrue(nodeStats.getCurrentReplicaBytes() < 50 * 300);
ShardIndexingPressureStats shardStats = shardIndexingPressure.shardStats();
assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes() < 50 * 300);
for (int i = 0; i < releasables.length - 1; i++) {
if(releasables[i] != null) {
releasables[i].close();
}
}
nodeStats = shardIndexingPressure.stats();
assertEquals(rejectionCount.get(), nodeStats.getReplicaRejections());
assertEquals(0, nodeStats.getCurrentReplicaBytes());
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
shardStats = shardIndexingPressure.coldStats();
assertEquals(rejectionCount.get(), shardStats.getIndexingPressureShardStats(shardId1)
.getReplicaNodeLimitsBreachedRejections());
assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(15, shardStats.getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
}
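// Verifies tracker visibility through CommonStatsFlags: active shards appear in the default (hot) shard stats,
// includeAllShardIndexingPressureTrackers(true) also exposes cold trackers, and includeOnlyTopIndexingPressureMetrics(true)
// hides the per-shard entries.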
public void testCoordinatingPrimaryConcurrentUpdatesOnShardIndexingPressureTrackerObjects() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 400);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "new_uuid");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
Releasable[] releasables;
if(randomBoolean) {
releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.COORDINATING);
} else {
releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.PRIMARY);
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats()
.getIndexingPressureShardStats(shardId1);
assertThat(shardStoreStats.getCurrentPrimaryAndCoordinatingLimits(), Matchers.greaterThan(100L));
CommonStatsFlags statsFlag = new CommonStatsFlags();
statsFlag.includeAllShardIndexingPressureTrackers(true);
IndexingPressurePerShardStats shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag)
.getIndexingPressureShardStats(shardId1);
assertEquals(shardStoreStats.getCurrentPrimaryAndCoordinatingLimits(), shardStoreStats2
.getCurrentPrimaryAndCoordinatingLimits());
statsFlag.includeOnlyTopIndexingPressureMetrics(true);
assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1));
statsFlag.includeOnlyTopIndexingPressureMetrics(false);
for (int i = 0; i < NUM_THREADS; i++) {
releasables[i].close();
}
//No object in hot store as there are no active shards
shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
if(randomBoolean) {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
} else {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1);
assertEquals(shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits(),
shardStoreStats2.getCurrentPrimaryAndCoordinatingLimits());
statsFlag.includeAllShardIndexingPressureTrackers(false);
assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1));
}
public void testReplicaConcurrentUpdatesOnShardIndexingPressureTrackerObjects() throws Exception {
final int NUM_THREADS = scaledRandomIntBetween(100, 400);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "new_uuid");
ShardId shardId1 = new ShardId(index, 0);
final Releasable[] releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 20, OperationType.REPLICA);
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats()
.getIndexingPressureShardStats(shardId1);
assertThat(shardStoreStats.getCurrentReplicaLimits(), Matchers.greaterThan(100L));
CommonStatsFlags statsFlag = new CommonStatsFlags();
statsFlag.includeAllShardIndexingPressureTrackers(true);
IndexingPressurePerShardStats shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag)
.getIndexingPressureShardStats(shardId1);
assertEquals(shardStoreStats.getCurrentReplicaLimits(), shardStoreStats2.getCurrentReplicaLimits());
statsFlag.includeOnlyTopIndexingPressureMetrics(true);
assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1));
statsFlag.includeOnlyTopIndexingPressureMetrics(false);
for (int i = 0; i < NUM_THREADS; i++) {
releasables[i].close();
}
//No object in hot store as there are no active shards
shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1);
assertNull(shardStoreStats);
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1);
assertEquals(shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits(),
shardStoreStats2.getCurrentReplicaLimits());
statsFlag.includeAllShardIndexingPressureTrackers(false);
assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1));
}
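// Throughput degradation path: a uniform load first establishes a fair moving-window throughput, a slower load then degrades
// it beyond the configured degradation factor (1), and the next oversized request is rejected with the
// throughput-degradation counter incremented.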
public void testCoordinatingPrimaryThreadedThroughputDegradationAndRejection() throws Exception {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "15KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 80)
.build();
final int NUM_THREADS = scaledRandomIntBetween(80, 100);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
//Generate a concurrent + sequential load to establish a fair throughput
if (randomBoolean) {
fireConcurrentAndParallelRequestsForUniformThroughPut(NUM_THREADS, shardIndexingPressure, shardId1, 100, 100,
OperationType.COORDINATING);
} else {
fireConcurrentAndParallelRequestsForUniformThroughPut(NUM_THREADS, shardIndexingPressure, shardId1, 100, 100,
OperationType.PRIMARY);
}
//Generate a load such that the requests in the window show degradation in throughput.
if (randomBoolean) {
fireAllThenCompleteConcurrentRequestsWithUniformDelay(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.get(settings),
shardIndexingPressure, shardId1, 100, 200, OperationType.COORDINATING);
} else {
fireAllThenCompleteConcurrentRequestsWithUniformDelay(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.get(settings),
shardIndexingPressure, shardId1, 100, 200, OperationType.PRIMARY);
}
//Generate a load which breaches both primary parameters
if(randomBoolean) {
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 11 * 1024, false));
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingNodeLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections());
} else {
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markPrimaryOperationStarted(shardId1, 11 * 1024, false));
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryNodeLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryLastSuccessfulRequestLimitsBreachedRejections());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedThroughputDegradationAndRejection() throws Exception {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100)
.build();
final int NUM_THREADS = scaledRandomIntBetween(100, 120);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
//Generate a load to establish a fair throughput
fireConcurrentAndParallelRequestsForUniformThroughPut(NUM_THREADS, shardIndexingPressure, shardId1, 100, 100,
OperationType.REPLICA);
//Generate a load such that the requests in the window show degradation in throughput.
fireAllThenCompleteConcurrentRequestsWithUniformDelay(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.get(settings),
shardIndexingPressure, shardId1, 100, 200, OperationType.REPLICA);
//Generate a load which breaches both replica parameters
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markReplicaOperationStarted(shardId1, 11 * 1024, false));
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getReplicaRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaNodeLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaLastSuccessfulRequestLimitsBreachedRejections());
}
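// Last-successful-request path: one completed request records a timestamp; with more than MAX_OUTSTANDING_REQUESTS operations
// still outstanding after SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, the next oversized request is rejected with the
// last-successful-request counter incremented.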
public void testCoordinatingPrimaryThreadedLastSuccessfulRequestsAndRejection() throws Exception {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.build();
final int NUM_THREADS = scaledRandomIntBetween(110, 150);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
//Complete one request successfully so a last-successful-request timestamp is recorded
if(randomBoolean) {
Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 10, false);
coordinating.close();
} else {
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId1, 10, false);
primary.close();
}
//Generate a load such that requests remain outstanding (blocked).
Releasable[] releasables;
if (randomBoolean) {
releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 10, OperationType.COORDINATING);
} else {
releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 10, OperationType.PRIMARY);
}
//Mimic time elapsing while the outstanding requests stay stuck
Thread.sleep(randomIntBetween(50, 100));
//Generate a load which breaches both primary parameters
if(randomBoolean) {
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 200 * 1024, false));
} else {
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markPrimaryOperationStarted(shardId1, 200 * 1024, false));
}
for (int i = 0; i < NUM_THREADS; i++) {
releasables[i].close();
}
if(randomBoolean) {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingNodeLimitsBreachedRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections());
} else {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryNodeLimitsBreachedRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryLastSuccessfulRequestLimitsBreachedRejections());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(256, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedLastSuccessfulRequestsAndRejection() throws Exception {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.build();
final int NUM_THREADS = scaledRandomIntBetween(110, 150);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
//Complete one request successfully so a last-successful-request timestamp is recorded
Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId1, 10, false);
replica.close();
//Generate a load such that requests remain outstanding (blocked).
final Releasable[] releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 10, OperationType.REPLICA);
//Mimic time elapsing while the outstanding requests stay stuck
Thread.sleep(randomIntBetween(50, 100));
//Generate a load which breaches both replica parameters
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markReplicaOperationStarted(shardId1, 300 * 1024, false));
for (int i = 0; i < NUM_THREADS; i++) {
releasables[i].close();
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getReplicaRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaNodeLimitsBreachedRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(384, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
}
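// Node-limit path: with a small load still outstanding, a single request sized near the 250KB node limit is rejected,
// and the rejection is attributed to the node-limits-breached counter rather than the throughput or
// last-successful-request parameters.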
public void testCoordinatingPrimaryThreadedNodeLimitsAndRejection() throws Exception {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.build();
final int NUM_THREADS = scaledRandomIntBetween(100, 150);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
//Generate a load such that the requests in the window show degradation in throughput.
Releasable[] releasables;
if (randomBoolean) {
releasables = fireConcurrentRequestsWithUniformDelay(NUM_THREADS, shardIndexingPressure, shardId1, 10,
randomIntBetween(50, 100), OperationType.COORDINATING);
} else {
releasables = fireConcurrentRequestsWithUniformDelay(NUM_THREADS, shardIndexingPressure, shardId1, 10,
randomIntBetween(50, 100), OperationType.PRIMARY);
}
//Generate a load which breaches both primary parameters
if(randomBoolean) {
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 240 * 1024, false));
} else {
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markPrimaryOperationStarted(shardId1, 240 * 1024, false));
}
for (int i = 0; i < NUM_THREADS; i++) {
releasables[i].close();
}
if(randomBoolean) {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingThroughputDegradationLimitsBreachedRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingNodeLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections());
} else {
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryThroughputDegradationLimitsBreachedRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryNodeLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getPrimaryLastSuccessfulRequestLimitsBreachedRejections());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(256, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaThreadedNodeLimitsAndRejection() throws Exception {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.build();
final int NUM_THREADS = scaledRandomIntBetween(100, 150);
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
//Generate a load such that the requests in the window show degradation in throughput.
final Releasable[] releasables = fireConcurrentRequestsWithUniformDelay(NUM_THREADS, shardIndexingPressure, shardId1, 10,
randomIntBetween(50, 100), OperationType.REPLICA);
//Generate a load which breaches both replica parameters
expectThrows(OpenSearchRejectedExecutionException.class,
() -> shardIndexingPressure.markReplicaOperationStarted(shardId1, 340 * 1024, false));
for (int i = 0; i < NUM_THREADS; i++) {
releasables[i].close();
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getReplicaRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaThroughputDegradationLimitsBreachedRejections());
assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaNodeLimitsBreachedRejections());
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1)
.getReplicaLastSuccessfulRequestLimitsBreachedRejections());
assertEquals(384, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits());
}
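// Starts `concurrency` threads that each mark one operation and immediately release it, staggering thread starts by a
// small random delay.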
private void fireAndCompleteConcurrentRequests(int concurrency, ShardIndexingPressure shardIndexingPressure, ShardId shardId,
long bytes, OperationType operationType) throws Exception {
fireAndCompleteConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, randomIntBetween(5, 15),
operationType);
}
private void fireAndCompleteConcurrentRequestsWithUniformDelay(int concurrency, ShardIndexingPressure shardIndexingPressure,
ShardId shardId, long bytes, long delay,
OperationType operationType) throws Exception {
final Thread[] threads = new Thread[concurrency];
for (int i = 0; i < concurrency; i++) {
threads[i] = new Thread(() -> {
if(operationType == OperationType.COORDINATING) {
Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, false);
coordinating.close();
} else if (operationType == OperationType.PRIMARY){
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, false);
primary.close();
} else {
Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, false);
replica.close();
}
});
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
//Do Nothing
}
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
}
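// Starts `concurrency` threads that each mark one operation and hold it open; returns the releasables so the caller
// controls completion. The optional delay keeps each operation in flight for at least that long before the thread exits.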
private Releasable[] fireConcurrentRequests(int concurrency, ShardIndexingPressure shardIndexingPressure, ShardId shardId,
long bytes, OperationType operationType) throws Exception {
return fireConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, 0, operationType);
}
private Releasable[] fireConcurrentRequestsWithUniformDelay(int concurrency, ShardIndexingPressure shardIndexingPressure,
ShardId shardId, long bytes, long delay,
OperationType operationType) throws Exception {
final Thread[] threads = new Thread[concurrency];
final Releasable[] releasable = new Releasable[concurrency];
for (int i = 0; i < concurrency; i++) {
int counter = i;
threads[i] = new Thread(() -> {
if(operationType == OperationType.COORDINATING) {
releasable[counter] = shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, false);
} else if (operationType == OperationType.PRIMARY){
releasable[counter] = shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, false);
} else {
releasable[counter] = shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, false);
}
try {
Thread.sleep(delay);
} catch (Exception e) {
//Do Nothing
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
return releasable;
}
private void fireAllThenCompleteConcurrentRequests(int concurrency, ShardIndexingPressure shardIndexingPressure, ShardId shardId,
long bytes, OperationType operationType) throws Exception {
fireAllThenCompleteConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, 0, operationType);
}
private void fireAllThenCompleteConcurrentRequestsWithUniformDelay(int concurrency, ShardIndexingPressure shardIndexingPressure,
ShardId shardId, long bytes, long delay,
OperationType operationType) throws Exception {
final Releasable[] releasable = fireConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, delay,
operationType);
for (int i = 0; i < concurrency; i++) {
releasable[i].close();
}
}
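// Each thread issues 400-500 sequential operations with a fixed delay per operation, producing a steady throughput
// sample for the shard's moving request window.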
private void fireConcurrentAndParallelRequestsForUniformThroughPut(int concurrency, ShardIndexingPressure shardIndexingPressure,
ShardId shardId, long bytes, long delay,
OperationType operationType) throws Exception {
final Thread[] threads = new Thread[concurrency];
for (int i = 0; i < concurrency; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < randomIntBetween(400, 500); j++) {
Releasable releasable;
if(operationType == OperationType.COORDINATING) {
releasable = shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, false);
} else if (operationType == OperationType.PRIMARY){
releasable = shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, false);
} else {
releasable = shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, false);
}
try {
Thread.sleep(delay);
} catch (Exception e) {
//Do Nothing
}
releasable.close();
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
}
}

View File

@ -0,0 +1,820 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressurePerShardStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.test.OpenSearchTestCase;
public class ShardIndexingPressureTests extends OpenSearchTestCase {
private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100)
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterService clusterService = new ClusterService(settings, clusterSettings, null);
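// Marks coordinating, primary and replica operations and verifies the bytes are tracked at both node and shard level;
// once nothing is outstanding the per-shard entry moves from the hot store to cold stats.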
public void testMemoryBytesMarkedAndReleased() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 10, false);
Releasable coordinating2 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 50, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 15, true);
Releasable primary2 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 5, false);
Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 25, true);
Releasable replica2 = shardIndexingPressure.markReplicaOperationStarted(shardId, 10, false)) {
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(60, nodeStats.getCurrentCoordinatingBytes());
assertEquals(20, nodeStats.getCurrentPrimaryBytes());
assertEquals(80, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(35, nodeStats.getCurrentReplicaBytes());
IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertEquals(60, shardStats.getCurrentCoordinatingBytes());
assertEquals(20, shardStats.getCurrentPrimaryBytes());
assertEquals(80, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(35, shardStats.getCurrentReplicaBytes());
}
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(0, nodeStats.getCurrentCoordinatingBytes());
assertEquals(0, nodeStats.getCurrentPrimaryBytes());
assertEquals(0, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, nodeStats.getCurrentReplicaBytes());
assertEquals(60, nodeStats.getTotalCoordinatingBytes());
assertEquals(20, nodeStats.getTotalPrimaryBytes());
assertEquals(80, nodeStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(35, nodeStats.getTotalReplicaBytes());
IndexingPressurePerShardStats shardHotStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertNull(shardHotStoreStats);
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(60, shardStats.getTotalCoordinatingBytes());
assertEquals(20, shardStats.getTotalPrimaryBytes());
assertEquals(80, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(35, shardStats.getTotalReplicaBytes());
}
public void testAvoidDoubleAccounting() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 10, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 15)) {
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(10, nodeStats.getCurrentCoordinatingBytes());
assertEquals(15, nodeStats.getCurrentPrimaryBytes());
assertEquals(10, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertEquals(10, shardStats.getCurrentCoordinatingBytes());
assertEquals(15, shardStats.getCurrentPrimaryBytes());
assertEquals(10, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
}
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(0, nodeStats.getCurrentCoordinatingBytes());
assertEquals(0, nodeStats.getCurrentPrimaryBytes());
assertEquals(0, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, nodeStats.getTotalCoordinatingBytes());
assertEquals(15, nodeStats.getTotalPrimaryBytes());
assertEquals(10, nodeStats.getTotalCombinedCoordinatingAndPrimaryBytes());
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertNull(shardStoreStats);
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getTotalCoordinatingBytes());
assertEquals(15, shardStats.getTotalPrimaryBytes());
assertEquals(10, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
}
public void testCoordinatingPrimaryRejections() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1024 * 3, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024 * 3, false);
Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 3, false)) {
if (randomBoolean()) {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markCoordinatingOperationStarted(shardId, 1024 * 2, false));
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(1, nodeStats.getCoordinatingRejections());
assertEquals(1024 * 6, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertEquals(1, shardStats.getCoordinatingRejections());
assertEquals(1024 * 6, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(1, shardStats.getCoordinatingNodeLimitsBreachedRejections());
} else {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markPrimaryOperationStarted(shardId, 1024 * 2, false));
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(1, nodeStats.getPrimaryRejections());
assertEquals(1024 * 6, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats()
.getIndexingPressureShardStats(shardId);
assertEquals(1, shardStats.getPrimaryRejections());
assertEquals(1024 * 6, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(1, shardStats.getPrimaryNodeLimitsBreachedRejections());
}
long preForceRejections = shardIndexingPressure.stats().getPrimaryRejections();
long preForcedShardRejections = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getPrimaryRejections();
// Primary can be forced
Releasable forced = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024 * 2, true);
assertEquals(preForceRejections, shardIndexingPressure.stats().getPrimaryRejections());
assertEquals(1024 * 8, shardIndexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(preForcedShardRejections, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getPrimaryRejections());
assertEquals(1024 * 8, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(preForcedShardRejections, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getPrimaryNodeLimitsBreachedRejections());
forced.close();
// Local to coordinating node primary actions not rejected
IndexingPressureStats preLocalNodeStats = shardIndexingPressure.stats();
IndexingPressurePerShardStats preLocalShardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
Releasable local = shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024 * 2);
assertEquals(preLocalNodeStats.getPrimaryRejections(), shardIndexingPressure.stats().getPrimaryRejections());
assertEquals(1024 * 6, shardIndexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(preLocalNodeStats.getCurrentPrimaryBytes() + 1024 * 2, shardIndexingPressure.stats().getCurrentPrimaryBytes());
assertEquals(preLocalShardStats.getPrimaryRejections(), shardIndexingPressure.shardStats()
.getIndexingPressureShardStats(shardId).getPrimaryRejections());
assertEquals(1024 * 6, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(preLocalShardStats.getCurrentPrimaryBytes() + 1024 * 2, shardIndexingPressure.shardStats()
.getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes());
assertEquals(preLocalShardStats.getPrimaryNodeLimitsBreachedRejections(), shardIndexingPressure.shardStats()
.getIndexingPressureShardStats(shardId).getPrimaryNodeLimitsBreachedRejections());
local.close();
}
assertEquals(1024 * 8, shardIndexingPressure.stats().getTotalCombinedCoordinatingAndPrimaryBytes());
assertNull(shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId));
assertEquals(1024 * 8, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId)
.getTotalCombinedCoordinatingAndPrimaryBytes());
}
public void testReplicaRejections() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1024 * 3, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024 * 3, false);
Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 3, false)) {
// Replica will not be rejected until replica bytes > 15KB
Releasable replica2 = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 9, false);
assertEquals(1024 * 12, shardIndexingPressure.stats().getCurrentReplicaBytes());
assertEquals(1024 * 12, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
// Replica will be rejected once we cross 15KB Shard Limit
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markReplicaOperationStarted(shardId, 1024 * 2, false));
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(1, nodeStats.getReplicaRejections());
assertEquals(1024 * 12, nodeStats.getCurrentReplicaBytes());
IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertEquals(1, shardStats.getReplicaRejections());
assertEquals(1024 * 12, shardStats.getCurrentReplicaBytes());
assertEquals(1, shardStats.getReplicaNodeLimitsBreachedRejections());
// Replica can be forced
Releasable forced = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 2, true);
assertEquals(1, shardIndexingPressure.stats().getReplicaRejections());
assertEquals(1024 * 14, shardIndexingPressure.stats().getCurrentReplicaBytes());
assertEquals(1, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getReplicaRejections());
assertEquals(1024 * 14, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals(1, shardStats.getReplicaNodeLimitsBreachedRejections());
forced.close();
replica2.close();
}
assertEquals(1024 * 14, shardIndexingPressure.stats().getTotalReplicaBytes());
assertNull(shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId));
assertEquals(1024 * 14, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId).getTotalReplicaBytes());
}
public void testCoordinatingPrimaryShardLimitIncrease() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 2, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 2, false)) {
assertEquals(2, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
assertEquals(4, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits()); // Base Limit
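// The base per-shard limit of 10 bytes follows from the 10KB node limit, presumably via the default minimum
// shard-limit fraction (0.001).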
if (randomBoolean) {
Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 6, false);
assertEquals(8, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
assertEquals(10, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(11, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits()); // Increased Limit
coordinating1.close();
} else {
Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 6, false);
assertEquals(8, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes());
assertEquals(10, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(11, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits()); // Increased Limit
primary1.close();
}
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertNull(shardStoreStats);
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
if(randomBoolean){
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(8, shardStats.getTotalCoordinatingBytes());
} else {
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(8, shardStats.getTotalPrimaryBytes());
}
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaShardLimitIncrease() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 2, false)) {
assertEquals(2, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals(15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits()); // Base Limit
Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 14, false);
assertEquals(16, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals(18, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits()); // Increased Limit
replica1.close();
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertNull(shardStoreStats);
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(16, shardStats.getTotalReplicaBytes());
assertEquals(15, shardStats.getCurrentReplicaLimits());
}
public void testCoordinatingPrimaryShardLimitIncreaseEvaluateSecondaryParam() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 4 * 1024, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 4 * 1024, false)) {
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCoordinatingBytes());
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryBytes());
assertEquals(8 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(8*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits());
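// The expected limit equals the current combined bytes divided by 0.85, suggesting the shard limit is scaled so that
// utilization sits at roughly 85% of the limit.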
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertNull(shardStoreStats);
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes());
assertEquals(4 * 1024, shardStats.getTotalPrimaryBytes());
assertEquals(8 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
}
public void testReplicaShardLimitIncreaseEvaluateSecondaryParam() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 11 * 1024, false)) {
assertEquals(11 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals((long)(11 * 1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits());
}
IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId);
assertNull(shardStoreStats);
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(11 * 1024, shardStats.getTotalReplicaBytes());
assertEquals(15, shardStats.getCurrentReplicaLimits());
}
public void testCoordinatingPrimaryShardRejectionViaSuccessfulRequestsParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) {
assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes());
assertEquals(2 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(2*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits());
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(1 * 1024, shardStats.getTotalCoordinatingBytes());
assertEquals(1 * 1024, shardStats.getTotalPrimaryBytes());
assertEquals(2 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
Thread.sleep(25);
//Total Bytes are 9*1024 and node limit is 10*1024
if(randomBoolean) {
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 7 * 1024, false);
Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false)) {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markCoordinatingOperationStarted(shardId, 1 * 1024, false));
}
} else {
try (Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 7 * 1024, false);
Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markPrimaryOperationStarted(shardId, 1 * 1024, false));
}
}
shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
if(randomBoolean) {
assertEquals(1, shardStats.getCoordinatingRejections());
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(1, shardStats.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections());
} else {
assertEquals(1, shardStats.getPrimaryRejections());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(1, shardStats.getPrimaryLastSuccessfulRequestLimitsBreachedRejections());
}
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
if(randomBoolean) {
assertEquals(1, nodeStats.getCoordinatingRejections());
assertEquals(0, nodeStats.getCurrentCoordinatingBytes());
} else {
assertEquals(1, nodeStats.getPrimaryRejections());
assertEquals(0, nodeStats.getCurrentPrimaryBytes());
}
}
public void testReplicaShardRejectionViaSuccessfulRequestsParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false)) {
assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals((long)(1*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits());
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(1 * 1024, shardStats.getTotalReplicaBytes());
assertEquals(15, shardStats.getCurrentReplicaLimits());
Thread.sleep(25);
// Total bytes reach only 14*1024 against the 15*1024 replica node limit, so the rejection below is attributed to the last-successful-request criterion
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 10 * 1024, false);
Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 2 * 1024, false)) {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markReplicaOperationStarted(shardId, 2 * 1024, false));
}
shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(1, shardStats.getReplicaRejections());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(1, shardStats.getReplicaLastSuccessfulRequestLimitsBreachedRejections());
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(1, nodeStats.getReplicaRejections());
assertEquals(0, nodeStats.getCurrentReplicaBytes());
}
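
// Same coordinating/primary last-successful-request scenario as above, but with
// SHARD_INDEXING_PRESSURE_ENFORCED=false (shadow mode): the extra operation completes and the rejection
// counters stay at 0, yet the LastSuccessfulRequestLimitsBreachedRejections counter still records the breach.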
public void testCoordinatingPrimaryShardRejectionSkippedInShadowModeViaSuccessfulRequestsParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) {
assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes());
assertEquals(2 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(2 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits());
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(1 * 1024, shardStats.getTotalCoordinatingBytes());
assertEquals(1 * 1024, shardStats.getTotalPrimaryBytes());
assertEquals(2 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
Thread.sleep(25);
// Total bytes reach only 9*1024 against the 10*1024 node limit; in shadow (non-enforced) mode the breach is tracked but not rejected
if(randomBoolean) {
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 7 * 1024, false);
Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false)) {
Releasable coordinating2 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false);
coordinating2.close();
}
} else {
try (Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 7 * 1024, false);
Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) {
Releasable primary2 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false);
primary2.close();
}
}
shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
if(randomBoolean) {
assertEquals(0, shardStats.getCoordinatingRejections());
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(1, shardStats.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections());
} else {
assertEquals(0, shardStats.getPrimaryRejections());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(1, shardStats.getPrimaryLastSuccessfulRequestLimitsBreachedRejections());
}
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
if(randomBoolean) {
assertEquals(0, nodeStats.getCoordinatingRejections());
assertEquals(0, nodeStats.getCurrentCoordinatingBytes());
} else {
assertEquals(0, nodeStats.getPrimaryRejections());
assertEquals(0, nodeStats.getCurrentPrimaryBytes());
}
}
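
// Replica-side shadow-mode variant: the extra replica operation completes normally, but the
// ReplicaLastSuccessfulRequestLimitsBreachedRejections counter still records the breach.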
public void testReplicaShardRejectionSkippedInShadowModeViaSuccessfulRequestsParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false)) {
assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals((long)(1 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits());
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(1 * 1024, shardStats.getTotalReplicaBytes());
assertEquals(15, shardStats.getCurrentReplicaLimits());
Thread.sleep(25);
// Total bytes reach only 14*1024 against the 15*1024 replica node limit; in shadow (non-enforced) mode the breach is tracked but not rejected
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 10 * 1024, false);
Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 2 * 1024, false)) {
Releasable replica2 = shardIndexingPressure.markReplicaOperationStarted(shardId, 2 * 1024, false);
replica2.close();
}
shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getReplicaRejections());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(1, shardStats.getReplicaLastSuccessfulRequestLimitsBreachedRejections());
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(0, nodeStats.getReplicaRejections());
assertEquals(0, nodeStats.getCurrentReplicaBytes());
}
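
// Throughput-degradation rejection: with THROUGHPUT_DEGRADATION_LIMITS=1 and REQUEST_SIZE_WINDOW=1, the
// slow first batch (delayed by the sleep below) degrades the observed shard throughput, so the next large
// coordinating or primary operation is rejected and attributed to the throughput-degradation criterion.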
public void testCoordinatingPrimaryShardRejectionViaThroughputDegradationParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false);
Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 3 * 1024, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false);
Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 3 * 1024, false)) {
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes());
assertEquals(8 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(8 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits());
// Delay completion of the in-flight requests to mimic throughput degradation
Thread.sleep(100);
}
if(randomBoolean) {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markCoordinatingOperationStarted(shardId, 8 * 1024, false));
} else {
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markPrimaryOperationStarted(shardId, 8 * 1024, false));
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
if(randomBoolean) {
assertEquals(1, shardStats.getCoordinatingRejections());
assertEquals(1, shardStats.getCoordinatingThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes());
} else {
assertEquals(1, shardStats.getPrimaryRejections());
assertEquals(1, shardStats.getPrimaryThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(4 * 1024, shardStats.getTotalPrimaryBytes());
}
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(8 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
if(randomBoolean) {
assertEquals(1, nodeStats.getCoordinatingRejections());
assertEquals(0, nodeStats.getCurrentCoordinatingBytes());
} else {
assertEquals(1, nodeStats.getPrimaryRejections());
assertEquals(0, nodeStats.getCurrentPrimaryBytes());
}
}
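
// Replica-side variant of the throughput-degradation rejection scenario above.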
public void testReplicaShardRejectionViaThroughputDegradationParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false);
Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 3 * 1024, false)) {
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals((long)(4 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits());
// Delay completion of the in-flight requests to mimic throughput degradation
Thread.sleep(100);
}
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markReplicaOperationStarted(shardId, 12 * 1024, false));
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(1, shardStats.getReplicaRejections());
assertEquals(1, shardStats.getReplicaThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(4 * 1024, shardStats.getTotalReplicaBytes());
assertEquals(15, shardStats.getCurrentReplicaLimits());
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(1, nodeStats.getReplicaRejections());
assertEquals(0, nodeStats.getCurrentReplicaBytes());
}
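
// Shadow-mode variant of the throughput-degradation scenario: the follow-up 8KB operation is allowed to
// complete, but the ThroughputDegradationLimitsBreachedRejections counter still records the breach.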
public void testCoordinatingPrimaryShardRejectionSkippedInShadowModeViaThroughputDegradationParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
boolean randomBoolean = randomBoolean();
try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false);
Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 3 * 1024, false);
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false);
Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 3 * 1024, false)) {
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes());
assertEquals(8 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(8 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentPrimaryAndCoordinatingLimits());
// Delay completion of the in-flight requests to mimic throughput degradation
Thread.sleep(100);
}
if(randomBoolean) {
Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 8 * 1024, false);
coordinating.close();
} else {
Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 8 * 1024, false);
primary.close();
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
if(randomBoolean) {
assertEquals(0, shardStats.getCoordinatingRejections());
assertEquals(1, shardStats.getCoordinatingThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(12 * 1024, shardStats.getTotalCoordinatingBytes());
} else {
assertEquals(0, shardStats.getPrimaryRejections());
assertEquals(1, shardStats.getPrimaryThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(12 * 1024, shardStats.getTotalPrimaryBytes());
}
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(16 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
if(randomBoolean) {
assertEquals(0, nodeStats.getCoordinatingRejections());
assertEquals(0, nodeStats.getCurrentCoordinatingBytes());
} else {
assertEquals(0, nodeStats.getPrimaryRejections());
assertEquals(0, nodeStats.getCurrentPrimaryBytes());
}
}
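
// Replica-side shadow-mode variant of the throughput-degradation scenario.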
public void testReplicaShardRejectionSkippedInShadowModeViaThroughputDegradationParam() throws InterruptedException {
Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false)
.put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1)
.build();
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false);
Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 3 * 1024, false)) {
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes());
assertEquals((long)(4 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentReplicaLimits());
// Delay completion of the in-flight requests to mimic throughput degradation
Thread.sleep(100);
}
Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 12 * 1024, false);
replica.close();
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId);
assertEquals(0, shardStats.getReplicaRejections());
assertEquals(1, shardStats.getReplicaThroughputDegradationLimitsBreachedRejections());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(16 * 1024, shardStats.getTotalReplicaBytes());
assertEquals(15, shardStats.getCurrentReplicaLimits());
IndexingPressureStats nodeStats = shardIndexingPressure.stats();
assertEquals(0, nodeStats.getReplicaRejections());
assertEquals(0, nodeStats.getCurrentReplicaBytes());
}
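
// Two shards of the same index each hold 4KB of coordinating bytes concurrently; each shard's dynamic
// limit scales up to roughly 4*1024/0.85 while the requests are in flight and falls back to the base
// limit (10 here) once both are released.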
public void testShardLimitIncreaseMultipleShards() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId1 = new ShardId(index, 0);
ShardId shardId2 = new ShardId(index, 1);
try (Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 4 * 1024, false);
Releasable coordinating2 = shardIndexingPressure.markCoordinatingOperationStarted(shardId2, 4 * 1024, false)) {
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentCoordinatingBytes());
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(4 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1)
.getCurrentPrimaryAndCoordinatingLimits());
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId2)
.getCurrentCoordinatingBytes());
assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId2)
.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals((long)(4 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId2)
.getCurrentPrimaryAndCoordinatingLimits());
}
IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes());
assertEquals(4 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId2);
assertEquals(0, shardStats.getCurrentCoordinatingBytes());
assertEquals(0, shardStats.getCurrentPrimaryBytes());
assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes());
assertEquals(0, shardStats.getCurrentReplicaBytes());
assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes());
assertEquals(4 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes());
assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits());
}
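
// An 11KB coordinating operation exceeds the 10KB node limit and is rejected, but the same operation is
// admitted when the final boolean flag (force execution) is set to true; the accounted bytes are released
// once the operation completes.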
public void testForceExecutionOnCoordinating() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure
.markCoordinatingOperationStarted(shardId, 1024 * 11, false));
try (Releasable ignore = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 11 * 1024, true)) {
assertEquals(11 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)
.getCurrentCoordinatingBytes());
}
assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes());
}
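
// Closing any of the returned Releasable wrappers a second time should trip an assertion error with the
// message "ShardIndexingPressure Release is called twice".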
public void testAssertionOnReleaseExecutedTwice() {
ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
String assertionMessage = "ShardIndexingPressure Release is called twice";
Releasable releasable = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1024, false);
releasable.close();
expectThrows(AssertionError.class, assertionMessage, releasable::close);
releasable = shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024);
releasable.close();
expectThrows(AssertionError.class, assertionMessage, releasable::close);
releasable = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024, false);
releasable.close();
expectThrows(AssertionError.class, assertionMessage, releasable::close);
releasable = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024, false);
releasable.close();
expectThrows(AssertionError.class, assertionMessage, releasable::close);
}
}