diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java index 5b0c4ce0f6e..ea8d7148a30 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java @@ -36,6 +36,7 @@ import org.opensearch.LegacyESVersion; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.ShardIndexingPressureSettings; import java.io.IOException; import java.util.Collections; @@ -53,6 +54,8 @@ public class CommonStatsFlags implements Writeable, Cloneable { private String[] completionDataFields = null; private boolean includeSegmentFileSizes = false; private boolean includeUnloadedSegments = false; + private boolean includeAllShardIndexingPressureTrackers = false; + private boolean includeOnlyTopIndexingPressureMetrics = false; /** * @param flags flags to set. If no flags are supplied, default flags will be set. @@ -80,6 +83,10 @@ public class CommonStatsFlags implements Writeable, Cloneable { if (in.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) { includeUnloadedSegments = in.readBoolean(); } + if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) { + includeAllShardIndexingPressureTrackers = in.readBoolean(); + includeOnlyTopIndexingPressureMetrics = in.readBoolean(); + } } @Override @@ -98,6 +105,10 @@ public class CommonStatsFlags implements Writeable, Cloneable { if (out.getVersion().onOrAfter(LegacyESVersion.V_7_2_0)) { out.writeBoolean(includeUnloadedSegments); } + if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) { + out.writeBoolean(includeAllShardIndexingPressureTrackers); + out.writeBoolean(includeOnlyTopIndexingPressureMetrics); + } } /** @@ -111,6 +122,8 @@ public class CommonStatsFlags implements Writeable, Cloneable { completionDataFields = null; includeSegmentFileSizes = false; includeUnloadedSegments = false; + includeAllShardIndexingPressureTrackers = false; + includeOnlyTopIndexingPressureMetrics = false; return this; } @@ -125,6 +138,8 @@ public class CommonStatsFlags implements Writeable, Cloneable { completionDataFields = null; includeSegmentFileSizes = false; includeUnloadedSegments = false; + includeAllShardIndexingPressureTrackers = false; + includeOnlyTopIndexingPressureMetrics = false; return this; } @@ -198,10 +213,28 @@ public class CommonStatsFlags implements Writeable, Cloneable { return this; } + public CommonStatsFlags includeAllShardIndexingPressureTrackers(boolean includeAllShardPressureTrackers) { + this.includeAllShardIndexingPressureTrackers = includeAllShardPressureTrackers; + return this; + } + + public CommonStatsFlags includeOnlyTopIndexingPressureMetrics(boolean includeOnlyTopIndexingPressureMetrics) { + this.includeOnlyTopIndexingPressureMetrics = includeOnlyTopIndexingPressureMetrics; + return this; + } + public boolean includeUnloadedSegments() { return this.includeUnloadedSegments; } + public boolean includeAllShardIndexingPressureTrackers() { + return this.includeAllShardIndexingPressureTrackers; + } + + public boolean includeOnlyTopIndexingPressureMetrics() { + return this.includeOnlyTopIndexingPressureMetrics; + } + public boolean includeSegmentFileSizes() { return this.includeSegmentFileSizes; } diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressure.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressure.java new file mode 100644 index 00000000000..eda6a00ad09 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressure.java @@ -0,0 +1,350 @@ +/* + * Copyright OpenSearch Contributors. + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.index; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.ShardIndexingPressureTracker.CommonOperationTracker; +import org.opensearch.index.ShardIndexingPressureTracker.OperationTracker; +import org.opensearch.index.ShardIndexingPressureTracker.PerformanceTracker; +import org.opensearch.index.ShardIndexingPressureTracker.RejectionTracker; +import org.opensearch.index.ShardIndexingPressureTracker.StatsTracker; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.stats.ShardIndexingPressureStats; +import org.opensearch.index.stats.IndexingPressurePerShardStats; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Shard Indexing Pressure is a framework level artefact build on top of IndexingPressure to track incoming indexing request, per shard. + * The interfaces provided by this class will be used by Transport Action layers to start accounting for an incoming request. + * Interfaces returns Releasable which when triggered will release the acquired accounting tokens values and also + * perform necessary actions such as throughput evaluation once the request completes. + * Consumers of these interfaces are expected to trigger close on releasable, reliably for consistency. + * + * Overall ShardIndexingPressure provides: + * 1. Memory Accounting at shard level. This can be enabled/disabled based on dynamic setting. + * 2. Memory Accounting at Node level. Tracking is done using the IndexingPressure artefacts to support feature seamless toggling. + * 3. Interfaces to access the statistics for shard trackers. + */ +public class ShardIndexingPressure extends IndexingPressure { + + private static final Logger logger = LogManager.getLogger(ShardIndexingPressure.class); + private final ShardIndexingPressureSettings shardIndexingPressureSettings; + private final ShardIndexingPressureMemoryManager memoryManager; + + ShardIndexingPressure(Settings settings, ClusterService clusterService) { + super(settings); + shardIndexingPressureSettings = new ShardIndexingPressureSettings(clusterService, settings, primaryAndCoordinatingLimits); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + this.memoryManager = new ShardIndexingPressureMemoryManager(shardIndexingPressureSettings, clusterSettings, settings); + } + + public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { + if(0 == bytes) { return () -> {}; } + + long requestStartTime = System.nanoTime(); + ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId); + long nodeCombinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + long nodeReplicaBytes = currentReplicaBytes.get(); + long nodeTotalBytes = nodeCombinedBytes + nodeReplicaBytes; + long shardCombinedBytes = tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(bytes); + + boolean shardLevelLimitBreached = false; + if (forceExecution == false) { + boolean nodeLevelLimitBreached = memoryManager.isCoordinatingNodeLimitBreached(tracker, nodeTotalBytes); + if (nodeLevelLimitBreached == false) { + shardLevelLimitBreached = memoryManager.isCoordinatingShardLimitBreached(tracker, nodeTotalBytes, requestStartTime); + } + + if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) { + coordinatingRejections.getAndIncrement(); + currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes); + tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes); + rejectShardRequest(tracker, bytes, nodeTotalBytes, shardCombinedBytes, + tracker.getCoordinatingOperationTracker().getRejectionTracker(), "coordinating"); + } + } + currentCoordinatingBytes.addAndGet(bytes); + totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + totalCoordinatingBytes.addAndGet(bytes); + + StatsTracker statsTracker = tracker.getCoordinatingOperationTracker().getStatsTracker(); + statsTracker.incrementCurrentBytes(bytes); + markShardOperationStarted(statsTracker, tracker.getCoordinatingOperationTracker().getPerformanceTracker()); + boolean isShadowModeBreach = shardLevelLimitBreached; + + return wrapReleasable(() -> { + currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes); + currentCoordinatingBytes.addAndGet(-bytes); + markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getCoordinatingOperationTracker(), + tracker.getCommonOperationTracker()); + memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker); + tryReleaseTracker(tracker); + }); + } + + public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) { + if(bytes == 0) { return () -> {}; } + + ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId); + + currentPrimaryBytes.addAndGet(bytes); + totalPrimaryBytes.addAndGet(bytes); + tracker.getPrimaryOperationTracker().getStatsTracker().incrementCurrentBytes(bytes); + tracker.getPrimaryOperationTracker().getStatsTracker().incrementTotalBytes(bytes); + + return wrapReleasable(() -> { + currentPrimaryBytes.addAndGet(-bytes); + tracker.getPrimaryOperationTracker().getStatsTracker().incrementCurrentBytes(-bytes); + }); + } + + public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { + if(0 == bytes) { return () -> {}; } + + long requestStartTime = System.nanoTime(); + ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId); + long nodeCombinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + long nodeReplicaBytes = currentReplicaBytes.get(); + long nodeTotalBytes = nodeCombinedBytes + nodeReplicaBytes; + long shardCombinedBytes = tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(bytes); + + boolean shardLevelLimitBreached = false; + if (forceExecution == false) { + boolean nodeLevelLimitBreached = memoryManager.isPrimaryNodeLimitBreached(tracker, nodeTotalBytes); + if (nodeLevelLimitBreached == false) { + shardLevelLimitBreached = memoryManager.isPrimaryShardLimitBreached(tracker, nodeTotalBytes, requestStartTime); + } + + if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) { + primaryRejections.getAndIncrement(); + currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes); + tracker.getCommonOperationTracker().incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes); + rejectShardRequest(tracker, bytes, nodeTotalBytes, shardCombinedBytes, + tracker.getPrimaryOperationTracker().getRejectionTracker(), "primary"); + } + } + currentPrimaryBytes.addAndGet(bytes); + totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + totalPrimaryBytes.addAndGet(bytes); + + StatsTracker statsTracker = tracker.getPrimaryOperationTracker().getStatsTracker(); + statsTracker.incrementCurrentBytes(bytes); + markShardOperationStarted(statsTracker, tracker.getPrimaryOperationTracker().getPerformanceTracker()); + boolean isShadowModeBreach = shardLevelLimitBreached; + + return wrapReleasable(() -> { + currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes); + currentPrimaryBytes.addAndGet(-bytes); + markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getPrimaryOperationTracker(), + tracker.getCommonOperationTracker()); + memoryManager.decreaseShardPrimaryAndCoordinatingLimits(tracker); + tryReleaseTracker(tracker); + }); + } + + public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { + if(0 == bytes) { return () -> {}; } + + long requestStartTime = System.nanoTime(); + ShardIndexingPressureTracker tracker = getShardIndexingPressureTracker(shardId); + long nodeReplicaBytes = currentReplicaBytes.addAndGet(bytes); + long shardReplicaBytes = tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(bytes); + + boolean shardLevelLimitBreached = false; + if (forceExecution == false) { + boolean nodeLevelLimitBreached = memoryManager.isReplicaNodeLimitBreached(tracker, nodeReplicaBytes); + if (nodeLevelLimitBreached == false) { + shardLevelLimitBreached = memoryManager.isReplicaShardLimitBreached(tracker, nodeReplicaBytes, requestStartTime); + } + + if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) { + replicaRejections.getAndIncrement(); + currentReplicaBytes.addAndGet(-bytes); + tracker.getReplicaOperationTracker().getStatsTracker().incrementCurrentBytes(-bytes); + rejectShardRequest(tracker, bytes, nodeReplicaBytes, shardReplicaBytes, + tracker.getReplicaOperationTracker().getRejectionTracker(), "replica"); + } + } + totalReplicaBytes.addAndGet(bytes); + + StatsTracker statsTracker = tracker.getReplicaOperationTracker().getStatsTracker(); + markShardOperationStarted(statsTracker, tracker.getReplicaOperationTracker().getPerformanceTracker()); + boolean isShadowModeBreach = shardLevelLimitBreached; + + return wrapReleasable(() -> { + currentReplicaBytes.addAndGet(-bytes); + markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, tracker.getReplicaOperationTracker()); + memoryManager.decreaseShardReplicaLimits(tracker); + tryReleaseTracker(tracker); + }); + } + + private static Releasable wrapReleasable(Releasable releasable) { + final AtomicBoolean called = new AtomicBoolean(); + return () -> { + if (called.compareAndSet(false, true)) { + releasable.close(); + } else { + logger.error("ShardIndexingPressure Release is called twice", new IllegalStateException("Releasable is called twice")); + assert false : "ShardIndexingPressure Release is called twice"; + } + }; + } + + private boolean shouldRejectRequest(boolean nodeLevelLimitBreached, boolean shardLevelLimitBreached) { + return nodeLevelLimitBreached || + (shardLevelLimitBreached && shardIndexingPressureSettings.isShardIndexingPressureEnforced()); + } + + private void markShardOperationStarted(StatsTracker statsTracker, PerformanceTracker performanceTracker) { + statsTracker.incrementRequestCount(); + performanceTracker.incrementTotalOutstandingRequests(); + } + + private void adjustPerformanceUponCompletion(long bytes, long requestStartTime, StatsTracker statsTracker, + PerformanceTracker performanceTracker) { + long requestEndTime = System.nanoTime(); + long requestLatency = TimeUnit.NANOSECONDS.toMillis(requestEndTime - requestStartTime); + + performanceTracker.addLatencyInMillis(requestLatency); + performanceTracker.updateLastSuccessfulRequestTimestamp(requestEndTime); + performanceTracker.resetTotalOutstandingRequests(); + + if(requestLatency > 0) { + calculateRequestThroughput(bytes, requestLatency, performanceTracker, statsTracker); + } + } + + private void calculateRequestThroughput(long bytes, long requestLatency, PerformanceTracker performanceTracker, + StatsTracker statsTracker) { + double requestThroughput = (double) bytes / requestLatency; + performanceTracker.addNewThroughout(requestThroughput); + if (performanceTracker.getThroughputMovingQueueSize() > shardIndexingPressureSettings.getRequestSizeWindow()) { + double front = performanceTracker.getFirstThroughput(); + double movingAverage = memoryManager.calculateMovingAverage(performanceTracker.getThroughputMovingAverage(), front, + requestThroughput, shardIndexingPressureSettings.getRequestSizeWindow()); + performanceTracker.updateThroughputMovingAverage(Double.doubleToLongBits(movingAverage)); + } else { + double movingAverage = (double) statsTracker.getTotalBytes() / performanceTracker.getLatencyInMillis(); + performanceTracker.updateThroughputMovingAverage(Double.doubleToLongBits(movingAverage)); + } + } + + private void markShardOperationComplete(long bytes, long requestStartTime, boolean isShadowModeBreach, + OperationTracker operationTracker, CommonOperationTracker commonOperationTracker) { + commonOperationTracker.incrementCurrentCombinedCoordinatingAndPrimaryBytes(-bytes); + commonOperationTracker.incrementTotalCombinedCoordinatingAndPrimaryBytes(bytes); + markShardOperationComplete(bytes, requestStartTime, isShadowModeBreach, operationTracker); + } + + private void markShardOperationComplete(long bytes, long requestStartTime, boolean isShadowModeBreach, + OperationTracker operationTracker) { + + StatsTracker statsTracker = operationTracker.getStatsTracker(); + statsTracker.incrementCurrentBytes(-bytes); + statsTracker.incrementTotalBytes(bytes); + + // In shadow mode if request was intended to be rejected, we do not account it for dynamic rejection parameters + if (isShadowModeBreach == false) { + adjustPerformanceUponCompletion(bytes, requestStartTime, statsTracker, operationTracker.getPerformanceTracker()); + } + } + + private void tryReleaseTracker(ShardIndexingPressureTracker tracker) { + memoryManager.tryTrackerCleanupFromHotStore(tracker, + () -> (tracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes() == 0 && + tracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes() == 0)); + } + + private void rejectShardRequest(ShardIndexingPressureTracker tracker, long bytes, long nodeTotalBytes, long shardTotalBytes, + RejectionTracker rejectionTracker, String operationType) { + long nodeBytesWithoutOperation = nodeTotalBytes - bytes; + long shardBytesWithoutOperation = shardTotalBytes - bytes; + ShardId shardId = tracker.getShardId(); + + rejectionTracker.incrementTotalRejections(); + throw new OpenSearchRejectedExecutionException("rejected execution of " + operationType + " operation [" + + "shard_detail=[" + shardId.getIndexName() + "][" + shardId.id() + "], " + + "shard_total_bytes=" + shardBytesWithoutOperation + ", " + + "shard_operation_bytes=" + bytes + ", " + + "shard_max_coordinating_and_primary_bytes=" + tracker.getPrimaryAndCoordinatingLimits() + ", " + + "shard_max_replica_bytes=" + tracker.getReplicaLimits() + "] OR [" + + "node_total_bytes=" + nodeBytesWithoutOperation + ", " + + "node_operation_bytes=" + bytes + ", " + + "node_max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + ", " + + "node_max_replica_bytes=" + replicaLimits + "]", false); + } + + public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) { + if (statsFlags.includeOnlyTopIndexingPressureMetrics()) { + return topStats(); + } else { + ShardIndexingPressureStats allStats = shardStats(); + if (statsFlags.includeAllShardIndexingPressureTrackers()) { + allStats.addAll(coldStats()); + } + return allStats; + } + } + + ShardIndexingPressureStats shardStats() { + Map statsPerShard = new HashMap<>(); + boolean isEnforcedMode = shardIndexingPressureSettings.isShardIndexingPressureEnforced(); + + for (Map.Entry shardEntry : + memoryManager.getShardIndexingPressureHotStore().entrySet()) { + IndexingPressurePerShardStats shardStats = new IndexingPressurePerShardStats(shardEntry.getValue(), isEnforcedMode); + statsPerShard.put(shardEntry.getKey(), shardStats); + } + return new ShardIndexingPressureStats(statsPerShard, memoryManager.getTotalNodeLimitsBreachedRejections(), + memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections(), + memoryManager.getTotalThroughputDegradationLimitsBreachedRejections(), + shardIndexingPressureSettings.isShardIndexingPressureEnabled(), isEnforcedMode); + } + + ShardIndexingPressureStats coldStats() { + Map statsPerShard = new HashMap<>(); + boolean isEnforcedMode = shardIndexingPressureSettings.isShardIndexingPressureEnforced(); + + for (Map.Entry shardEntry : + memoryManager.getShardIndexingPressureColdStore().entrySet()) { + IndexingPressurePerShardStats shardStats = new IndexingPressurePerShardStats(shardEntry.getValue(), isEnforcedMode); + statsPerShard.put(shardEntry.getKey(), shardStats); + } + return new ShardIndexingPressureStats(statsPerShard, memoryManager.getTotalNodeLimitsBreachedRejections(), + memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections(), + memoryManager.getTotalThroughputDegradationLimitsBreachedRejections(), + shardIndexingPressureSettings.isShardIndexingPressureEnabled(), isEnforcedMode); + } + + ShardIndexingPressureStats topStats() { + return new ShardIndexingPressureStats(Collections.emptyMap(), memoryManager.getTotalNodeLimitsBreachedRejections(), + memoryManager.getTotalLastSuccessfulRequestLimitsBreachedRejections(), + memoryManager.getTotalThroughputDegradationLimitsBreachedRejections(), + shardIndexingPressureSettings.isShardIndexingPressureEnabled(), + shardIndexingPressureSettings.isShardIndexingPressureEnforced()); + } + + ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) { + return memoryManager.getShardIndexingPressureTracker(shardId); + } + + public boolean isShardIndexingPressureEnabled() { + return shardIndexingPressureSettings.isShardIndexingPressureEnabled(); + } +} diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java index d65ff885b83..271bd30cf35 100644 --- a/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureMemoryManager.java @@ -282,6 +282,26 @@ public class ShardIndexingPressureMemoryManager { return shardIndexingPressureStore.getShardIndexingPressureTracker(shardId); } + Map getShardIndexingPressureHotStore() { + return shardIndexingPressureStore.getShardIndexingPressureHotStore(); + } + + Map getShardIndexingPressureColdStore() { + return shardIndexingPressureStore.getShardIndexingPressureColdStore(); + } + + void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, BooleanSupplier condition) { + shardIndexingPressureStore.tryTrackerCleanupFromHotStore(tracker, condition); + } + + double calculateMovingAverage(long currentAverage, double frontValue, double currentValue, int count) { + if(count > 0) { + return ((Double.longBitsToDouble(currentAverage) * count) + currentValue - frontValue) / count; + } else { + return currentValue; + } + } + long getTotalNodeLimitsBreachedRejections() { return totalNodeLimitsBreachedRejections.get(); } @@ -417,7 +437,7 @@ public class ShardIndexingPressureMemoryManager { */ private boolean evaluateLastSuccessfulRequestDurationLimitsBreached(PerformanceTracker performanceTracker, long requestStartTime) { return (performanceTracker.getLastSuccessfulRequestTimestamp() > 0) && - (requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.millis() && + (requestStartTime - performanceTracker.getLastSuccessfulRequestTimestamp()) > this.successfulRequestElapsedTimeout.nanos() && performanceTracker.getTotalOutstandingRequests() > this.maxOutstandingRequests; } diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java index a2ebc45cf12..e63425eef82 100644 --- a/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureTracker.java @@ -139,6 +139,10 @@ public class ShardIndexingPressureTracker { public long getRequestCount() { return requestCount.get(); } + + public long incrementRequestCount() { + return requestCount.incrementAndGet(); + } } /** @@ -161,6 +165,10 @@ public class ShardIndexingPressureTracker { return totalRejections.get(); } + public long incrementTotalRejections() { + return totalRejections.incrementAndGet(); + } + public long getNodeLimitsBreachedRejections() { return nodeLimitsBreachedRejections.get(); } @@ -232,6 +240,10 @@ public class ShardIndexingPressureTracker { return totalOutstandingRequests.incrementAndGet(); } + public void resetTotalOutstandingRequests() { + totalOutstandingRequests.set(0L); + } + public long getThroughputMovingAverage() { return throughputMovingAverage.get(); } @@ -275,5 +287,9 @@ public class ShardIndexingPressureTracker { public long getTotalCombinedCoordinatingAndPrimaryBytes() { return totalCombinedCoordinatingAndPrimaryBytes.get(); } + + public long incrementTotalCombinedCoordinatingAndPrimaryBytes(long bytes) { + return totalCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + } } } diff --git a/server/src/main/java/org/opensearch/index/stats/IndexingPressurePerShardStats.java b/server/src/main/java/org/opensearch/index/stats/IndexingPressurePerShardStats.java new file mode 100644 index 00000000000..ea4bd29e12b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/stats/IndexingPressurePerShardStats.java @@ -0,0 +1,423 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.stats; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.ShardIndexingPressureTracker; + +import java.io.IOException; + +public class IndexingPressurePerShardStats implements Writeable, ToXContentFragment { + + private final String shardId; + + private final long totalCombinedCoordinatingAndPrimaryBytes; + private final long totalCoordinatingBytes; + private final long totalPrimaryBytes; + private final long totalReplicaBytes; + + private final long currentCombinedCoordinatingAndPrimaryBytes; + private final long currentCoordinatingBytes; + private final long currentPrimaryBytes; + private final long currentReplicaBytes; + + private final long totalCoordinatingCount; + private final long totalPrimaryCount; + private final long totalReplicaCount; + + private final long coordinatingRejections; + private final long coordinatingNodeLimitsBreachedRejections; + private final long coordinatingLastSuccessfulRequestLimitsBreachedRejections; + private final long coordinatingThroughputDegradationLimitsBreachedRejections; + + private final long primaryRejections; + private final long primaryNodeLimitsBreachedRejections; + private final long primaryLastSuccessfulRequestLimitsBreachedRejections; + private final long primaryThroughputDegradationLimitsBreachedRejections; + + private final long replicaRejections; + private final long replicaNodeLimitsBreachedRejections; + private final long replicaLastSuccessfulRequestLimitsBreachedRejections; + private final long replicaThroughputDegradationLimitsBreachedRejections; + + private final long coordinatingTimeInMillis; + private final long primaryTimeInMillis; + private final long replicaTimeInMillis; + + private final long coordinatingLastSuccessfulRequestTimestampInMillis; + private final long primaryLastSuccessfulRequestTimestampInMillis; + private final long replicaLastSuccessfulRequestTimestampInMillis; + + private final long currentPrimaryAndCoordinatingLimits; + private final long currentReplicaLimits; + + private final boolean shardIndexingPressureEnforced; + + public IndexingPressurePerShardStats(StreamInput in) throws IOException { + shardId = in.readString(); + shardIndexingPressureEnforced = in.readBoolean(); + + totalCombinedCoordinatingAndPrimaryBytes = in.readVLong(); + totalCoordinatingBytes = in.readVLong(); + totalPrimaryBytes = in.readVLong(); + totalReplicaBytes = in.readVLong(); + + currentCombinedCoordinatingAndPrimaryBytes = in.readVLong(); + currentCoordinatingBytes = in.readVLong(); + currentPrimaryBytes = in.readVLong(); + currentReplicaBytes = in.readVLong(); + + totalCoordinatingCount = in.readVLong(); + totalPrimaryCount = in.readVLong(); + totalReplicaCount = in.readVLong(); + + coordinatingRejections = in.readVLong(); + coordinatingNodeLimitsBreachedRejections = in.readVLong(); + coordinatingLastSuccessfulRequestLimitsBreachedRejections = in.readVLong(); + coordinatingThroughputDegradationLimitsBreachedRejections = in.readVLong(); + + primaryRejections = in.readVLong(); + primaryNodeLimitsBreachedRejections = in.readVLong(); + primaryLastSuccessfulRequestLimitsBreachedRejections = in.readVLong(); + primaryThroughputDegradationLimitsBreachedRejections = in.readVLong(); + + replicaRejections = in.readVLong(); + replicaNodeLimitsBreachedRejections = in.readVLong(); + replicaLastSuccessfulRequestLimitsBreachedRejections = in.readVLong(); + replicaThroughputDegradationLimitsBreachedRejections = in.readVLong(); + + coordinatingTimeInMillis = in.readVLong(); + primaryTimeInMillis = in.readVLong(); + replicaTimeInMillis = in.readVLong(); + + coordinatingLastSuccessfulRequestTimestampInMillis = in.readVLong(); + primaryLastSuccessfulRequestTimestampInMillis = in.readVLong(); + replicaLastSuccessfulRequestTimestampInMillis = in.readVLong(); + + currentPrimaryAndCoordinatingLimits = in.readVLong(); + currentReplicaLimits = in.readVLong(); + } + + public IndexingPressurePerShardStats(ShardIndexingPressureTracker shardIndexingPressureTracker, boolean shardIndexingPressureEnforced) { + + shardId = shardIndexingPressureTracker.getShardId().toString(); + this.shardIndexingPressureEnforced = shardIndexingPressureEnforced; + + totalCombinedCoordinatingAndPrimaryBytes = + shardIndexingPressureTracker.getCommonOperationTracker().getTotalCombinedCoordinatingAndPrimaryBytes(); + totalCoordinatingBytes = shardIndexingPressureTracker.getCoordinatingOperationTracker().getStatsTracker().getTotalBytes(); + totalPrimaryBytes = shardIndexingPressureTracker.getPrimaryOperationTracker().getStatsTracker().getTotalBytes(); + totalReplicaBytes = shardIndexingPressureTracker.getReplicaOperationTracker().getStatsTracker().getTotalBytes(); + + currentCombinedCoordinatingAndPrimaryBytes = + shardIndexingPressureTracker.getCommonOperationTracker().getCurrentCombinedCoordinatingAndPrimaryBytes(); + currentCoordinatingBytes = shardIndexingPressureTracker.getCoordinatingOperationTracker().getStatsTracker().getCurrentBytes(); + currentPrimaryBytes = shardIndexingPressureTracker.getPrimaryOperationTracker().getStatsTracker().getCurrentBytes(); + currentReplicaBytes = shardIndexingPressureTracker.getReplicaOperationTracker().getStatsTracker().getCurrentBytes(); + + totalCoordinatingCount = shardIndexingPressureTracker.getCoordinatingOperationTracker().getStatsTracker().getRequestCount(); + totalPrimaryCount = shardIndexingPressureTracker.getPrimaryOperationTracker().getStatsTracker().getRequestCount(); + totalReplicaCount = shardIndexingPressureTracker.getReplicaOperationTracker().getStatsTracker().getRequestCount(); + + coordinatingRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker().getRejectionTracker().getTotalRejections(); + coordinatingNodeLimitsBreachedRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker() + .getRejectionTracker().getNodeLimitsBreachedRejections(); + coordinatingLastSuccessfulRequestLimitsBreachedRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker() + .getRejectionTracker().getLastSuccessfulRequestLimitsBreachedRejections(); + coordinatingThroughputDegradationLimitsBreachedRejections = shardIndexingPressureTracker.getCoordinatingOperationTracker() + .getRejectionTracker().getThroughputDegradationLimitsBreachedRejections(); + + primaryRejections = shardIndexingPressureTracker.getPrimaryOperationTracker().getRejectionTracker().getTotalRejections(); + primaryNodeLimitsBreachedRejections = shardIndexingPressureTracker.getPrimaryOperationTracker() + .getRejectionTracker().getNodeLimitsBreachedRejections(); + primaryLastSuccessfulRequestLimitsBreachedRejections = shardIndexingPressureTracker.getPrimaryOperationTracker() + .getRejectionTracker().getLastSuccessfulRequestLimitsBreachedRejections(); + primaryThroughputDegradationLimitsBreachedRejections = shardIndexingPressureTracker.getPrimaryOperationTracker() + .getRejectionTracker().getThroughputDegradationLimitsBreachedRejections(); + + replicaRejections = shardIndexingPressureTracker.getReplicaOperationTracker().getRejectionTracker().getTotalRejections(); + replicaNodeLimitsBreachedRejections = shardIndexingPressureTracker.getReplicaOperationTracker() + .getRejectionTracker().getNodeLimitsBreachedRejections(); + replicaLastSuccessfulRequestLimitsBreachedRejections = shardIndexingPressureTracker.getReplicaOperationTracker() + .getRejectionTracker().getLastSuccessfulRequestLimitsBreachedRejections(); + replicaThroughputDegradationLimitsBreachedRejections = shardIndexingPressureTracker.getReplicaOperationTracker() + .getRejectionTracker().getThroughputDegradationLimitsBreachedRejections(); + + coordinatingTimeInMillis = shardIndexingPressureTracker.getCoordinatingOperationTracker().getPerformanceTracker() + .getLatencyInMillis(); + primaryTimeInMillis = shardIndexingPressureTracker.getPrimaryOperationTracker().getPerformanceTracker() + .getLatencyInMillis(); + replicaTimeInMillis = shardIndexingPressureTracker.getReplicaOperationTracker().getPerformanceTracker() + .getLatencyInMillis(); + + coordinatingLastSuccessfulRequestTimestampInMillis = shardIndexingPressureTracker.getCoordinatingOperationTracker() + .getPerformanceTracker().getLastSuccessfulRequestTimestamp(); + primaryLastSuccessfulRequestTimestampInMillis = shardIndexingPressureTracker.getPrimaryOperationTracker() + .getPerformanceTracker().getLastSuccessfulRequestTimestamp(); + replicaLastSuccessfulRequestTimestampInMillis = shardIndexingPressureTracker.getReplicaOperationTracker() + .getPerformanceTracker().getLastSuccessfulRequestTimestamp(); + + currentPrimaryAndCoordinatingLimits = shardIndexingPressureTracker.getPrimaryAndCoordinatingLimits(); + currentReplicaLimits = shardIndexingPressureTracker.getReplicaLimits(); + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(shardId); + out.writeBoolean(shardIndexingPressureEnforced); + + out.writeVLong(totalCombinedCoordinatingAndPrimaryBytes); + out.writeVLong(totalCoordinatingBytes); + out.writeVLong(totalPrimaryBytes); + out.writeVLong(totalReplicaBytes); + + out.writeVLong(currentCombinedCoordinatingAndPrimaryBytes); + out.writeVLong(currentCoordinatingBytes); + out.writeVLong(currentPrimaryBytes); + out.writeVLong(currentReplicaBytes); + + out.writeVLong(totalCoordinatingCount); + out.writeVLong(totalPrimaryCount); + out.writeVLong(totalReplicaCount); + + out.writeVLong(coordinatingRejections); + out.writeVLong(coordinatingNodeLimitsBreachedRejections); + out.writeVLong(coordinatingLastSuccessfulRequestLimitsBreachedRejections); + out.writeVLong(coordinatingThroughputDegradationLimitsBreachedRejections); + + out.writeVLong(primaryRejections); + out.writeVLong(primaryNodeLimitsBreachedRejections); + out.writeVLong(primaryLastSuccessfulRequestLimitsBreachedRejections); + out.writeVLong(primaryThroughputDegradationLimitsBreachedRejections); + + out.writeVLong(replicaRejections); + out.writeVLong(replicaNodeLimitsBreachedRejections); + out.writeVLong(replicaLastSuccessfulRequestLimitsBreachedRejections); + out.writeVLong(replicaThroughputDegradationLimitsBreachedRejections); + + out.writeVLong(coordinatingTimeInMillis); + out.writeVLong(primaryTimeInMillis); + out.writeVLong(replicaTimeInMillis); + + out.writeVLong(coordinatingLastSuccessfulRequestTimestampInMillis); + out.writeVLong(primaryLastSuccessfulRequestTimestampInMillis); + out.writeVLong(replicaLastSuccessfulRequestTimestampInMillis); + + out.writeVLong(currentPrimaryAndCoordinatingLimits); + out.writeVLong(currentReplicaLimits); + } + + public long getTotalCombinedCoordinatingAndPrimaryBytes() { + return totalCombinedCoordinatingAndPrimaryBytes; + } + + public long getTotalCoordinatingBytes() { + return totalCoordinatingBytes; + } + + public long getTotalPrimaryBytes() { + return totalPrimaryBytes; + } + + public long getTotalReplicaBytes() { + return totalReplicaBytes; + } + + public long getCurrentCombinedCoordinatingAndPrimaryBytes() { + return currentCombinedCoordinatingAndPrimaryBytes; + } + + public long getCurrentCoordinatingBytes() { + return currentCoordinatingBytes; + } + + public long getCurrentPrimaryBytes() { + return currentPrimaryBytes; + } + + public long getCurrentReplicaBytes() { + return currentReplicaBytes; + } + + public long getCoordinatingRejections() { + return coordinatingRejections; + } + + public long getCoordinatingNodeLimitsBreachedRejections() { + return coordinatingNodeLimitsBreachedRejections; + } + + public long getCoordinatingLastSuccessfulRequestLimitsBreachedRejections() { + return coordinatingLastSuccessfulRequestLimitsBreachedRejections; + } + + public long getCoordinatingThroughputDegradationLimitsBreachedRejections() { + return coordinatingThroughputDegradationLimitsBreachedRejections; + } + + public long getPrimaryRejections() { + return primaryRejections; + } + + public long getPrimaryNodeLimitsBreachedRejections() { + return primaryNodeLimitsBreachedRejections; + } + + public long getPrimaryLastSuccessfulRequestLimitsBreachedRejections() { + return primaryLastSuccessfulRequestLimitsBreachedRejections; + } + + public long getPrimaryThroughputDegradationLimitsBreachedRejections() { + return primaryThroughputDegradationLimitsBreachedRejections; + } + + public long getReplicaRejections() { + return replicaRejections; + } + + public long getReplicaNodeLimitsBreachedRejections() { + return replicaNodeLimitsBreachedRejections; + } + + public long getReplicaLastSuccessfulRequestLimitsBreachedRejections() { + return replicaLastSuccessfulRequestLimitsBreachedRejections; + } + + public long getReplicaThroughputDegradationLimitsBreachedRejections() { + return replicaThroughputDegradationLimitsBreachedRejections; + } + + public long getCurrentPrimaryAndCoordinatingLimits() { + return currentPrimaryAndCoordinatingLimits; + } + + public long getCurrentReplicaLimits() { + return currentReplicaLimits; + } + + private static final String COORDINATING = "coordinating"; + private static final String COORDINATING_IN_BYTES = "coordinating_in_bytes"; + private static final String COORDINATING_COUNT = "coordinating_count"; + private static final String PRIMARY = "primary"; + private static final String PRIMARY_IN_BYTES = "primary_in_bytes"; + private static final String PRIMARY_COUNT = "primary_count"; + private static final String REPLICA = "replica"; + private static final String REPLICA_IN_BYTES = "replica_in_bytes"; + private static final String REPLICA_COUNT = "replica_count"; + private static final String COORDINATING_REJECTIONS = "coordinating_rejections"; + private static final String PRIMARY_REJECTIONS = "primary_rejections"; + private static final String REPLICA_REJECTIONS = "replica_rejections"; + private static final String BREAKUP_NODE_LIMITS = "node_limits"; + private static final String BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS = "no_successful_request_limits"; + private static final String BREAKUP_THROUGHPUT_DEGRADATION_LIMIT = "throughput_degradation_limits"; + private static final String COORDINATING_TIME_IN_MILLIS = "coordinating_time_in_millis"; + private static final String PRIMARY_TIME_IN_MILLIS = "primary_time_in_millis"; + private static final String REPLICA_TIME_IN_MILLIS = "replica_time_in_millis"; + private static final String COORDINATING_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS = + "coordinating_last_successful_request_timestamp_in_millis"; + private static final String PRIMARY_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS = + "primary_last_successful_request_timestamp_in_millis"; + private static final String REPLICA_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS = "replica_last_successful_request_timestamp_in_millis"; + private static final String CURRENT_COORDINATING_AND_PRIMARY_LIMITS_IN_BYTES = "current_coordinating_and_primary_limits_in_bytes"; + private static final String CURRENT_REPLICA_LIMITS_IN_BYTES = "current_replica_limits_in_bytes"; + private static final String CURRENT_COORDINATING_AND_PRIMARY_IN_BYTES = "current_coordinating_and_primary_bytes"; + private static final String CURRENT_REPLICA_IN_BYTES = "current_replica_bytes"; + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(shardId); + + builder.startObject("memory"); + builder.startObject("current"); + builder.humanReadableField(COORDINATING_IN_BYTES, COORDINATING, new ByteSizeValue(currentCoordinatingBytes)); + builder.humanReadableField(PRIMARY_IN_BYTES, PRIMARY, new ByteSizeValue(currentPrimaryBytes)); + builder.humanReadableField(REPLICA_IN_BYTES, REPLICA, new ByteSizeValue(currentReplicaBytes)); + builder.endObject(); + builder.startObject("total"); + builder.humanReadableField(COORDINATING_IN_BYTES, COORDINATING, new ByteSizeValue(totalCoordinatingBytes)); + builder.humanReadableField(PRIMARY_IN_BYTES, PRIMARY, new ByteSizeValue(totalPrimaryBytes)); + builder.humanReadableField(REPLICA_IN_BYTES, REPLICA, new ByteSizeValue(totalReplicaBytes)); + builder.endObject(); + builder.endObject(); + + builder.startObject("rejection"); + builder.startObject("coordinating"); + builder.field(COORDINATING_REJECTIONS, coordinatingRejections); + if (shardIndexingPressureEnforced) { + builder.startObject("breakup"); + } else { + builder.startObject("breakup_shadow_mode"); + } + builder.field(BREAKUP_NODE_LIMITS, coordinatingNodeLimitsBreachedRejections); + builder.field(BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS, coordinatingLastSuccessfulRequestLimitsBreachedRejections); + builder.field(BREAKUP_THROUGHPUT_DEGRADATION_LIMIT, coordinatingThroughputDegradationLimitsBreachedRejections); + builder.endObject(); + builder.endObject(); + builder.startObject("primary"); + builder.field(PRIMARY_REJECTIONS, primaryRejections); + if (shardIndexingPressureEnforced) { + builder.startObject("breakup"); + } else { + builder.startObject("breakup_shadow_mode"); + } + builder.field(BREAKUP_NODE_LIMITS, primaryNodeLimitsBreachedRejections); + builder.field(BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS, primaryLastSuccessfulRequestLimitsBreachedRejections); + builder.field(BREAKUP_THROUGHPUT_DEGRADATION_LIMIT, primaryThroughputDegradationLimitsBreachedRejections); + builder.endObject(); + builder.endObject(); + builder.startObject("replica"); + builder.field(REPLICA_REJECTIONS, replicaRejections); + if (shardIndexingPressureEnforced) { + builder.startObject("breakup"); + } else { + builder.startObject("breakup_shadow_mode"); + } + builder.field(BREAKUP_NODE_LIMITS, replicaNodeLimitsBreachedRejections); + builder.field(BREAKUP_NO_SUCCESSFUL_REQUEST_LIMITS, replicaLastSuccessfulRequestLimitsBreachedRejections); + builder.field(BREAKUP_THROUGHPUT_DEGRADATION_LIMIT, replicaThroughputDegradationLimitsBreachedRejections); + builder.endObject(); + builder.endObject(); + builder.endObject(); + + builder.startObject("last_successful_timestamp"); + builder.field(COORDINATING_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS, coordinatingLastSuccessfulRequestTimestampInMillis); + builder.field(PRIMARY_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS, primaryLastSuccessfulRequestTimestampInMillis); + builder.field(REPLICA_LAST_SUCCESSFUL_REQUEST_TIMESTAMP_IN_MILLIS, replicaLastSuccessfulRequestTimestampInMillis); + builder.endObject(); + + builder.startObject("indexing"); + builder.field(COORDINATING_TIME_IN_MILLIS, coordinatingTimeInMillis); + builder.field(COORDINATING_COUNT, totalCoordinatingCount); + builder.field(PRIMARY_TIME_IN_MILLIS, primaryTimeInMillis); + builder.field(PRIMARY_COUNT, totalPrimaryCount); + builder.field(REPLICA_TIME_IN_MILLIS, replicaTimeInMillis); + builder.field(REPLICA_COUNT, totalReplicaCount); + builder.endObject(); + + builder.startObject("memory_allocation"); + builder.startObject("current"); + builder.field(CURRENT_COORDINATING_AND_PRIMARY_IN_BYTES, currentCombinedCoordinatingAndPrimaryBytes); + builder.field(CURRENT_REPLICA_IN_BYTES, currentReplicaBytes); + builder.endObject(); + builder.startObject("limit"); + builder.field(CURRENT_COORDINATING_AND_PRIMARY_LIMITS_IN_BYTES, currentPrimaryAndCoordinatingLimits); + builder.field(CURRENT_REPLICA_LIMITS_IN_BYTES, currentReplicaLimits); + builder.endObject(); + builder.endObject(); + + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/index/stats/ShardIndexingPressureStats.java b/server/src/main/java/org/opensearch/index/stats/ShardIndexingPressureStats.java new file mode 100644 index 00000000000..f2f04d3d60e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/stats/ShardIndexingPressureStats.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.stats; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class ShardIndexingPressureStats implements Writeable, ToXContentFragment { + + private final Map shardIndexingPressureStore; + private final long totalNodeLimitsBreachedRejections; + private final long totalLastSuccessfulRequestLimitsBreachedRejections; + private final long totalThroughputDegradationLimitsBreachedRejections; + private final boolean shardIndexingPressureEnabled; + private final boolean shardIndexingPressureEnforced; + + public ShardIndexingPressureStats(StreamInput in) throws IOException { + int shardEntries = in.readInt(); + shardIndexingPressureStore = new HashMap<>(); + for (int i = 0; i < shardEntries; i++) { + ShardId shardId = new ShardId(in); + IndexingPressurePerShardStats shardStats = new IndexingPressurePerShardStats(in); + shardIndexingPressureStore.put(shardId, shardStats); + } + totalNodeLimitsBreachedRejections = in.readVLong(); + totalLastSuccessfulRequestLimitsBreachedRejections = in.readVLong(); + totalThroughputDegradationLimitsBreachedRejections = in.readVLong(); + shardIndexingPressureEnabled = in.readBoolean(); + shardIndexingPressureEnforced = in.readBoolean(); + } + + public ShardIndexingPressureStats(Map shardIndexingPressureStore, + long totalNodeLimitsBreachedRejections, + long totalLastSuccessfulRequestLimitsBreachedRejections, + long totalThroughputDegradationLimitsBreachedRejections, + boolean shardIndexingPressureEnabled, + boolean shardIndexingPressureEnforced) { + this.shardIndexingPressureStore = shardIndexingPressureStore; + this.totalNodeLimitsBreachedRejections = totalNodeLimitsBreachedRejections; + this.totalLastSuccessfulRequestLimitsBreachedRejections = totalLastSuccessfulRequestLimitsBreachedRejections; + this.totalThroughputDegradationLimitsBreachedRejections = totalThroughputDegradationLimitsBreachedRejections; + this.shardIndexingPressureEnabled = shardIndexingPressureEnabled; + this.shardIndexingPressureEnforced = shardIndexingPressureEnforced; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(shardIndexingPressureStore.size()); + for (Map.Entry entry : shardIndexingPressureStore.entrySet()) { + entry.getKey().writeTo(out); + entry.getValue().writeTo(out); + } + out.writeVLong(totalNodeLimitsBreachedRejections); + out.writeVLong(totalLastSuccessfulRequestLimitsBreachedRejections); + out.writeVLong(totalThroughputDegradationLimitsBreachedRejections); + out.writeBoolean(shardIndexingPressureEnabled); + out.writeBoolean(shardIndexingPressureEnforced); + } + + public IndexingPressurePerShardStats getIndexingPressureShardStats(ShardId shardId) { + return shardIndexingPressureStore.get(shardId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject("shard_indexing_pressure"); + builder.startObject("stats"); + for (Map.Entry entry : shardIndexingPressureStore.entrySet()) { + entry.getValue().toXContent(builder, params); + } + builder.endObject(); + if (shardIndexingPressureEnforced) { + builder.startObject("total_rejections_breakup"); + } else { + builder.startObject("total_rejections_breakup_shadow_mode"); + } + builder.field("node_limits", totalNodeLimitsBreachedRejections); + builder.field("no_successful_request_limits", totalLastSuccessfulRequestLimitsBreachedRejections); + builder.field("throughput_degradation_limits", totalThroughputDegradationLimitsBreachedRejections); + builder.endObject(); + builder.field("enabled", shardIndexingPressureEnabled); + builder.field("enforced", shardIndexingPressureEnforced); + return builder.endObject(); + } + + public void addAll(ShardIndexingPressureStats shardIndexingPressureStats) { + if (this.shardIndexingPressureStore != null) { + this.shardIndexingPressureStore.putAll(shardIndexingPressureStats.shardIndexingPressureStore); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java new file mode 100644 index 00000000000..40c9a32b4ff --- /dev/null +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureConcurrentExecutionTests.java @@ -0,0 +1,849 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.hamcrest.Matchers; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.stats.IndexingPressurePerShardStats; +import org.opensearch.index.stats.IndexingPressureStats; +import org.opensearch.index.stats.ShardIndexingPressureStats; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.concurrent.atomic.AtomicInteger; + +public class ShardIndexingPressureConcurrentExecutionTests extends OpenSearchTestCase { + + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100) + .build(); + + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + public enum OperationType { COORDINATING, PRIMARY, REPLICA } + + public void testCoordinatingPrimaryThreadedUpdateToShardLimits() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 500); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + Releasable[] releasable; + if (randomBoolean) { + releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.COORDINATING); + } else { + releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.PRIMARY); + } + + if(randomBoolean) { + assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + } else { + assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + } + assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertTrue((double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits() < 0.95); + assertTrue((double) (NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits() > 0.75); + + for (int i = 0; i < NUM_THREADS; i++) { + releasable[i].close(); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + if(randomBoolean) { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + } else { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedUpdateToShardLimits() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 500); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + Releasable[] releasable = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.REPLICA); + + assertEquals(NUM_THREADS * 15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentReplicaBytes()); + assertTrue((double)(NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentReplicaLimits() < 0.95); + assertTrue((double)(NUM_THREADS * 15) / shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentReplicaLimits() > 0.75); + + for (int i = 0; i < NUM_THREADS; i++) { + releasable[i].close(); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryThreadedSimultaneousUpdateToShardLimits() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 500); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + + if (randomBoolean) { + fireAndCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 100, OperationType.COORDINATING); + } else { + fireAndCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 100, OperationType.PRIMARY); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + if(randomBoolean) { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + } else { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedSimultaneousUpdateToShardLimits() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 500); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + fireAndCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 100, OperationType.REPLICA); + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryThreadedUpdateToShardLimitsWithRandomBytes() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 400); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + + if (randomBoolean) { + fireAllThenCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.COORDINATING); + } else { + fireAllThenCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.PRIMARY); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + if(randomBoolean) { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + } else { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedUpdateToShardLimitsWithRandomBytes() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 400); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + fireAllThenCompleteConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.REPLICA); + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryThreadedUpdateToShardLimitsAndRejections() throws Exception { + final int NUM_THREADS = 100; + final Thread[] threads = new Thread[NUM_THREADS]; + final Releasable[] releasables = new Releasable[NUM_THREADS]; + AtomicInteger rejectionCount = new AtomicInteger(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + for (int i = 0; i < NUM_THREADS; i++) { + int counter = i; + threads[i] = new Thread(() -> { + try { + if(randomBoolean) { + releasables[counter] = shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 200, false); + } else { + releasables[counter] = shardIndexingPressure.markPrimaryOperationStarted(shardId1, 200, false); + } + } catch (OpenSearchRejectedExecutionException e) { + rejectionCount.addAndGet(1); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + ShardIndexingPressureStats shardStats = shardIndexingPressure.shardStats(); + if(randomBoolean) { + assertEquals(rejectionCount.get(), nodeStats.getCoordinatingRejections()); + assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentCoordinatingBytes() < 50 * 200); + } else { + assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentPrimaryBytes() < 50 * 200); + assertEquals(rejectionCount.get(), nodeStats.getPrimaryRejections()); + } + assertTrue(nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes() < 50 * 200); + assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentCombinedCoordinatingAndPrimaryBytes() < 50 * 200); + + for (int i = 0; i < NUM_THREADS - rejectionCount.get(); i++) { + releasables[i].close(); + } + + nodeStats = shardIndexingPressure.stats(); + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + shardStats = shardIndexingPressure.coldStats(); + if(randomBoolean) { + assertEquals(rejectionCount.get(), nodeStats.getCoordinatingRejections()); + assertEquals(rejectionCount.get(), shardStats.getIndexingPressureShardStats(shardId1) + .getCoordinatingNodeLimitsBreachedRejections()); + assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentCoordinatingBytes()); + } else { + assertEquals(rejectionCount.get(), nodeStats.getPrimaryRejections()); + assertEquals(rejectionCount.get(), shardStats.getIndexingPressureShardStats(shardId1) + .getPrimaryNodeLimitsBreachedRejections()); + assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentPrimaryBytes()); + } + + assertEquals(0, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getIndexingPressureShardStats(shardId1).getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedUpdateToShardLimitsAndRejections() throws Exception { + final int NUM_THREADS = 100; + final Thread[] threads = new Thread[NUM_THREADS]; + final Releasable[] releasables = new Releasable[NUM_THREADS]; + AtomicInteger rejectionCount = new AtomicInteger(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + for (int i = 0; i < NUM_THREADS; i++) { + int counter = i; + threads[i] = new Thread(() -> { + try { + releasables[counter] = shardIndexingPressure.markReplicaOperationStarted(shardId1, 300, false); + } catch (OpenSearchRejectedExecutionException e) { + rejectionCount.addAndGet(1); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(rejectionCount.get(), nodeStats.getReplicaRejections()); + assertTrue(nodeStats.getCurrentReplicaBytes() < 50 * 300); + + ShardIndexingPressureStats shardStats = shardIndexingPressure.shardStats(); + assertTrue(shardStats.getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes() < 50 * 300); + + for (int i = 0; i < releasables.length - 1; i++) { + if(releasables[i] != null) { + releasables[i].close(); + } + } + + nodeStats = shardIndexingPressure.stats(); + assertEquals(rejectionCount.get(), nodeStats.getReplicaRejections()); + assertEquals(0, nodeStats.getCurrentReplicaBytes()); + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + shardStats = shardIndexingPressure.coldStats(); + assertEquals(rejectionCount.get(), shardStats.getIndexingPressureShardStats(shardId1) + .getReplicaNodeLimitsBreachedRejections()); + assertEquals(0, shardStats.getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(15, shardStats.getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryConcurrentUpdatesOnShardIndexingPressureTrackerObjects() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 400); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "new_uuid"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + Releasable[] releasables; + if(randomBoolean) { + releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.COORDINATING); + } else { + releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 15, OperationType.PRIMARY); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats() + .getIndexingPressureShardStats(shardId1); + assertThat(shardStoreStats.getCurrentPrimaryAndCoordinatingLimits(), Matchers.greaterThan(100L)); + + CommonStatsFlags statsFlag = new CommonStatsFlags(); + statsFlag.includeAllShardIndexingPressureTrackers(true); + IndexingPressurePerShardStats shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag) + .getIndexingPressureShardStats(shardId1);; + assertEquals(shardStoreStats.getCurrentPrimaryAndCoordinatingLimits(), shardStoreStats2 + .getCurrentPrimaryAndCoordinatingLimits()); + + statsFlag.includeOnlyTopIndexingPressureMetrics(true); + assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1)); + statsFlag.includeOnlyTopIndexingPressureMetrics(false); + + for (int i = 0; i < NUM_THREADS; i++) { + releasables[i].close(); + } + + //No object in host store as no active shards + shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + if(randomBoolean) { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + } else { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + + shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1); + assertEquals(shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits(), + shardStoreStats2.getCurrentPrimaryAndCoordinatingLimits()); + + statsFlag.includeAllShardIndexingPressureTrackers(false); + assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1)); + } + + public void testReplicaConcurrentUpdatesOnShardIndexingPressureTrackerObjects() throws Exception { + final int NUM_THREADS = scaledRandomIntBetween(100, 400); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "new_uuid"); + ShardId shardId1 = new ShardId(index, 0); + + final Releasable[] releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 20, OperationType.REPLICA); + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats() + .getIndexingPressureShardStats(shardId1); + assertThat(shardStoreStats.getCurrentReplicaLimits(), Matchers.greaterThan(100L)); + + CommonStatsFlags statsFlag = new CommonStatsFlags(); + statsFlag.includeAllShardIndexingPressureTrackers(true); + IndexingPressurePerShardStats shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag) + .getIndexingPressureShardStats(shardId1);; + assertEquals(shardStoreStats.getCurrentReplicaLimits(), shardStoreStats2.getCurrentReplicaLimits()); + + statsFlag.includeOnlyTopIndexingPressureMetrics(true); + assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1)); + statsFlag.includeOnlyTopIndexingPressureMetrics(false); + + for (int i = 0; i < NUM_THREADS; i++) { + releasables[i].close(); + } + + //No object in host store as no active shards + shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1); + assertNull(shardStoreStats); + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + + shardStoreStats2 = shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1);; + assertEquals(shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits(), + shardStoreStats2.getCurrentReplicaLimits()); + + statsFlag.includeAllShardIndexingPressureTrackers(false); + assertNull(shardIndexingPressure.shardStats(statsFlag).getIndexingPressureShardStats(shardId1)); + } + + public void testCoordinatingPrimaryThreadedThroughputDegradationAndRejection() throws Exception { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "15KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 80) + .build(); + final int NUM_THREADS = scaledRandomIntBetween(80, 100); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + + //Generating a concurrent + sequential load to have a fair throughput + if (randomBoolean) { + fireConcurrentAndParallelRequestsForUniformThroughPut(NUM_THREADS, shardIndexingPressure, shardId1, 100, 100, + OperationType.COORDINATING); + } else { + fireConcurrentAndParallelRequestsForUniformThroughPut(NUM_THREADS, shardIndexingPressure, shardId1, 100, 100, + OperationType.PRIMARY); + } + + //Generating a load to such that the requests in the window shows degradation in throughput. + if (randomBoolean) { + fireAllThenCompleteConcurrentRequestsWithUniformDelay(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.get(settings), + shardIndexingPressure, shardId1, 100, 200, OperationType.COORDINATING); + } else { + fireAllThenCompleteConcurrentRequestsWithUniformDelay(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.get(settings), + shardIndexingPressure, shardId1, 100, 200, OperationType.PRIMARY); + } + + //Generate a load which breaches both primary parameter + if(randomBoolean) { + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 11 * 1024, false)); + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingNodeLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingLastSuccessfulRequestLimitsBreachedRejections()); + } else { + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markPrimaryOperationStarted(shardId1, 11 * 1024, false)); + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryNodeLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryLastSuccessfulRequestLimitsBreachedRejections()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedThroughputDegradationAndRejection() throws Exception { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100) + .build(); + final int NUM_THREADS = scaledRandomIntBetween(100, 120); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + + //Generating a load to have a fair throughput + fireConcurrentAndParallelRequestsForUniformThroughPut(NUM_THREADS, shardIndexingPressure, shardId1, 100, 100, + OperationType.REPLICA); + + //Generating a load to such that the requests in the window shows degradation in throughput. + fireAllThenCompleteConcurrentRequestsWithUniformDelay(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.get(settings), + shardIndexingPressure, shardId1, 100, 200, OperationType.REPLICA); + + //Generate a load which breaches both primary parameter + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markReplicaOperationStarted(shardId1, 11 * 1024, false)); + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(15, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getReplicaRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaNodeLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaLastSuccessfulRequestLimitsBreachedRejections()); + } + + public void testCoordinatingPrimaryThreadedLastSuccessfulRequestsAndRejection() throws Exception { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .build(); + final int NUM_THREADS = scaledRandomIntBetween(110, 150); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + + //One request being successful + if(randomBoolean) { + Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 10, false); + coordinating.close(); + } else { + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId1, 10, false); + primary.close(); + } + + //Generating a load such that requests are blocked requests. + Releasable[] releasables; + if (randomBoolean) { + releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 10, OperationType.COORDINATING); + } else { + releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 10, OperationType.PRIMARY); + } + + //Mimic the time elapsed after requests being stuck + Thread.sleep(randomIntBetween(50, 100)); + + //Generate a load which breaches both primary parameter + if(randomBoolean) { + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 200 * 1024, false)); + } else { + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markPrimaryOperationStarted(shardId1, 200 * 1024, false)); + } + + for (int i = 0; i < NUM_THREADS; i++) { + releasables[i].close(); + } + + if(randomBoolean) { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingNodeLimitsBreachedRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingLastSuccessfulRequestLimitsBreachedRejections()); + } else { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryNodeLimitsBreachedRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryLastSuccessfulRequestLimitsBreachedRejections()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(256, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedLastSuccessfulRequestsAndRejection() throws Exception { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .build(); + final int NUM_THREADS = scaledRandomIntBetween(110, 150); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + + //One request being successful + Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId1, 10, false); + replica.close(); + + //Generating a load such that requests are blocked requests. + final Releasable[] releasables = fireConcurrentRequests(NUM_THREADS, shardIndexingPressure, shardId1, 10, OperationType.REPLICA); + //Mimic the time elapsed after requests being stuck + Thread.sleep(randomIntBetween(50, 100)); + + //Generate a load which breaches both primary parameter + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markReplicaOperationStarted(shardId1, 300 * 1024, false)); + + for (int i = 0; i < NUM_THREADS; i++) { + releasables[i].close(); + } + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getReplicaRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaNodeLimitsBreachedRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(384, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryThreadedNodeLimitsAndRejection() throws Exception { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .build(); + final int NUM_THREADS = scaledRandomIntBetween(100, 150); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + + //Generating a load to such that the requests in the window shows degradation in throughput. + Releasable[] releasables; + if (randomBoolean) { + releasables = fireConcurrentRequestsWithUniformDelay(NUM_THREADS, shardIndexingPressure, shardId1, 10, + randomIntBetween(50, 100), OperationType.COORDINATING); + } else { + releasables = fireConcurrentRequestsWithUniformDelay(NUM_THREADS, shardIndexingPressure, shardId1, 10, + randomIntBetween(50, 100), OperationType.PRIMARY); + } + + //Generate a load which breaches both primary parameter + if(randomBoolean) { + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 240 * 1024, false)); + } else { + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markPrimaryOperationStarted(shardId1, 240 * 1024, false)); + } + + for (int i = 0; i < NUM_THREADS; i++) { + releasables[i].close(); + } + + if(randomBoolean) { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingThroughputDegradationLimitsBreachedRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingNodeLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCoordinatingLastSuccessfulRequestLimitsBreachedRejections()); + } else { + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryThroughputDegradationLimitsBreachedRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryNodeLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getPrimaryLastSuccessfulRequestLimitsBreachedRejections()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(256, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaThreadedNodeLimitsAndRejection() throws Exception { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "250KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 100) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .build(); + final int NUM_THREADS = scaledRandomIntBetween(100, 150); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + + //Generating a load to such that the requests in the window shows degradation in throughput. + final Releasable[] releasables = fireConcurrentRequestsWithUniformDelay(NUM_THREADS, shardIndexingPressure, shardId1, 10, + randomIntBetween(50, 100), OperationType.COORDINATING); + + //Generate a load which breaches both primary parameter + expectThrows(OpenSearchRejectedExecutionException.class, + () -> shardIndexingPressure.markReplicaOperationStarted(shardId1, 340 * 1024, false)); + + for (int i = 0; i < NUM_THREADS; i++) { + releasables[i].close(); + } + + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaBytes()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getReplicaRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaThroughputDegradationLimitsBreachedRejections()); + assertEquals(1, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaNodeLimitsBreachedRejections()); + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1) + .getReplicaLastSuccessfulRequestLimitsBreachedRejections()); + assertEquals(384, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1).getCurrentReplicaLimits()); + } + + private void fireAndCompleteConcurrentRequests(int concurrency, ShardIndexingPressure shardIndexingPressure, ShardId shardId, + long bytes, OperationType operationType) throws Exception { + fireAndCompleteConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, randomIntBetween(5, 15), + operationType); + } + + private void fireAndCompleteConcurrentRequestsWithUniformDelay(int concurrency, ShardIndexingPressure shardIndexingPressure, + ShardId shardId, long bytes, long delay, + OperationType operationType) throws Exception { + final Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + threads[i] = new Thread(() -> { + if(operationType == OperationType.COORDINATING) { + Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, false); + coordinating.close(); + } else if (operationType == OperationType.PRIMARY){ + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, false); + primary.close(); + } else { + Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, false); + replica.close(); + } + }); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + //Do Nothing + } + threads[i].start(); + } + for (Thread t : threads) { + t.join(); + } + } + + private Releasable[] fireConcurrentRequests(int concurrency, ShardIndexingPressure shardIndexingPressure, ShardId shardId, + long bytes, OperationType operationType) throws Exception { + return fireConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, 0, operationType); + } + + private Releasable[] fireConcurrentRequestsWithUniformDelay(int concurrency, ShardIndexingPressure shardIndexingPressure, + ShardId shardId, long bytes, long delay, + OperationType operationType) throws Exception { + final Thread[] threads = new Thread[concurrency]; + final Releasable[] releasable = new Releasable[concurrency]; + for (int i = 0; i < concurrency; i++) { + int counter = i; + threads[i] = new Thread(() -> { + if(operationType == OperationType.COORDINATING) { + releasable[counter] = shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, false); + } else if (operationType == OperationType.PRIMARY){ + releasable[counter] = shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, false); + } else { + releasable[counter] = shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, false); + } + try { + Thread.sleep(delay); + } catch (Exception e) { + //Do Nothing + } + }); + threads[i].start(); + } + for (Thread t : threads) { + t.join(); + } + return releasable; + } + + private void fireAllThenCompleteConcurrentRequests(int concurrency, ShardIndexingPressure shardIndexingPressure, ShardId shardId, + long bytes, OperationType operationType) throws Exception { + + + fireAllThenCompleteConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, 0, operationType); + } + + private void fireAllThenCompleteConcurrentRequestsWithUniformDelay(int concurrency, ShardIndexingPressure shardIndexingPressure, + ShardId shardId, long bytes, long delay, + OperationType operationType) throws Exception { + + final Releasable[] releasable = fireConcurrentRequestsWithUniformDelay(concurrency, shardIndexingPressure, shardId, bytes, delay, + operationType); + for (int i = 0; i < concurrency; i++) { + releasable[i].close(); + } + } + + private void fireConcurrentAndParallelRequestsForUniformThroughPut(int concurrency, ShardIndexingPressure shardIndexingPressure, + ShardId shardId, long bytes, long delay, + OperationType operationType) throws Exception { + final Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + threads[i] = new Thread(() -> { + for (int j = 0; j < randomIntBetween(400, 500); j++) { + Releasable releasable; + if(operationType == OperationType.COORDINATING) { + releasable = shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, false); + } else if (operationType == OperationType.PRIMARY){ + releasable = shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, false); + } else { + releasable = shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, false); + } + try { + Thread.sleep(delay); + } catch (Exception e) { + //Do Nothing + } + releasable.close(); + } + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java new file mode 100644 index 00000000000..ae799cb3187 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureTests.java @@ -0,0 +1,820 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.stats.IndexingPressurePerShardStats; +import org.opensearch.index.stats.IndexingPressureStats; +import org.opensearch.test.OpenSearchTestCase; + +public class ShardIndexingPressureTests extends OpenSearchTestCase { + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100) + .build(); + + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + + public void testMemoryBytesMarkedAndReleased() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 10, false); + Releasable coordinating2 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 50, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 15, true); + Releasable primary2 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 5, false); + Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 25, true); + Releasable replica2 = shardIndexingPressure.markReplicaOperationStarted(shardId, 10, false)) { + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(60, nodeStats.getCurrentCoordinatingBytes()); + assertEquals(20, nodeStats.getCurrentPrimaryBytes()); + assertEquals(80, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(35, nodeStats.getCurrentReplicaBytes()); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertEquals(60, shardStats.getCurrentCoordinatingBytes()); + assertEquals(20, shardStats.getCurrentPrimaryBytes()); + assertEquals(80, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(35, shardStats.getCurrentReplicaBytes()); + + } + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(0, nodeStats.getCurrentCoordinatingBytes()); + assertEquals(0, nodeStats.getCurrentPrimaryBytes()); + assertEquals(0, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, nodeStats.getCurrentReplicaBytes()); + assertEquals(60, nodeStats.getTotalCoordinatingBytes()); + assertEquals(20, nodeStats.getTotalPrimaryBytes()); + assertEquals(80, nodeStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(35, nodeStats.getTotalReplicaBytes()); + + IndexingPressurePerShardStats shardHotStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertNull(shardHotStoreStats); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(60, shardStats.getTotalCoordinatingBytes()); + assertEquals(20, shardStats.getTotalPrimaryBytes()); + assertEquals(80, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(35, shardStats.getTotalReplicaBytes()); + } + + public void testAvoidDoubleAccounting() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 10, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 15)) { + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(10, nodeStats.getCurrentCoordinatingBytes()); + assertEquals(15, nodeStats.getCurrentPrimaryBytes()); + assertEquals(10, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertEquals(10, shardStats.getCurrentCoordinatingBytes()); + assertEquals(15, shardStats.getCurrentPrimaryBytes()); + assertEquals(10, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + } + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(0, nodeStats.getCurrentCoordinatingBytes()); + assertEquals(0, nodeStats.getCurrentPrimaryBytes()); + assertEquals(0, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, nodeStats.getTotalCoordinatingBytes()); + assertEquals(15, nodeStats.getTotalPrimaryBytes()); + assertEquals(10, nodeStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertNull(shardStoreStats); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getTotalCoordinatingBytes()); + assertEquals(15, shardStats.getTotalPrimaryBytes()); + assertEquals(10, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + } + + public void testCoordinatingPrimaryRejections() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1024 * 3, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024 * 3, false); + Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 3, false)) { + if (randomBoolean()) { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markCoordinatingOperationStarted(shardId, 1024 * 2, false)); + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(1, nodeStats.getCoordinatingRejections()); + assertEquals(1024 * 6, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertEquals(1, shardStats.getCoordinatingRejections()); + assertEquals(1024 * 6, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(1, shardStats.getCoordinatingNodeLimitsBreachedRejections()); + } else { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markPrimaryOperationStarted(shardId, 1024 * 2, false)); + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(1, nodeStats.getPrimaryRejections()); + assertEquals(1024 * 6, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats() + .getIndexingPressureShardStats(shardId); + assertEquals(1, shardStats.getPrimaryRejections()); + assertEquals(1024 * 6, nodeStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(1, shardStats.getPrimaryNodeLimitsBreachedRejections()); + } + long preForceRejections = shardIndexingPressure.stats().getPrimaryRejections(); + long preForcedShardRejections = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getPrimaryRejections(); + // Primary can be forced + Releasable forced = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024 * 2, true); + assertEquals(preForceRejections, shardIndexingPressure.stats().getPrimaryRejections()); + assertEquals(1024 * 8, shardIndexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + + assertEquals(preForcedShardRejections, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getPrimaryRejections()); + assertEquals(1024 * 8, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(preForcedShardRejections, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getPrimaryNodeLimitsBreachedRejections()); + forced.close(); + + // Local to coordinating node primary actions not rejected + IndexingPressureStats preLocalNodeStats = shardIndexingPressure.stats(); + IndexingPressurePerShardStats preLocalShardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + Releasable local = shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024 * 2); + assertEquals(preLocalNodeStats.getPrimaryRejections(), shardIndexingPressure.stats().getPrimaryRejections()); + assertEquals(1024 * 6, shardIndexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(preLocalNodeStats.getCurrentPrimaryBytes() + 1024 * 2, shardIndexingPressure.stats().getCurrentPrimaryBytes()); + + assertEquals(preLocalShardStats.getPrimaryRejections(), shardIndexingPressure.shardStats() + .getIndexingPressureShardStats(shardId).getPrimaryRejections()); + assertEquals(1024 * 6, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(preLocalShardStats.getCurrentPrimaryBytes() + 1024 * 2, shardIndexingPressure.shardStats() + .getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes()); + assertEquals(preLocalShardStats.getPrimaryNodeLimitsBreachedRejections(), shardIndexingPressure.shardStats() + .getIndexingPressureShardStats(shardId).getPrimaryNodeLimitsBreachedRejections()); + local.close(); + } + + assertEquals(1024 * 8, shardIndexingPressure.stats().getTotalCombinedCoordinatingAndPrimaryBytes()); + assertNull(shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)); + assertEquals(1024 * 8, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId) + .getTotalCombinedCoordinatingAndPrimaryBytes()); + } + + public void testReplicaRejections() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1024 * 3, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024 * 3, false); + Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 3, false)) { + // Replica will not be rejected until replica bytes > 15KB + Releasable replica2 = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 9, false); + assertEquals(1024 * 12, shardIndexingPressure.stats().getCurrentReplicaBytes()); + assertEquals(1024 * 12, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + // Replica will be rejected once we cross 15KB Shard Limit + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markReplicaOperationStarted(shardId, 1024 * 2, false)); + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(1, nodeStats.getReplicaRejections()); + assertEquals(1024 * 12, nodeStats.getCurrentReplicaBytes()); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertEquals(1, shardStats.getReplicaRejections()); + assertEquals(1024 * 12, shardStats.getCurrentReplicaBytes()); + assertEquals(1, shardStats.getReplicaNodeLimitsBreachedRejections()); + + // Replica can be forced + Releasable forced = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024 * 2, true); + assertEquals(1, shardIndexingPressure.stats().getReplicaRejections()); + assertEquals(1024 * 14, shardIndexingPressure.stats().getCurrentReplicaBytes()); + + assertEquals(1, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getReplicaRejections()); + assertEquals(1024 * 14, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals(1, shardStats.getReplicaNodeLimitsBreachedRejections()); + forced.close(); + + replica2.close(); + } + + assertEquals(1024 * 14, shardIndexingPressure.stats().getTotalReplicaBytes()); + assertNull(shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId)); + assertEquals(1024 * 14, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId).getTotalReplicaBytes()); + } + + public void testCoordinatingPrimaryShardLimitIncrease() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 2, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 2, false)) { + assertEquals(2, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + assertEquals(4, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); // Base Limit + if (randomBoolean) { + Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 6, false); + assertEquals(8, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + assertEquals(10, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(11, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); // Increased Limit + coordinating1.close(); + } else { + Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 6, false); + assertEquals(8, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes()); + assertEquals(10, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(11, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); // Increased Limit + primary1.close(); + } + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertNull(shardStoreStats); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + if(randomBoolean){ + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(8, shardStats.getTotalCoordinatingBytes()); + } else { + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(8, shardStats.getTotalPrimaryBytes()); + } + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaShardLimitIncrease() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 2, false)) { + assertEquals(2, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals(15, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); // Base Limit + + Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 14, false); + assertEquals(16, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals(18, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); // Increased Limit + replica1.close(); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertNull(shardStoreStats); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(16, shardStats.getTotalReplicaBytes()); + assertEquals(15, shardStats.getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryShardLimitIncreaseEvaluateSecondaryParam() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 4 * 1024, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 4 * 1024, false)) { + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCoordinatingBytes()); + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryBytes()); + assertEquals(8 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(8*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertNull(shardStoreStats); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes()); + assertEquals(4 * 1024, shardStats.getTotalPrimaryBytes()); + assertEquals(8 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testReplicaShardLimitIncreaseEvaluateSecondaryParam() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 11 * 1024, false)) { + assertEquals(11 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals((long)(11 * 1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); + } + + IndexingPressurePerShardStats shardStoreStats = shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId); + assertNull(shardStoreStats); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(11 * 1024, shardStats.getTotalReplicaBytes()); + assertEquals(15, shardStats.getCurrentReplicaLimits()); + } + + public void testCoordinatingPrimaryShardRejectionViaSuccessfulRequestsParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) { + assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes()); + assertEquals(2 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(2*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(1 * 1024, shardStats.getTotalCoordinatingBytes()); + assertEquals(1 * 1024, shardStats.getTotalPrimaryBytes()); + assertEquals(2 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + + Thread.sleep(25); + //Total Bytes are 9*1024 and node limit is 10*1024 + if(randomBoolean) { + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 7 * 1024, false); + Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false)) { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markCoordinatingOperationStarted(shardId, 1 * 1024, false)); + } + } else { + try (Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 7 * 1024, false); + Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markPrimaryOperationStarted(shardId, 1 * 1024, false)); + } + } + + shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + if(randomBoolean) { + assertEquals(1, shardStats.getCoordinatingRejections()); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(1, shardStats.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections()); + } else { + assertEquals(1, shardStats.getPrimaryRejections()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(1, shardStats.getPrimaryLastSuccessfulRequestLimitsBreachedRejections()); + } + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + if(randomBoolean) { + assertEquals(1, nodeStats.getCoordinatingRejections()); + assertEquals(0, nodeStats.getCurrentCoordinatingBytes()); + } else { + assertEquals(1, nodeStats.getPrimaryRejections()); + assertEquals(0, nodeStats.getCurrentPrimaryBytes()); + } + } + + public void testReplicaShardRejectionViaSuccessfulRequestsParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false)) { + assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals((long)(1*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(1 * 1024, shardStats.getTotalReplicaBytes()); + assertEquals(15, shardStats.getCurrentReplicaLimits()); + + Thread.sleep(25); + //Total Bytes are 14*1024 and node limit is 15*1024 + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 10 * 1024, false); + Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 2 * 1024, false)) { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markReplicaOperationStarted(shardId, 2 * 1024, false)); + } + + shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(1, shardStats.getReplicaRejections()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(1, shardStats.getReplicaLastSuccessfulRequestLimitsBreachedRejections()); + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(1, nodeStats.getReplicaRejections()); + assertEquals(0, nodeStats.getCurrentReplicaBytes()); + } + + public void testCoordinatingPrimaryShardRejectionSkippedInShadowModeViaSuccessfulRequestsParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) { + assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes()); + assertEquals(2 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(2*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(1 * 1024, shardStats.getTotalCoordinatingBytes()); + assertEquals(1 * 1024, shardStats.getTotalPrimaryBytes()); + assertEquals(2 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + + Thread.sleep(25); + //Total Bytes are 9*1024 and node limit is 10*1024 + if(randomBoolean) { + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 7 * 1024, false); + Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false)) { + Releasable coordinating2 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false); + coordinating2.close(); + } + } else { + try (Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 7 * 1024, false); + Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false)) { + Releasable primary2 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false); + primary2.close(); + } + } + + shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + if(randomBoolean) { + assertEquals(0, shardStats.getCoordinatingRejections()); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(1, shardStats.getCoordinatingLastSuccessfulRequestLimitsBreachedRejections()); + } else { + assertEquals(0, shardStats.getPrimaryRejections()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(1, shardStats.getPrimaryLastSuccessfulRequestLimitsBreachedRejections()); + } + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + if(randomBoolean) { + assertEquals(0, nodeStats.getCoordinatingRejections()); + assertEquals(0, nodeStats.getCurrentCoordinatingBytes()); + } else { + assertEquals(0, nodeStats.getPrimaryRejections()); + assertEquals(0, nodeStats.getCurrentPrimaryBytes()); + } + } + + public void testReplicaShardRejectionSkippedInShadowModeViaSuccessfulRequestsParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1) + .put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false)) { + assertEquals(1 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals((long)(1*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(1 * 1024, shardStats.getTotalReplicaBytes()); + assertEquals(15, shardStats.getCurrentReplicaLimits()); + + Thread.sleep(25); + //Total Bytes are 14*1024 and node limit is 15*1024 + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 10 * 1024, false); + Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 2 * 1024, false)) { + Releasable replica2 = shardIndexingPressure.markReplicaOperationStarted(shardId, 2 * 1024, false); + replica2.close(); + } + + shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getReplicaRejections()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(1, shardStats.getReplicaLastSuccessfulRequestLimitsBreachedRejections()); + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(0, nodeStats.getReplicaRejections()); + assertEquals(0, nodeStats.getCurrentReplicaBytes()); + } + + public void testCoordinatingPrimaryShardRejectionViaThroughputDegradationParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false); + Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 3 * 1024, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false); + Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 3 * 1024, false)) { + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes()); + assertEquals(8 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(8*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); + //Adding delay in the current in flight request to mimic throughput degradation + Thread.sleep(100); + } + if(randomBoolean) { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markCoordinatingOperationStarted(shardId, 8 * 1024, false)); + } else { + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markPrimaryOperationStarted(shardId, 8 * 1024, false)); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + if(randomBoolean) { + assertEquals(1, shardStats.getCoordinatingRejections()); + assertEquals(1, shardStats.getCoordinatingThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes()); + } else { + assertEquals(1, shardStats.getPrimaryRejections()); + assertEquals(1, shardStats.getPrimaryThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(4 * 1024, shardStats.getTotalPrimaryBytes()); + } + + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(8 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + if(randomBoolean) { + assertEquals(1, nodeStats.getCoordinatingRejections()); + assertEquals(0, nodeStats.getCurrentCoordinatingBytes()); + } else { + assertEquals(1, nodeStats.getPrimaryRejections()); + assertEquals(0, nodeStats.getCurrentPrimaryBytes()); + } + } + + public void testReplicaShardRejectionViaThroughputDegradationParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false); + Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 3 * 1024, false)) { + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals((long)(4*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); + //Adding delay in the current in flight request to mimic throughput degradation + Thread.sleep(100); + } + + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markReplicaOperationStarted(shardId, 12 * 1024, false)); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(1, shardStats.getReplicaRejections()); + assertEquals(1, shardStats.getReplicaThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(4 * 1024, shardStats.getTotalReplicaBytes()); + assertEquals(15, shardStats.getCurrentReplicaLimits()); + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(1, nodeStats.getReplicaRejections()); + assertEquals(0, nodeStats.getCurrentReplicaBytes()); + } + + public void testCoordinatingPrimaryShardRejectionSkippedInShadowModeViaThroughputDegradationParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + boolean randomBoolean = randomBoolean(); + try (Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1 * 1024, false); + Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 3 * 1024, false); + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1 * 1024, false); + Releasable primary1 = shardIndexingPressure.markPrimaryOperationStarted(shardId, 3 * 1024, false)) { + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentPrimaryBytes()); + assertEquals(8 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(8*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentPrimaryAndCoordinatingLimits()); + //Adding delay in the current in flight request to mimic throughput degradation + Thread.sleep(100); + } + if(randomBoolean) { + Releasable coordinating = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 8 * 1024, false); + coordinating.close(); + } else { + Releasable primary = shardIndexingPressure.markPrimaryOperationStarted(shardId, 8 * 1024, false); + primary.close(); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + if(randomBoolean) { + assertEquals(0, shardStats.getCoordinatingRejections()); + assertEquals(1, shardStats.getCoordinatingThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(12 * 1024, shardStats.getTotalCoordinatingBytes()); + } else { + assertEquals(0, shardStats.getPrimaryRejections()); + assertEquals(1, shardStats.getPrimaryThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(12 * 1024, shardStats.getTotalPrimaryBytes()); + } + + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(16 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + if(randomBoolean) { + assertEquals(0, nodeStats.getCoordinatingRejections()); + assertEquals(0, nodeStats.getCurrentCoordinatingBytes()); + } else { + assertEquals(0, nodeStats.getPrimaryRejections()); + assertEquals(0, nodeStats.getCurrentPrimaryBytes()); + } + } + + public void testReplicaShardRejectionSkippedInShadowModeViaThroughputDegradationParam() throws InterruptedException { + Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false) + .put(ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS.getKey(), 1) + .put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 1) + .build(); + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + try (Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 1 * 1024, false); + Releasable replica1 = shardIndexingPressure.markReplicaOperationStarted(shardId, 3 * 1024, false)) { + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId).getCurrentReplicaBytes()); + assertEquals((long)(4*1024/0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentReplicaLimits()); + //Adding delay in the current in flight request to mimic throughput degradation + Thread.sleep(100); + } + + Releasable replica = shardIndexingPressure.markReplicaOperationStarted(shardId, 12 * 1024, false); + replica.close(); + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId); + assertEquals(0, shardStats.getReplicaRejections()); + assertEquals(1, shardStats.getReplicaThroughputDegradationLimitsBreachedRejections()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(16 * 1024, shardStats.getTotalReplicaBytes()); + assertEquals(15, shardStats.getCurrentReplicaLimits()); + + IndexingPressureStats nodeStats = shardIndexingPressure.stats(); + assertEquals(0, nodeStats.getReplicaRejections()); + assertEquals(0, nodeStats.getCurrentReplicaBytes()); + } + + public void testShardLimitIncreaseMultipleShards() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId1 = new ShardId(index, 0); + ShardId shardId2 = new ShardId(index, 1); + try (Releasable coordinating1 = shardIndexingPressure.markCoordinatingOperationStarted(shardId1, 4 * 1024, false); + Releasable coordinating2 = shardIndexingPressure.markCoordinatingOperationStarted(shardId2, 4 * 1024, false);) { + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentCoordinatingBytes()); + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(4 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId1) + .getCurrentPrimaryAndCoordinatingLimits()); + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId2) + .getCurrentCoordinatingBytes()); + assertEquals(4 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId2) + .getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals((long)(4 * 1024 / 0.85), shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId2) + .getCurrentPrimaryAndCoordinatingLimits()); + } + + IndexingPressurePerShardStats shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId1); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes()); + assertEquals(4 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + + shardStats = shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId2); + assertEquals(0, shardStats.getCurrentCoordinatingBytes()); + assertEquals(0, shardStats.getCurrentPrimaryBytes()); + assertEquals(0, shardStats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, shardStats.getCurrentReplicaBytes()); + assertEquals(4 * 1024, shardStats.getTotalCoordinatingBytes()); + assertEquals(4 * 1024, shardStats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, shardStats.getCurrentPrimaryAndCoordinatingLimits()); + } + + public void testForceExecutionOnCoordinating() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + expectThrows(OpenSearchRejectedExecutionException.class, () -> shardIndexingPressure + .markCoordinatingOperationStarted(shardId,1024 * 11, false)); + try (Releasable ignore = shardIndexingPressure.markCoordinatingOperationStarted(shardId,11 * 1024, true)) { + assertEquals(11 * 1024, shardIndexingPressure.shardStats().getIndexingPressureShardStats(shardId) + .getCurrentCoordinatingBytes()); + } + assertEquals(0, shardIndexingPressure.coldStats().getIndexingPressureShardStats(shardId).getCurrentCoordinatingBytes()); + } + + public void testAssertionOnReleaseExecutedTwice() { + ShardIndexingPressure shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); + Index index = new Index("IndexName", "UUID"); + ShardId shardId = new ShardId(index, 0); + String assertionMessage = "ShardIndexingPressure Release is called twice"; + + Releasable releasable = shardIndexingPressure.markCoordinatingOperationStarted(shardId, 1024, false); + releasable.close(); + expectThrows(AssertionError.class, assertionMessage, releasable::close); + + releasable = shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024); + releasable.close(); + expectThrows(AssertionError.class, assertionMessage, releasable::close); + + releasable = shardIndexingPressure.markPrimaryOperationStarted(shardId, 1024, false); + releasable.close(); + expectThrows(AssertionError.class, assertionMessage, releasable::close); + + releasable = shardIndexingPressure.markReplicaOperationStarted(shardId, 1024, false); + releasable.close(); + expectThrows(AssertionError.class, assertionMessage, releasable::close); + } +}