Add plumbing logic for IndexingPressureService in Transport Actions. (#1113)
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
parent
ac3f2af026
commit
cf6b6dfedc
|
@ -75,7 +75,7 @@ import org.opensearch.common.unit.TimeValue;
|
|||
import org.opensearch.common.util.concurrent.AtomicArray;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexNotFoundException;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.VersionType;
|
||||
import org.opensearch.index.seqno.SequenceNumbers;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
|
@ -127,7 +127,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
private final NodeClient client;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final IndexingPressureService indexingPressureService;
|
||||
private final SystemIndices systemIndices;
|
||||
|
||||
@Inject
|
||||
|
@ -135,17 +135,18 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
ClusterService clusterService, IngestService ingestService,
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressureService indexingPressureService,
|
||||
SystemIndices systemIndices) {
|
||||
this(threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
|
||||
indexNameExpressionResolver, autoCreateIndex, indexingPressure, systemIndices, System::nanoTime);
|
||||
indexNameExpressionResolver, autoCreateIndex, indexingPressureService, systemIndices, System::nanoTime);
|
||||
}
|
||||
|
||||
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, IngestService ingestService,
|
||||
TransportShardBulkAction shardBulkAction, NodeClient client,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices,
|
||||
LongSupplier relativeTimeProvider) {
|
||||
AutoCreateIndex autoCreateIndex, IndexingPressureService indexingPressureService,
|
||||
SystemIndices systemIndices, LongSupplier relativeTimeProvider) {
|
||||
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
|
||||
Objects.requireNonNull(relativeTimeProvider);
|
||||
this.threadPool = threadPool;
|
||||
|
@ -157,7 +158,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
this.ingestForwarder = new IngestActionForwarder(transportService);
|
||||
this.client = client;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.indexingPressureService = indexingPressureService;
|
||||
this.systemIndices = systemIndices;
|
||||
clusterService.addStateApplier(this.ingestForwarder);
|
||||
}
|
||||
|
@ -184,7 +185,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
|
||||
final long indexingBytes = bulkRequest.ramBytesUsed();
|
||||
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
|
||||
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
|
||||
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
|
||||
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
|
||||
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
|
||||
try {
|
||||
|
@ -562,7 +563,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
if (task != null) {
|
||||
bulkShardRequest.setParentTask(nodeId, task.getId());
|
||||
}
|
||||
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
|
||||
// Add the shard level accounting for coordinating and supply the listener
|
||||
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
|
||||
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(shardId,
|
||||
bulkShardRequest.ramBytesUsed(), isOnlySystem);
|
||||
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkShardResponse bulkShardResponse) {
|
||||
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
|
||||
|
@ -595,7 +600,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
|
||||
buildTookInMillis(startTimeNanos)));
|
||||
}
|
||||
});
|
||||
}, releasable::close));
|
||||
}
|
||||
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ import org.opensearch.common.unit.TimeValue;
|
|||
import org.opensearch.common.xcontent.ToXContent;
|
||||
import org.opensearch.common.xcontent.XContentHelper;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.engine.Engine;
|
||||
import org.opensearch.index.engine.VersionConflictEngineException;
|
||||
import org.opensearch.index.get.GetResult;
|
||||
|
@ -115,9 +115,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
|
||||
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
|
||||
IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
IndexingPressureService indexingPressureService, SystemIndices systemIndices) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
|
||||
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressureService, systemIndices);
|
||||
this.updateHelper = updateHelper;
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ import org.opensearch.cluster.service.ClusterService;
|
|||
import org.opensearch.common.inject.Inject;
|
||||
import org.opensearch.common.io.stream.StreamInput;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.engine.Engine;
|
||||
import org.opensearch.index.seqno.SequenceNumbers;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
|
@ -80,11 +80,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
|
|||
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters,
|
||||
IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
IndexingPressureService indexingPressureService, SystemIndices systemIndices) {
|
||||
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION,
|
||||
true, /* we should never reject resync because of thread pool capacity on primary */
|
||||
indexingPressure, systemIndices);
|
||||
indexingPressureService, systemIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -49,7 +49,7 @@ import org.opensearch.common.Nullable;
|
|||
import org.opensearch.common.io.stream.Writeable;
|
||||
import org.opensearch.common.lease.Releasable;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.engine.Engine;
|
||||
import org.opensearch.index.mapper.MapperParsingException;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
|
@ -76,7 +76,7 @@ public abstract class TransportWriteAction<
|
|||
Response extends ReplicationResponse & WriteResponse
|
||||
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
|
||||
|
||||
protected final IndexingPressure indexingPressure;
|
||||
protected final IndexingPressureService indexingPressureService;
|
||||
protected final SystemIndices systemIndices;
|
||||
|
||||
private final Function<IndexShard, String> executorFunction;
|
||||
|
@ -85,13 +85,14 @@ public abstract class TransportWriteAction<
|
|||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
|
||||
Writeable.Reader<ReplicaRequest> replicaRequest, Function<IndexShard, String> executorFunction,
|
||||
boolean forceExecutionOnPrimary, IndexingPressure indexingPressure, SystemIndices systemIndices) {
|
||||
boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService,
|
||||
SystemIndices systemIndices) {
|
||||
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
|
||||
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
|
||||
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
|
||||
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
|
||||
this.executorFunction = executorFunction;
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.indexingPressureService = indexingPressureService;
|
||||
this.systemIndices = systemIndices;
|
||||
}
|
||||
|
||||
|
@ -101,7 +102,7 @@ public abstract class TransportWriteAction<
|
|||
|
||||
@Override
|
||||
protected Releasable checkOperationLimits(Request request) {
|
||||
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
|
||||
return indexingPressureService.markPrimaryOperationStarted(request.shardId, primaryOperationSize(request), force(request));
|
||||
}
|
||||
|
||||
protected boolean force(ReplicatedWriteRequest<?> request) {
|
||||
|
@ -119,7 +120,8 @@ public abstract class TransportWriteAction<
|
|||
// If this primary request was received from a local reroute initiated by the node client, we
|
||||
// must mark a new primary operation local to the coordinating node.
|
||||
if (localRerouteInitiatedByNodeClient) {
|
||||
return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request));
|
||||
return indexingPressureService.markPrimaryOperationLocalToCoordinatingNodeStarted(request.shardId,
|
||||
primaryOperationSize(request));
|
||||
} else {
|
||||
return () -> {};
|
||||
}
|
||||
|
@ -127,7 +129,7 @@ public abstract class TransportWriteAction<
|
|||
// If this primary request was received directly from the network, we must mark a new primary
|
||||
// operation. This happens if the write action skips the reroute step (ex: rsync) or during
|
||||
// primary delegation, after the primary relocation hand-off.
|
||||
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request));
|
||||
return indexingPressureService.markPrimaryOperationStarted(request.shardId, primaryOperationSize(request), force(request));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,7 +139,7 @@ public abstract class TransportWriteAction<
|
|||
|
||||
@Override
|
||||
protected Releasable checkReplicaLimits(ReplicaRequest request) {
|
||||
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request));
|
||||
return indexingPressureService.markReplicaOperationStarted(request.shardId, replicaOperationSize(request), force(request));
|
||||
}
|
||||
|
||||
protected long replicaOperationSize(ReplicaRequest request) {
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.opensearch.common.settings.ClusterSettings;
|
|||
import org.opensearch.common.settings.Setting;
|
||||
import org.opensearch.common.settings.Setting.Property;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.node.Node;
|
||||
import org.opensearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -78,6 +79,8 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
|
||||
private RerouteService rerouteService;
|
||||
|
||||
private IndexingPressureService indexingPressureService;
|
||||
|
||||
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
|
||||
this(settings, clusterSettings, new MasterService(settings, clusterSettings, threadPool),
|
||||
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool));
|
||||
|
@ -203,6 +206,20 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
return masterService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter and Setter for IndexingPressureService, This method exposes IndexingPressureService stats to other plugins for usage.
|
||||
* Although Indexing Pressure instances can be accessed via Node and NodeService class but none of them are
|
||||
* present in the createComponents signature of Plugin interface currently. {@link org.opensearch.plugins.Plugin#createComponents}
|
||||
* Going forward, IndexingPressureService will have required constructs for exposing listeners/interfaces for plugin development.(#478)
|
||||
*/
|
||||
public void setIndexingPressureService(IndexingPressureService indexingPressureService) {
|
||||
this.indexingPressureService = indexingPressureService;
|
||||
}
|
||||
|
||||
public IndexingPressureService getIndexingPressureService() {
|
||||
return indexingPressureService;
|
||||
}
|
||||
|
||||
public ClusterApplierService getClusterApplierService() {
|
||||
return clusterApplierService;
|
||||
}
|
||||
|
|
|
@ -25,11 +25,19 @@ public class IndexingPressureService {
|
|||
shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
|
||||
}
|
||||
|
||||
public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
|
||||
if (isShardIndexingPressureEnabled() == false) {
|
||||
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
|
||||
} else {
|
||||
return () -> {};
|
||||
}
|
||||
}
|
||||
|
||||
public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
|
||||
if (isShardIndexingPressureEnabled()) {
|
||||
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution);
|
||||
} else {
|
||||
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
|
||||
return () -> {};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.opensearch.ExceptionsHelper;
|
||||
import org.opensearch.action.ActionListener;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.action.support.ActionFilters;
|
||||
import org.opensearch.action.support.ActiveShardCount;
|
||||
import org.opensearch.action.support.WriteResponse;
|
||||
|
@ -55,6 +54,7 @@ import org.opensearch.common.io.stream.StreamOutput;
|
|||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.util.concurrent.ThreadContext;
|
||||
import org.opensearch.index.IndexNotFoundException;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
import org.opensearch.index.shard.IndexShardClosedException;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
|
@ -94,7 +94,7 @@ public class RetentionLeaseSyncAction extends
|
|||
final ThreadPool threadPool,
|
||||
final ShardStateAction shardStateAction,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexingPressure indexingPressure,
|
||||
final IndexingPressureService indexingPressureService,
|
||||
final SystemIndices systemIndices) {
|
||||
super(
|
||||
settings,
|
||||
|
@ -107,7 +107,7 @@ public class RetentionLeaseSyncAction extends
|
|||
actionFilters,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
RetentionLeaseSyncAction.Request::new,
|
||||
ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressure, systemIndices);
|
||||
ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressureService, systemIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.watcher.ResourceWatcherService;
|
||||
import org.opensearch.Assertions;
|
||||
import org.opensearch.Build;
|
||||
|
@ -120,7 +121,6 @@ import org.opensearch.gateway.MetaStateService;
|
|||
import org.opensearch.gateway.PersistedClusterStateService;
|
||||
import org.opensearch.http.HttpServerTransport;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.analysis.AnalysisRegistry;
|
||||
import org.opensearch.index.engine.EngineFactory;
|
||||
import org.opensearch.indices.IndicesModule;
|
||||
|
@ -599,7 +599,10 @@ public class Node implements Closeable {
|
|||
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
|
||||
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
|
||||
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
|
||||
final IndexingPressure indexingLimits = new IndexingPressure(settings);
|
||||
final IndexingPressureService indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
// Going forward, IndexingPressureService will have required constructs for exposing listeners/interfaces for plugin
|
||||
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
|
||||
clusterService.setIndexingPressureService(indexingPressureService);
|
||||
|
||||
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
|
||||
RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
|
||||
|
@ -628,7 +631,7 @@ public class Node implements Closeable {
|
|||
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
|
||||
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
|
||||
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
|
||||
searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());
|
||||
searchTransportService, indexingPressureService, searchModule.getValuesSourceRegistry().getUsageService());
|
||||
|
||||
final SearchService searchService = newSearchService(clusterService, indicesService,
|
||||
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
|
||||
|
@ -664,7 +667,7 @@ public class Node implements Closeable {
|
|||
b.bind(ScriptService.class).toInstance(scriptService);
|
||||
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
|
||||
b.bind(IngestService.class).toInstance(ingestService);
|
||||
b.bind(IndexingPressure.class).toInstance(indexingLimits);
|
||||
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
|
||||
b.bind(UsageService.class).toInstance(usageService);
|
||||
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
|
||||
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
|
||||
|
|
|
@ -32,7 +32,6 @@
|
|||
|
||||
package org.opensearch.node;
|
||||
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.core.internal.io.IOUtils;
|
||||
import org.opensearch.Build;
|
||||
import org.opensearch.Version;
|
||||
|
@ -46,6 +45,7 @@ import org.opensearch.common.settings.Settings;
|
|||
import org.opensearch.common.settings.SettingsFilter;
|
||||
import org.opensearch.discovery.Discovery;
|
||||
import org.opensearch.http.HttpServerTransport;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.indices.IndicesService;
|
||||
import org.opensearch.indices.breaker.CircuitBreakerService;
|
||||
import org.opensearch.ingest.IngestService;
|
||||
|
@ -74,7 +74,7 @@ public class NodeService implements Closeable {
|
|||
private final HttpServerTransport httpServerTransport;
|
||||
private final ResponseCollectorService responseCollectorService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final IndexingPressure indexingPressure;
|
||||
private final IndexingPressureService indexingPressureService;
|
||||
private final AggregationUsageService aggregationUsageService;
|
||||
|
||||
private final Discovery discovery;
|
||||
|
@ -84,7 +84,7 @@ public class NodeService implements Closeable {
|
|||
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
|
||||
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
|
||||
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
|
||||
SearchTransportService searchTransportService, IndexingPressure indexingPressure,
|
||||
SearchTransportService searchTransportService, IndexingPressureService indexingPressureService,
|
||||
AggregationUsageService aggregationUsageService) {
|
||||
this.settings = settings;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -100,7 +100,7 @@ public class NodeService implements Closeable {
|
|||
this.scriptService = scriptService;
|
||||
this.responseCollectorService = responseCollectorService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.indexingPressure = indexingPressure;
|
||||
this.indexingPressureService = indexingPressureService;
|
||||
this.aggregationUsageService = aggregationUsageService;
|
||||
clusterService.addStateApplier(ingestService);
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ public class NodeService implements Closeable {
|
|||
ingest ? ingestService.stats() : null,
|
||||
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
|
||||
scriptCache ? scriptService.cacheStats() : null,
|
||||
indexingPressure ? this.indexingPressure.stats() : null
|
||||
indexingPressure ? this.indexingPressureService.nodeStats() : null
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,13 +44,14 @@ import org.opensearch.cluster.metadata.Metadata;
|
|||
import org.opensearch.cluster.node.DiscoveryNode;
|
||||
import org.opensearch.cluster.node.DiscoveryNodes;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.unit.TimeValue;
|
||||
import org.opensearch.common.util.concurrent.AtomicArray;
|
||||
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
|
||||
import org.opensearch.index.IndexNotFoundException;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.VersionType;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.indices.SystemIndices;
|
||||
import org.opensearch.tasks.Task;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
|
@ -138,7 +139,9 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends OpenSear
|
|||
final ExecutorService direct = OpenSearchExecutors.newDirectExecutorService();
|
||||
when(threadPool.executor(anyString())).thenReturn(direct);
|
||||
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
|
||||
null, null, null, mock(ActionFilters.class), null, null, new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap())) {
|
||||
null, null, null, mock(ActionFilters.class), null, null,
|
||||
new IndexingPressureService(Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)), new SystemIndices(emptyMap())) {
|
||||
@Override
|
||||
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
|
||||
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
|
||||
|
|
|
@ -67,7 +67,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors;
|
|||
import org.opensearch.common.util.concurrent.ThreadContext;
|
||||
import org.opensearch.index.IndexNotFoundException;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.indices.SystemIndices;
|
||||
import org.opensearch.ingest.IngestService;
|
||||
import org.opensearch.tasks.Task;
|
||||
|
@ -163,7 +163,8 @@ public class TransportBulkActionIngestTests extends OpenSearchTestCase {
|
|||
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
|
||||
new SystemIndices(emptyMap())
|
||||
), new IndexingPressure(SETTINGS), new SystemIndices(emptyMap())
|
||||
), new IndexingPressureService(SETTINGS, new ClusterService(SETTINGS, new ClusterSettings(SETTINGS,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)), new SystemIndices(emptyMap())
|
||||
);
|
||||
}
|
||||
@Override
|
||||
|
|
|
@ -58,7 +58,7 @@ import org.opensearch.common.settings.Settings;
|
|||
import org.opensearch.common.unit.TimeValue;
|
||||
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
|
||||
import org.opensearch.index.IndexNotFoundException;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.VersionType;
|
||||
import org.opensearch.indices.SystemIndexDescriptor;
|
||||
import org.opensearch.indices.SystemIndices;
|
||||
|
@ -105,7 +105,7 @@ public class TransportBulkActionTests extends OpenSearchTestCase {
|
|||
super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null,
|
||||
null, new ActionFilters(Collections.emptySet()), new Resolver(),
|
||||
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
|
||||
new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap()));
|
||||
new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -55,8 +55,8 @@ import org.opensearch.common.util.concurrent.AtomicArray;
|
|||
import org.opensearch.common.util.concurrent.ThreadContext;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.index.IndexNotFoundException;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.rest.action.document.RestBulkAction;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.indices.SystemIndices;
|
||||
import org.opensearch.tasks.Task;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
|
@ -266,7 +266,7 @@ public class TransportBulkActionTookTests extends OpenSearchTestCase {
|
|||
actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
autoCreateIndex,
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new IndexingPressureService(Settings.EMPTY, clusterService),
|
||||
new SystemIndices(emptyMap()),
|
||||
relativeTimeProvider);
|
||||
}
|
||||
|
|
|
@ -33,8 +33,6 @@ package org.opensearch.action.resync;
|
|||
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.action.ActionListener;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.action.support.ActionFilters;
|
||||
import org.opensearch.action.support.PlainActionFuture;
|
||||
import org.opensearch.cluster.ClusterState;
|
||||
|
@ -53,6 +51,8 @@ import org.opensearch.common.settings.Settings;
|
|||
import org.opensearch.common.util.PageCacheRecycler;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexService;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
import org.opensearch.index.shard.ReplicationGroup;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
|
@ -162,7 +162,7 @@ public class TransportResyncReplicationActionTests extends OpenSearchTestCase {
|
|||
|
||||
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
|
||||
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
|
||||
new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap()));
|
||||
new IndexingPressureService(Settings.EMPTY, clusterService), new SystemIndices(emptyMap()));
|
||||
|
||||
assertThat(action.globalBlockLevel(), nullValue());
|
||||
assertThat(action.indexBlockLevel(), nullValue());
|
||||
|
|
|
@ -0,0 +1,487 @@
|
|||
/*
|
||||
* Copyright OpenSearch Contributors.
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package org.opensearch.action.support.replication;
|
||||
|
||||
import org.opensearch.action.ActionListener;
|
||||
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.opensearch.action.support.ActionFilters;
|
||||
import org.opensearch.action.support.PlainActionFuture;
|
||||
import org.opensearch.action.support.WriteResponse;
|
||||
import org.opensearch.cluster.ClusterState;
|
||||
import org.opensearch.cluster.action.shard.ShardStateAction;
|
||||
import org.opensearch.cluster.metadata.IndexMetadata;
|
||||
import org.opensearch.cluster.routing.RoutingNode;
|
||||
import org.opensearch.cluster.routing.ShardRouting;
|
||||
import org.opensearch.cluster.routing.ShardRoutingState;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.Nullable;
|
||||
import org.opensearch.common.io.stream.StreamInput;
|
||||
import org.opensearch.common.lease.Releasable;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexService;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.ShardIndexingPressureSettings;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
import org.opensearch.index.shard.IndexShardState;
|
||||
import org.opensearch.index.shard.ReplicationGroup;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.index.shard.ShardNotFoundException;
|
||||
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
|
||||
import org.opensearch.index.stats.IndexingPressurePerShardStats;
|
||||
import org.opensearch.index.translog.Translog;
|
||||
import org.opensearch.indices.IndicesService;
|
||||
import org.opensearch.indices.SystemIndices;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
import org.opensearch.test.transport.CapturingTransport;
|
||||
import org.opensearch.threadpool.TestThreadPool;
|
||||
import org.opensearch.threadpool.ThreadPool;
|
||||
import org.opensearch.transport.TransportChannel;
|
||||
import org.opensearch.transport.TransportResponse;
|
||||
import org.opensearch.transport.TransportService;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state;
|
||||
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.opensearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TransportWriteActionForIndexingPressureTests extends OpenSearchTestCase {
|
||||
private static ThreadPool threadPool;
|
||||
|
||||
private ClusterService clusterService;
|
||||
private TransportService transportService;
|
||||
private CapturingTransport transport;
|
||||
private ShardStateAction shardStateAction;
|
||||
private Translog.Location location;
|
||||
private Releasable releasable;
|
||||
private IndexingPressureService indexingPressureService;
|
||||
|
||||
public static final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new TestThreadPool("ShardReplicationTests");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
transport = new CapturingTransport();
|
||||
clusterService = createClusterService(threadPool);
|
||||
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
|
||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
|
||||
releasable = mock(Releasable.class);
|
||||
location = mock(Translog.Location.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
clusterService.close();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
|
||||
threadPool = null;
|
||||
}
|
||||
|
||||
public void testIndexingPressureOperationStartedForReplicaNode() {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
|
||||
setState(clusterService, state);
|
||||
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
||||
final ReplicationTask task = maybeTask();
|
||||
final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false)
|
||||
.build();
|
||||
this.indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
|
||||
TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService,
|
||||
shardStateAction, threadPool);
|
||||
|
||||
action.handleReplicaRequest(
|
||||
new TransportReplicationAction.ConcreteReplicaRequest<>(
|
||||
new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
|
||||
randomNonNegativeLong(), randomNonNegativeLong()),
|
||||
createTransportChannel(new PlainActionFuture<>()), task);
|
||||
|
||||
IndexingPressurePerShardStats shardStats =
|
||||
this.indexingPressureService.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
|
||||
|
||||
assertPhase(task, "finished");
|
||||
assertTrue(Objects.isNull(shardStats));
|
||||
}
|
||||
|
||||
public void testIndexingPressureOperationStartedForReplicaShard() {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
|
||||
setState(clusterService, state);
|
||||
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
||||
final ReplicationTask task = maybeTask();
|
||||
final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
|
||||
.build();
|
||||
this.indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
|
||||
TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService,
|
||||
shardStateAction, threadPool);
|
||||
|
||||
action.handleReplicaRequest(
|
||||
new TransportReplicationAction.ConcreteReplicaRequest<>(
|
||||
new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
|
||||
randomNonNegativeLong(), randomNonNegativeLong()),
|
||||
createTransportChannel(new PlainActionFuture<>()), task);
|
||||
|
||||
CommonStatsFlags statsFlag = new CommonStatsFlags();
|
||||
statsFlag.includeAllShardIndexingPressureTrackers(true);
|
||||
IndexingPressurePerShardStats shardStats =
|
||||
this.indexingPressureService.shardStats(statsFlag).getIndexingPressureShardStats(shardId);
|
||||
|
||||
assertPhase(task, "finished");
|
||||
assertTrue(!Objects.isNull(shardStats));
|
||||
assertEquals(100, shardStats.getTotalReplicaBytes());
|
||||
}
|
||||
|
||||
public void testIndexingPressureOperationStartedForPrimaryNode() {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
|
||||
setState(clusterService, state);
|
||||
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
||||
final ReplicationTask task = maybeTask();
|
||||
final Settings settings =
|
||||
Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build();
|
||||
this.indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
|
||||
TestAction action = new TestAction(settings, "internal:testActionWithExceptions", transportService, clusterService,
|
||||
shardStateAction, threadPool);
|
||||
|
||||
action.handlePrimaryRequest(
|
||||
new TransportReplicationAction.ConcreteReplicaRequest<>(
|
||||
new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
|
||||
randomNonNegativeLong(), randomNonNegativeLong()),
|
||||
createTransportChannel(new PlainActionFuture<>()), task);
|
||||
|
||||
IndexingPressurePerShardStats shardStats =
|
||||
this.indexingPressureService.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
|
||||
|
||||
assertPhase(task, "finished");
|
||||
assertTrue(Objects.isNull(shardStats));
|
||||
}
|
||||
|
||||
public void testIndexingPressureOperationStartedForPrimaryShard() {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
|
||||
setState(clusterService, state);
|
||||
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
||||
final ReplicationTask task = maybeTask();
|
||||
final Settings settings =
|
||||
Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true).build();
|
||||
this.indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
|
||||
TestAction action = new TestAction(settings, "internal:testActionWithExceptions", transportService, clusterService,
|
||||
shardStateAction, threadPool);
|
||||
|
||||
action.handlePrimaryRequest(
|
||||
new TransportReplicationAction.ConcreteReplicaRequest<>(
|
||||
new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
|
||||
randomNonNegativeLong(), randomNonNegativeLong()),
|
||||
createTransportChannel(new PlainActionFuture<>()), task);
|
||||
|
||||
CommonStatsFlags statsFlag = new CommonStatsFlags();
|
||||
statsFlag.includeAllShardIndexingPressureTrackers(true);
|
||||
IndexingPressurePerShardStats shardStats =
|
||||
this.indexingPressureService.shardStats(statsFlag).getIndexingPressureShardStats(shardId);
|
||||
|
||||
assertPhase(task, "finished");
|
||||
assertTrue(!Objects.isNull(shardStats));
|
||||
assertEquals(100, shardStats.getTotalPrimaryBytes());
|
||||
}
|
||||
|
||||
public void testIndexingPressureOperationStartedForLocalPrimaryNode() {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
|
||||
setState(clusterService, state);
|
||||
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
||||
final ReplicationTask task = maybeTask();
|
||||
final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false)
|
||||
.build();
|
||||
this.indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
|
||||
TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService,
|
||||
shardStateAction, threadPool);
|
||||
|
||||
action.handlePrimaryRequest(
|
||||
new TransportReplicationAction.ConcreteShardRequest<>(
|
||||
new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
|
||||
true, true),
|
||||
createTransportChannel(new PlainActionFuture<>()), task);
|
||||
|
||||
IndexingPressurePerShardStats shardStats =
|
||||
this.indexingPressureService.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
|
||||
|
||||
assertPhase(task, "finished");
|
||||
assertTrue(Objects.isNull(shardStats));
|
||||
}
|
||||
|
||||
public void testIndexingPressureOperationStartedForLocalPrimaryShard() {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
|
||||
setState(clusterService, state);
|
||||
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
||||
final ReplicationTask task = maybeTask();
|
||||
final Settings settings = Settings.builder().put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
|
||||
.build();
|
||||
this.indexingPressureService = new IndexingPressureService(settings, clusterService);
|
||||
|
||||
TestAction action = new TestAction(settings, "internal:testAction", transportService, clusterService,
|
||||
shardStateAction, threadPool);
|
||||
|
||||
action.handlePrimaryRequest(
|
||||
new TransportReplicationAction.ConcreteShardRequest<>(
|
||||
new TestRequest(), replicaRouting.allocationId().getId(), randomNonNegativeLong(),
|
||||
true, true),
|
||||
createTransportChannel(new PlainActionFuture<>()), task);
|
||||
|
||||
CommonStatsFlags statsFlag = new CommonStatsFlags();
|
||||
statsFlag.includeAllShardIndexingPressureTrackers(true);
|
||||
IndexingPressurePerShardStats shardStats =
|
||||
this.indexingPressureService.shardStats(statsFlag).getIndexingPressureShardStats(shardId);
|
||||
|
||||
assertPhase(task, "finished");
|
||||
assertTrue(!Objects.isNull(shardStats));
|
||||
}
|
||||
|
||||
private final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
private final AtomicBoolean isRelocated = new AtomicBoolean(false);
|
||||
|
||||
private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true);
|
||||
|
||||
/**
|
||||
* Sometimes build a ReplicationTask for tracking the phase of the
|
||||
* TransportReplicationAction. Since TransportReplicationAction has to work
|
||||
* if the task as null just as well as if it is supplied this returns null
|
||||
* half the time.
|
||||
*/
|
||||
ReplicationTask maybeTask() {
|
||||
return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, null) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the task is non-null this asserts that the phrase matches.
|
||||
*/
|
||||
void assertPhase(@Nullable ReplicationTask task, String phase) {
|
||||
assertPhase(task, equalTo(phase));
|
||||
}
|
||||
|
||||
private void assertPhase(@Nullable ReplicationTask task, Matcher<String> phaseMatcher) {
|
||||
if (task != null) {
|
||||
assertThat(task.getPhase(), phaseMatcher);
|
||||
}
|
||||
}
|
||||
|
||||
private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
|
||||
protected TestAction(Settings settings, String actionName, TransportService transportService,
|
||||
ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) {
|
||||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
|
||||
TransportWriteActionForIndexingPressureTests.this.indexingPressureService, new SystemIndices(emptyMap()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TestResponse newResponseInstance(StreamInput in) throws IOException {
|
||||
return new TestResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long primaryOperationSize(TestRequest request) {
|
||||
return 100;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long replicaOperationSize(TestRequest request) {
|
||||
return 100;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void dispatchedShardOperationOnPrimary(
|
||||
TestRequest request, IndexShard primary, ActionListener<PrimaryResult<TestRequest, TestResponse>> listener) {
|
||||
ActionListener.completeWith(listener, () -> new WritePrimaryResult<>(request, new TestResponse(), location, null, primary,
|
||||
logger));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
|
||||
ActionListener.completeWith(listener, () -> new WriteReplicaResult<>(request, location, null, replica, logger));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class TestRequest extends ReplicatedWriteRequest<TestRequest> {
|
||||
TestRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
TestRequest() {
|
||||
super(new ShardId("test", "_na_", 0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TestRequest{}";
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestResponse extends ReplicationResponse implements WriteResponse {
|
||||
boolean forcedRefresh;
|
||||
|
||||
@Override
|
||||
public void setForcedRefresh(boolean forcedRefresh) {
|
||||
this.forcedRefresh = forcedRefresh;
|
||||
}
|
||||
}
|
||||
|
||||
private IndicesService mockIndicesService(ClusterService clusterService) {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> {
|
||||
Index index = (Index)invocation.getArguments()[0];
|
||||
final ClusterState state = clusterService.state();
|
||||
final IndexMetadata indexSafe = state.metadata().getIndexSafe(index);
|
||||
return mockIndexService(indexSafe, clusterService);
|
||||
});
|
||||
when(indicesService.indexService(any(Index.class))).then(invocation -> {
|
||||
Index index = (Index) invocation.getArguments()[0];
|
||||
final ClusterState state = clusterService.state();
|
||||
if (state.metadata().hasIndex(index.getName())) {
|
||||
return mockIndexService(clusterService.state().metadata().getIndexSafe(index), clusterService);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return indicesService;
|
||||
}
|
||||
|
||||
private IndexService mockIndexService(final IndexMetadata indexMetaData, ClusterService clusterService) {
|
||||
final IndexService indexService = mock(IndexService.class);
|
||||
when(indexService.getShard(anyInt())).then(invocation -> {
|
||||
int shard = (Integer) invocation.getArguments()[0];
|
||||
final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
|
||||
if (shard > indexMetaData.getNumberOfShards()) {
|
||||
throw new ShardNotFoundException(shardId);
|
||||
}
|
||||
return mockIndexShard(shardId, clusterService);
|
||||
});
|
||||
return indexService;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
|
||||
final IndexShard indexShard = mock(IndexShard.class);
|
||||
when(indexShard.shardId()).thenReturn(shardId);
|
||||
when(indexShard.state()).thenReturn(IndexShardState.STARTED);
|
||||
doAnswer(invocation -> {
|
||||
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
|
||||
if (isPrimaryMode.get()) {
|
||||
count.incrementAndGet();
|
||||
callback.onResponse(count::decrementAndGet);
|
||||
|
||||
} else {
|
||||
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
|
||||
}
|
||||
return null;
|
||||
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
|
||||
doAnswer(invocation -> {
|
||||
long term = (Long)invocation.getArguments()[0];
|
||||
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[3];
|
||||
final long primaryTerm = indexShard.getPendingPrimaryTerm();
|
||||
if (term < primaryTerm) {
|
||||
throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
|
||||
shardId, term, primaryTerm));
|
||||
}
|
||||
count.incrementAndGet();
|
||||
callback.onResponse(count::decrementAndGet);
|
||||
return null;
|
||||
}).when(indexShard)
|
||||
.acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
|
||||
when(indexShard.getActiveOperationsCount()).thenAnswer(i -> count.get());
|
||||
|
||||
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
|
||||
final ClusterState state = clusterService.state();
|
||||
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
|
||||
final ShardRouting routing = node.getByShardId(shardId);
|
||||
if (routing == null) {
|
||||
throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node");
|
||||
}
|
||||
return routing;
|
||||
});
|
||||
when(indexShard.isRelocatedPrimary()).thenAnswer(invocationOnMock -> isRelocated.get());
|
||||
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
|
||||
when(indexShard.getPendingPrimaryTerm()).thenAnswer(i ->
|
||||
clusterService.state().metadata().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
|
||||
|
||||
ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
|
||||
when(indexShard.getReplicationGroup()).thenReturn(replicationGroup);
|
||||
return indexShard;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transport channel that is needed for testing.
|
||||
*/
|
||||
public TransportChannel createTransportChannel(final PlainActionFuture<TestResponse> listener) {
|
||||
return new TransportChannel() {
|
||||
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) {
|
||||
listener.onResponse(((TestResponse) response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Exception exception) {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelType() {
|
||||
return "replica_test";
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -34,7 +34,6 @@ package org.opensearch.action.support.replication;
|
|||
|
||||
import org.opensearch.OpenSearchException;
|
||||
import org.opensearch.action.ActionListener;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.action.support.ActionFilters;
|
||||
import org.opensearch.action.support.ActionTestUtils;
|
||||
import org.opensearch.action.support.PlainActionFuture;
|
||||
|
@ -56,6 +55,7 @@ import org.opensearch.common.lease.Releasable;
|
|||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexService;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.index.shard.ShardNotFoundException;
|
||||
|
@ -382,7 +382,7 @@ public class TransportWriteActionTests extends OpenSearchTestCase {
|
|||
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
|
||||
new IndexingPressure(Settings.EMPTY), new SystemIndices(emptyMap()));
|
||||
new IndexingPressureService(Settings.EMPTY, TransportWriteActionTests.this.clusterService), new SystemIndices(emptyMap()));
|
||||
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
|
||||
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
|
||||
}
|
||||
|
@ -392,7 +392,7 @@ public class TransportWriteActionTests extends OpenSearchTestCase {
|
|||
super(settings, actionName, transportService, clusterService,
|
||||
mockIndicesService(clusterService), threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ignore -> ThreadPool.Names.SAME, false,
|
||||
new IndexingPressure(settings), new SystemIndices(emptyMap()));
|
||||
new IndexingPressureService(settings, clusterService), new SystemIndices(emptyMap()));
|
||||
this.withDocumentFailureOnPrimary = false;
|
||||
this.withDocumentFailureOnReplica = false;
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public class IndexingPressureServiceTests extends OpenSearchTestCase {
|
|||
Settings.builder().put(settings), updated, getTestClass().getName());
|
||||
clusterSettings.applySettings(updated.build());
|
||||
|
||||
Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false);
|
||||
Releasable releasable = service.markCoordinatingOperationStarted(1024, false);
|
||||
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
|
||||
assertNull(shardStats);
|
||||
IndexingPressureStats nodeStats = service.nodeStats();
|
||||
|
|
|
@ -40,7 +40,8 @@ import org.opensearch.test.OpenSearchTestCase;
|
|||
|
||||
public class IndexingPressureTests extends OpenSearchTestCase {
|
||||
|
||||
private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB").build();
|
||||
private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
|
||||
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build();
|
||||
|
||||
public void testMemoryBytesMarkedAndReleased() {
|
||||
IndexingPressure indexingPressure = new IndexingPressure(settings);
|
||||
|
|
|
@ -33,7 +33,6 @@
|
|||
package org.opensearch.index.seqno;
|
||||
|
||||
import org.opensearch.action.ActionListener;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.action.support.ActionFilters;
|
||||
import org.opensearch.action.support.ActionTestUtils;
|
||||
import org.opensearch.action.support.PlainActionFuture;
|
||||
|
@ -44,6 +43,7 @@ import org.opensearch.common.settings.Settings;
|
|||
import org.opensearch.core.internal.io.IOUtils;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexService;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.shard.IndexShard;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.indices.IndicesService;
|
||||
|
@ -120,7 +120,7 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new IndexingPressureService(Settings.EMPTY, clusterService),
|
||||
new SystemIndices(emptyMap()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
@ -158,7 +158,7 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new IndexingPressureService(Settings.EMPTY, clusterService),
|
||||
new SystemIndices(emptyMap()));
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
|
@ -199,7 +199,7 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
new IndexingPressure(Settings.EMPTY),
|
||||
new IndexingPressureService(Settings.EMPTY, clusterService),
|
||||
new SystemIndices(emptyMap()));
|
||||
|
||||
assertNull(action.indexBlockLevel());
|
||||
|
|
|
@ -166,7 +166,7 @@ import org.opensearch.env.TestEnvironment;
|
|||
import org.opensearch.gateway.MetaStateService;
|
||||
import org.opensearch.gateway.TransportNodesListGatewayStartedShards;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.IndexingPressureService;
|
||||
import org.opensearch.index.analysis.AnalysisRegistry;
|
||||
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
|
||||
import org.opensearch.index.seqno.RetentionLeaseSyncer;
|
||||
|
@ -1572,7 +1572,7 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase {
|
|||
threadPool,
|
||||
shardStateAction,
|
||||
actionFilters,
|
||||
new IndexingPressure(settings),
|
||||
new IndexingPressureService(settings, clusterService),
|
||||
new SystemIndices(emptyMap()))),
|
||||
new GlobalCheckpointSyncAction(
|
||||
settings,
|
||||
|
@ -1599,7 +1599,7 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase {
|
|||
mappingUpdatedAction.setClient(client);
|
||||
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
|
||||
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
|
||||
actionFilters, new IndexingPressure(settings), new SystemIndices(emptyMap()));
|
||||
actionFilters, new IndexingPressureService(settings, clusterService), new SystemIndices(emptyMap()));
|
||||
actions.put(BulkAction.INSTANCE,
|
||||
new TransportBulkAction(threadPool, transportService, clusterService,
|
||||
new IngestService(
|
||||
|
@ -1608,7 +1608,7 @@ public class SnapshotResiliencyTests extends OpenSearchTestCase {
|
|||
Collections.emptyList(), client),
|
||||
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
|
||||
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())),
|
||||
new IndexingPressure(settings),
|
||||
new IndexingPressureService(settings, clusterService),
|
||||
new SystemIndices(emptyMap())
|
||||
));
|
||||
final RestoreService restoreService = new RestoreService(
|
||||
|
|
Loading…
Reference in New Issue