From cf6b6dfedcda43a37cde6f1b566f581bead82ee0 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Mon, 30 Aug 2021 12:07:31 +0530 Subject: [PATCH] Add plumbing logic for IndexingPressureService in Transport Actions. (#1113) Signed-off-by: Saurabh Singh --- .../action/bulk/TransportBulkAction.java | 31 +- .../action/bulk/TransportShardBulkAction.java | 6 +- .../TransportResyncReplicationAction.java | 6 +- .../replication/TransportWriteAction.java | 18 +- .../cluster/service/ClusterService.java | 17 + .../index/IndexingPressureService.java | 10 +- .../index/seqno/RetentionLeaseSyncAction.java | 6 +- .../main/java/org/opensearch/node/Node.java | 11 +- .../java/org/opensearch/node/NodeService.java | 10 +- ...ActionIndicesThatCannotBeCreatedTests.java | 7 +- .../bulk/TransportBulkActionIngestTests.java | 5 +- .../action/bulk/TransportBulkActionTests.java | 4 +- .../bulk/TransportBulkActionTookTests.java | 4 +- ...TransportResyncReplicationActionTests.java | 6 +- ...rtWriteActionForIndexingPressureTests.java | 487 ++++++++++++++++++ .../TransportWriteActionTests.java | 6 +- .../index/IndexingPressureServiceTests.java | 2 +- .../index/IndexingPressureTests.java | 3 +- .../seqno/RetentionLeaseSyncActionTests.java | 8 +- .../snapshots/SnapshotResiliencyTests.java | 8 +- 20 files changed, 591 insertions(+), 64 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 7aba9a40f05..1734abc11dc 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -75,7 +75,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.Index; import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.IndexingPressure; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; @@ -127,25 +127,26 @@ public class TransportBulkAction extends HandledTransportAction listener) { final long indexingBytes = bulkRequest.ramBytesUsed(); final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); - final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem); + final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE; try { @@ -562,7 +563,11 @@ public class TransportBulkAction extends HandledTransportAction() { + // Add the shard level accounting for coordinating and supply the listener + final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); + final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(shardId, + bulkShardRequest.ramBytesUsed(), isOnlySystem); + shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener() { @Override public void onResponse(BulkShardResponse bulkShardResponse) { for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { @@ -584,7 +589,7 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest = request.request(); responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e))); + new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e))); } if (counter.decrementAndGet() == 0) { finishHim(); @@ -595,7 +600,7 @@ public class TransportBulkAction extends HandledTransportAction extends TransportReplicationAction { - protected final IndexingPressure indexingPressure; + protected final IndexingPressureService indexingPressureService; protected final SystemIndices systemIndices; private final Function executorFunction; @@ -85,13 +85,14 @@ public abstract class TransportWriteAction< ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, Writeable.Reader replicaRequest, Function executorFunction, - boolean forceExecutionOnPrimary, IndexingPressure indexingPressure, SystemIndices systemIndices) { + boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService, + SystemIndices systemIndices) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executorFunction = executorFunction; - this.indexingPressure = indexingPressure; + this.indexingPressureService = indexingPressureService; this.systemIndices = systemIndices; } @@ -101,7 +102,7 @@ public abstract class TransportWriteAction< @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request)); + return indexingPressureService.markPrimaryOperationStarted(request.shardId, primaryOperationSize(request), force(request)); } protected boolean force(ReplicatedWriteRequest request) { @@ -119,7 +120,8 @@ public abstract class TransportWriteAction< // If this primary request was received from a local reroute initiated by the node client, we // must mark a new primary operation local to the coordinating node. if (localRerouteInitiatedByNodeClient) { - return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request)); + return indexingPressureService.markPrimaryOperationLocalToCoordinatingNodeStarted(request.shardId, + primaryOperationSize(request)); } else { return () -> {}; } @@ -127,7 +129,7 @@ public abstract class TransportWriteAction< // If this primary request was received directly from the network, we must mark a new primary // operation. This happens if the write action skips the reroute step (ex: rsync) or during // primary delegation, after the primary relocation hand-off. - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request)); + return indexingPressureService.markPrimaryOperationStarted(request.shardId, primaryOperationSize(request), force(request)); } } @@ -137,7 +139,7 @@ public abstract class TransportWriteAction< @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request)); + return indexingPressureService.markReplicaOperationStarted(request.shardId, replicaOperationSize(request), force(request)); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 205eb050562..8834d6aea60 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -49,6 +49,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexingPressureService; import org.opensearch.node.Node; import org.opensearch.threadpool.ThreadPool; @@ -78,6 +79,8 @@ public class ClusterService extends AbstractLifecycleComponent { private RerouteService rerouteService; + private IndexingPressureService indexingPressureService; + public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this(settings, clusterSettings, new MasterService(settings, clusterSettings, threadPool), new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)); @@ -203,6 +206,20 @@ public class ClusterService extends AbstractLifecycleComponent { return masterService; } + /** + * Getter and Setter for IndexingPressureService, This method exposes IndexingPressureService stats to other plugins for usage. + * Although Indexing Pressure instances can be accessed via Node and NodeService class but none of them are + * present in the createComponents signature of Plugin interface currently. {@link org.opensearch.plugins.Plugin#createComponents} + * Going forward, IndexingPressureService will have required constructs for exposing listeners/interfaces for plugin development.(#478) + */ + public void setIndexingPressureService(IndexingPressureService indexingPressureService) { + this.indexingPressureService = indexingPressureService; + } + + public IndexingPressureService getIndexingPressureService() { + return indexingPressureService; + } + public ClusterApplierService getClusterApplierService() { return clusterApplierService; } diff --git a/server/src/main/java/org/opensearch/index/IndexingPressureService.java b/server/src/main/java/org/opensearch/index/IndexingPressureService.java index 02079d7bad6..62172c6c71a 100644 --- a/server/src/main/java/org/opensearch/index/IndexingPressureService.java +++ b/server/src/main/java/org/opensearch/index/IndexingPressureService.java @@ -25,11 +25,19 @@ public class IndexingPressureService { shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); } + public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) { + if (isShardIndexingPressureEnabled() == false) { + return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution); + } else { + return () -> {}; + } + } + public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { if (isShardIndexingPressureEnabled()) { return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution); } else { - return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution); + return () -> {}; } } diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index 135b47cd4af..2ae5ff4ff9d 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -38,7 +38,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; -import org.opensearch.index.IndexingPressure; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActiveShardCount; import org.opensearch.action.support.WriteResponse; @@ -55,6 +54,7 @@ import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.ShardId; @@ -94,7 +94,7 @@ public class RetentionLeaseSyncAction extends final ThreadPool threadPool, final ShardStateAction shardStateAction, final ActionFilters actionFilters, - final IndexingPressure indexingPressure, + final IndexingPressureService indexingPressureService, final SystemIndices systemIndices) { super( settings, @@ -107,7 +107,7 @@ public class RetentionLeaseSyncAction extends actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressure, systemIndices); + ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressureService, systemIndices); } @Override diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 87d8d516c4b..d13594c01c8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; +import org.opensearch.index.IndexingPressureService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -120,7 +121,6 @@ import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexSettings; -import org.opensearch.index.IndexingPressure; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.engine.EngineFactory; import org.opensearch.indices.IndicesModule; @@ -599,7 +599,10 @@ public class Node implements Closeable { final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); - final IndexingPressure indexingLimits = new IndexingPressure(settings); + final IndexingPressureService indexingPressureService = new IndexingPressureService(settings, clusterService); + // Going forward, IndexingPressureService will have required constructs for exposing listeners/interfaces for plugin + // development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478). + clusterService.setIndexingPressureService(indexingPressureService); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment, @@ -628,7 +631,7 @@ public class Node implements Closeable { this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, - searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService()); + searchTransportService, indexingPressureService, searchModule.getValuesSourceRegistry().getUsageService()); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, searchModule.getFetchPhase(), @@ -664,7 +667,7 @@ public class Node implements Closeable { b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); - b.bind(IndexingPressure.class).toInstance(indexingLimits); + b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index c20874009c5..188a5a9c821 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -32,7 +32,6 @@ package org.opensearch.node; -import org.opensearch.index.IndexingPressure; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.Build; import org.opensearch.Version; @@ -46,6 +45,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.discovery.Discovery; import org.opensearch.http.HttpServerTransport; +import org.opensearch.index.IndexingPressureService; import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.ingest.IngestService; @@ -74,7 +74,7 @@ public class NodeService implements Closeable { private final HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; private final SearchTransportService searchTransportService; - private final IndexingPressure indexingPressure; + private final IndexingPressureService indexingPressureService; private final AggregationUsageService aggregationUsageService; private final Discovery discovery; @@ -84,7 +84,7 @@ public class NodeService implements Closeable { CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, - SearchTransportService searchTransportService, IndexingPressure indexingPressure, + SearchTransportService searchTransportService, IndexingPressureService indexingPressureService, AggregationUsageService aggregationUsageService) { this.settings = settings; this.threadPool = threadPool; @@ -100,7 +100,7 @@ public class NodeService implements Closeable { this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; - this.indexingPressure = indexingPressure; + this.indexingPressureService = indexingPressureService; this.aggregationUsageService = aggregationUsageService; clusterService.addStateApplier(ingestService); } @@ -143,7 +143,7 @@ public class NodeService implements Closeable { ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, scriptCache ? scriptService.cacheStats() : null, - indexingPressure ? this.indexingPressure.stats() : null + indexingPressure ? this.indexingPressureService.nodeStats() : null ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index bdb45e66678..79d62b82422 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -44,13 +44,14 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; -import org.opensearch.index.IndexingPressure; import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; @@ -138,7 +139,9 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends OpenSear final ExecutorService direct = OpenSearchExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, - null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap())) { + null, null, null, mock(ActionFilters.class), null, null, + new IndexingPressureService(Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)), new SystemIndices(emptyMap())) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 529078100e4..084d3b060c5 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -67,7 +67,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; -import org.opensearch.index.IndexingPressure; +import org.opensearch.index.IndexingPressureService; import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; @@ -163,7 +163,8 @@ public class TransportBulkActionIngestTests extends OpenSearchTestCase { SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), new SystemIndices(emptyMap()) - ), new IndexingPressure(SETTINGS), new SystemIndices(emptyMap()) + ), new IndexingPressureService(SETTINGS, new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)), new SystemIndices(emptyMap()) ); } @Override diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index a4c2203208a..46a8f768273 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -58,7 +58,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.IndexingPressure; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.indices.SystemIndices; @@ -105,7 +105,7 @@ public class TransportBulkActionTests extends OpenSearchTestCase { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), - new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap())); + new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap())); } @Override diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index 3a8afe70a03..5822f572c9d 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -55,8 +55,8 @@ import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexingPressureService; import org.opensearch.rest.action.document.RestBulkAction; -import org.opensearch.index.IndexingPressure; import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchTestCase; @@ -266,7 +266,7 @@ public class TransportBulkActionTookTests extends OpenSearchTestCase { actionFilters, indexNameExpressionResolver, autoCreateIndex, - new IndexingPressure(Settings.EMPTY), + new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()), relativeTimeProvider); } diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index 4e93551a147..595ade7324e 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -33,8 +33,6 @@ package org.opensearch.action.resync; import org.opensearch.Version; import org.opensearch.action.ActionListener; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.IndexingPressure; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; @@ -53,6 +51,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.index.Index; import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.index.shard.ShardId; @@ -162,7 +162,7 @@ public class TransportResyncReplicationActionTests extends OpenSearchTestCase { final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap())); + new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap())); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java new file mode 100644 index 00000000000..411731acb19 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java @@ -0,0 +1,487 @@ +/* + * Copyright OpenSearch Contributors. + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.action.support.replication; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.WriteResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.ShardIndexingPressureSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.shard.ReplicationGroup; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardNotFoundException; +import org.opensearch.index.shard.ShardNotInPrimaryModeException; +import org.opensearch.index.stats.IndexingPressurePerShardStats; +import org.opensearch.index.translog.Translog; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.SystemIndices; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; +import org.hamcrest.Matcher; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.emptyMap; +import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportWriteActionForIndexingPressureTests extends OpenSearchTestCase { + private static ThreadPool threadPool; + + private ClusterService clusterService; + private TransportService transportService; + private CapturingTransport transport; + private ShardStateAction shardStateAction; + private Translog.Location location; + private Releasable releasable; + private IndexingPressureService indexingPressureService; + + public static final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool("ShardReplicationTests"); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = transport.createTransportService(clusterService.getSettings(), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); + releasable = mock(Releasable.class); + location = mock(Translog.Location.class); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + public void testIndexingPressureOperationStartedForReplicaNode() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + final ReplicationTask task = maybeTask(); + final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false) + .build(); + this.indexingPressureService = new IndexingPressureService(settings, clusterService); + + TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService, + shardStateAction, threadPool); + + action.handleReplicaRequest( + new TransportReplicationAction.ConcreteReplicaRequest<>( + new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()), + createTransportChannel(new PlainActionFuture<>()), task); + + IndexingPressurePerShardStats shardStats = + this.indexingPressureService.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + + assertPhase(task, "finished"); + assertTrue(Objects.isNull(shardStats)); + } + + public void testIndexingPressureOperationStartedForReplicaShard() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + final ReplicationTask task = maybeTask(); + final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .build(); + this.indexingPressureService = new IndexingPressureService(settings, clusterService); + + TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService, + shardStateAction, threadPool); + + action.handleReplicaRequest( + new TransportReplicationAction.ConcreteReplicaRequest<>( + new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()), + createTransportChannel(new PlainActionFuture<>()), task); + + CommonStatsFlags statsFlag = new CommonStatsFlags(); + statsFlag.includeAllShardIndexingPressureTrackers(true); + IndexingPressurePerShardStats shardStats = + this.indexingPressureService.shardStats(statsFlag).getIndexingPressureShardStats(shardId); + + assertPhase(task, "finished"); + assertTrue(!Objects.isNull(shardStats)); + assertEquals(100, shardStats.getTotalReplicaBytes()); + } + + public void testIndexingPressureOperationStartedForPrimaryNode() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + final ReplicationTask task = maybeTask(); + final Settings settings = + Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(); + this.indexingPressureService = new IndexingPressureService(settings, clusterService); + + TestAction action = new TestAction(settings, "internal:testActionWithExceptions", transportService, clusterService, + shardStateAction, threadPool); + + action.handlePrimaryRequest( + new TransportReplicationAction.ConcreteReplicaRequest<>( + new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()), + createTransportChannel(new PlainActionFuture<>()), task); + + IndexingPressurePerShardStats shardStats = + this.indexingPressureService.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + + assertPhase(task, "finished"); + assertTrue(Objects.isNull(shardStats)); + } + + public void testIndexingPressureOperationStartedForPrimaryShard() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + final ReplicationTask task = maybeTask(); + final Settings settings = + Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true).build(); + this.indexingPressureService = new IndexingPressureService(settings, clusterService); + + TestAction action = new TestAction(settings, "internal:testActionWithExceptions", transportService, clusterService, + shardStateAction, threadPool); + + action.handlePrimaryRequest( + new TransportReplicationAction.ConcreteReplicaRequest<>( + new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()), + createTransportChannel(new PlainActionFuture<>()), task); + + CommonStatsFlags statsFlag = new CommonStatsFlags(); + statsFlag.includeAllShardIndexingPressureTrackers(true); + IndexingPressurePerShardStats shardStats = + this.indexingPressureService.shardStats(statsFlag).getIndexingPressureShardStats(shardId); + + assertPhase(task, "finished"); + assertTrue(!Objects.isNull(shardStats)); + assertEquals(100, shardStats.getTotalPrimaryBytes()); + } + + public void testIndexingPressureOperationStartedForLocalPrimaryNode() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + final ReplicationTask task = maybeTask(); + final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false) + .build(); + this.indexingPressureService = new IndexingPressureService(settings, clusterService); + + TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService, + shardStateAction, threadPool); + + action.handlePrimaryRequest( + new TransportReplicationAction.ConcreteShardRequest<>( + new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + true, true), + createTransportChannel(new PlainActionFuture<>()), task); + + IndexingPressurePerShardStats shardStats = + this.indexingPressureService.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); + + assertPhase(task, "finished"); + assertTrue(Objects.isNull(shardStats)); + } + + public void testIndexingPressureOperationStartedForLocalPrimaryShard() { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + final ReplicationTask task = maybeTask(); + final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true) + .build(); + this.indexingPressureService = new IndexingPressureService(settings, clusterService); + + TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService, + shardStateAction, threadPool); + + action.handlePrimaryRequest( + new TransportReplicationAction.ConcreteShardRequest<>( + new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(), + true, true), + createTransportChannel(new PlainActionFuture<>()), task); + + CommonStatsFlags statsFlag = new CommonStatsFlags(); + statsFlag.includeAllShardIndexingPressureTrackers(true); + IndexingPressurePerShardStats shardStats = + this.indexingPressureService.shardStats(statsFlag).getIndexingPressureShardStats(shardId); + + assertPhase(task, "finished"); + assertTrue(!Objects.isNull(shardStats)); + } + + private final AtomicInteger count = new AtomicInteger(0); + + private final AtomicBoolean isRelocated = new AtomicBoolean(false); + + private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true); + + /** + * Sometimes build a ReplicationTask for tracking the phase of the + * TransportReplicationAction. Since TransportReplicationAction has to work + * if the task as null just as well as if it is supplied this returns null + * half the time. + */ + ReplicationTask maybeTask() { + return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, null) : null; + } + + /** + * If the task is non-null this asserts that the phrase matches. + */ + void assertPhase(@Nullable ReplicationTask task, String phase) { + assertPhase(task, equalTo(phase)); + } + + private void assertPhase(@Nullable ReplicationTask task, Matcher phaseMatcher) { + if (task != null) { + assertThat(task.getPhase(), phaseMatcher); + } + } + + private class TestAction extends TransportWriteAction { + protected TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { + super(settings, actionName, transportService, clusterService, + mockIndicesService(clusterService), threadPool, shardStateAction, + new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false, + TransportWriteActionForIndexingPressureTests.this.indexingPressureService, new SystemIndices(emptyMap())); + } + + @Override + protected TestResponse newResponseInstance(StreamInput in) throws IOException { + return new TestResponse(); + } + + @Override + protected long primaryOperationSize(TestRequest request) { + return 100; + } + + @Override + protected long replicaOperationSize(TestRequest request) { + return 100; + } + + @Override + protected void dispatchedShardOperationOnPrimary( + TestRequest request, IndexShard primary, ActionListener> listener) { + ActionListener.completeWith(listener, () -> new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, + logger)); + } + + @Override + protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> new WriteReplicaResult<>(request, location, null, replica, logger)); + } + + } + + private static class TestRequest extends ReplicatedWriteRequest { + TestRequest(StreamInput in) throws IOException { + super(in); + } + + TestRequest() { + super(new ShardId("test", "_na_", 0)); + } + + @Override + public String toString() { + return "TestRequest{}"; + } + } + + private static class TestResponse extends ReplicationResponse implements WriteResponse { + boolean forcedRefresh; + + @Override + public void setForcedRefresh(boolean forcedRefresh) { + this.forcedRefresh = forcedRefresh; + } + } + + private IndicesService mockIndicesService(ClusterService clusterService) { + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> { + Index index = (Index)invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + final IndexMetadata indexSafe = state.metadata().getIndexSafe(index); + return mockIndexService(indexSafe, clusterService); + }); + when(indicesService.indexService(any(Index.class))).then(invocation -> { + Index index = (Index) invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + if (state.metadata().hasIndex(index.getName())) { + return mockIndexService(clusterService.state().metadata().getIndexSafe(index), clusterService); + } else { + return null; + } + }); + return indicesService; + } + + private IndexService mockIndexService(final IndexMetadata indexMetaData, ClusterService clusterService) { + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(anyInt())).then(invocation -> { + int shard = (Integer) invocation.getArguments()[0]; + final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard); + if (shard > indexMetaData.getNumberOfShards()) { + throw new ShardNotFoundException(shardId); + } + return mockIndexShard(shardId, clusterService); + }); + return indexService; + } + + @SuppressWarnings("unchecked") + private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { + final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.shardId()).thenReturn(shardId); + when(indexShard.state()).thenReturn(IndexShardState.STARTED); + doAnswer(invocation -> { + ActionListener callback = (ActionListener) invocation.getArguments()[0]; + if (isPrimaryMode.get()) { + count.incrementAndGet(); + callback.onResponse(count::decrementAndGet); + + } else { + callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); + } + return null; + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + doAnswer(invocation -> { + long term = (Long)invocation.getArguments()[0]; + ActionListener callback = (ActionListener) invocation.getArguments()[3]; + final long primaryTerm = indexShard.getPendingPrimaryTerm(); + if (term < primaryTerm) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])", + shardId, term, primaryTerm)); + } + count.incrementAndGet(); + callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard) + .acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); + when(indexShard.getActiveOperationsCount()).thenAnswer(i -> count.get()); + + when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { + final ClusterState state = clusterService.state(); + final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + final ShardRouting routing = node.getByShardId(shardId); + if (routing == null) { + throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node"); + } + return routing; + }); + when(indexShard.isRelocatedPrimary()).thenAnswer(invocationOnMock -> isRelocated.get()); + doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); + when(indexShard.getPendingPrimaryTerm()).thenAnswer(i -> + clusterService.state().metadata().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); + + ReplicationGroup replicationGroup = mock(ReplicationGroup.class); + when(indexShard.getReplicationGroup()).thenReturn(replicationGroup); + return indexShard; + } + + /** + * Transport channel that is needed for testing. + */ + public TransportChannel createTransportChannel(final PlainActionFuture listener) { + return new TransportChannel() { + + @Override + public String getProfileName() { + return ""; + } + + @Override + public void sendResponse(TransportResponse response) { + listener.onResponse(((TestResponse) response)); + } + + @Override + public void sendResponse(Exception exception) { + listener.onFailure(exception); + } + + @Override + public String getChannelType() { + return "replica_test"; + } + }; + } +} diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java index 3ff154caec6..84e6a7f9c87 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java @@ -34,7 +34,6 @@ package org.opensearch.action.support.replication; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.index.IndexingPressure; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; @@ -56,6 +55,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.index.Index; import org.opensearch.index.IndexService; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -382,7 +382,7 @@ public class TransportWriteActionTests extends OpenSearchTestCase { new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false, - new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap())); + new IndexingPressureService(Settings.EMPTY, TransportWriteActionTests.this.clusterService), new SystemIndices(emptyMap())); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -392,7 +392,7 @@ public class TransportWriteActionTests extends OpenSearchTestCase { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false, - new IndexingPressure(settings), new SystemIndices(emptyMap())); + new IndexingPressureService(settings, clusterService), new SystemIndices(emptyMap())); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java b/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java index a96b0fda3d2..69ad074d6fa 100644 --- a/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingPressureServiceTests.java @@ -60,7 +60,7 @@ public class IndexingPressureServiceTests extends OpenSearchTestCase { Settings.builder().put(settings), updated, getTestClass().getName()); clusterSettings.applySettings(updated.build()); - Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false); + Releasable releasable = service.markCoordinatingOperationStarted(1024, false); IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); assertNull(shardStats); IndexingPressureStats nodeStats = service.nodeStats(); diff --git a/server/src/test/java/org/opensearch/index/IndexingPressureTests.java b/server/src/test/java/org/opensearch/index/IndexingPressureTests.java index 58f492b86ac..48240fc95ca 100644 --- a/server/src/test/java/org/opensearch/index/IndexingPressureTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingPressureTests.java @@ -40,7 +40,8 @@ import org.opensearch.test.OpenSearchTestCase; public class IndexingPressureTests extends OpenSearchTestCase { - private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB").build(); + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB") + .put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(); public void testMemoryBytesMarkedAndReleased() { IndexingPressure indexingPressure = new IndexingPressure(settings); diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java index b2e2b96d2c6..28995786302 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -33,7 +33,6 @@ package org.opensearch.index.seqno; import org.opensearch.action.ActionListener; -import org.opensearch.index.IndexingPressure; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; import org.opensearch.action.support.PlainActionFuture; @@ -44,6 +43,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.Index; import org.opensearch.index.IndexService; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -120,7 +120,7 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new IndexingPressure(Settings.EMPTY), + new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap())); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -158,7 +158,7 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new IndexingPressure(Settings.EMPTY), + new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap())); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -199,7 +199,7 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase { threadPool, shardStateAction, new ActionFilters(Collections.emptySet()), - new IndexingPressure(Settings.EMPTY), + new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap())); assertNull(action.indexBlockLevel()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 7cf1933b59a..055e735272f 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -166,7 +166,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.TransportNodesListGatewayStartedShards; import org.opensearch.index.Index; -import org.opensearch.index.IndexingPressure; +import org.opensearch.index.IndexingPressureService; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -1572,7 +1572,7 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase { threadPool, shardStateAction, actionFilters, - new IndexingPressure(settings), + new IndexingPressureService(settings, clusterService), new SystemIndices(emptyMap()))), new GlobalCheckpointSyncAction( settings, @@ -1599,7 +1599,7 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase { mappingUpdatedAction.setClient(client); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters, new IndexingPressure(settings), new SystemIndices(emptyMap())); + actionFilters, new IndexingPressureService(settings, clusterService), new SystemIndices(emptyMap())); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, new IngestService( @@ -1608,7 +1608,7 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase { Collections.emptyList(), client), transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), - new IndexingPressure(settings), + new IndexingPressureService(settings, clusterService), new SystemIndices(emptyMap()) )); final RestoreService restoreService = new RestoreService(