Delay the request size calculation until required by the indexing pressure framework (#1592)

Logically delay the request size calculation until it's absolutely required by the 
indexing pressure framework.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
Saurabh Singh 2022-01-26 13:06:01 -08:00 committed by GitHub
parent 6bb560f7f1
commit 447b20c457
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 12 deletions

View File

@ -210,9 +210,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override @Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) { protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem); final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(bulkRequest::ramBytesUsed, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close); final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE; final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try { try {
@ -631,7 +630,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted( final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId, shardId,
bulkShardRequest.ramBytesUsed(), bulkShardRequest::ramBytesUsed,
isOnlySystem isOnlySystem
); );
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() { shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {

View File

@ -13,6 +13,8 @@ import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats;
import java.util.function.LongSupplier;
/** /**
* Sets up classes for node/shard level indexing pressure. * Sets up classes for node/shard level indexing pressure.
* Provides abstraction and orchestration for indexing pressure interfaces when called from Transport Actions or for Stats. * Provides abstraction and orchestration for indexing pressure interfaces when called from Transport Actions or for Stats.
@ -25,22 +27,48 @@ public class IndexingPressureService {
shardIndexingPressure = new ShardIndexingPressure(settings, clusterService); shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
} }
public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) { /**
* Marks the beginning of coordinating operation for an indexing request on the node. Rejects the operation if node's
* memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is disabled. Else empty releasable is returned.
* @param bytes memory bytes to be tracked for the current operation
* @param forceExecution permits operation even if the node level memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markCoordinatingOperationStarted(LongSupplier bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled() == false) { if (isShardIndexingPressureEnabled() == false) {
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution); return shardIndexingPressure.markCoordinatingOperationStarted(bytes.getAsLong(), forceExecution);
} else { } else {
return () -> {}; return () -> {};
} }
} }
public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { /**
* Marks the beginning of coordinating operation for an indexing request on the Shard. Rejects the operation if shard's
* memory limit is breached.
* Performs the shard level accounting only if shard indexing pressure is enabled. Else empty releasable is returned.
* @param shardId Shard ID for which the current indexing operation is targeted for
* @param bytes memory bytes to be tracked for the current operation
* @param forceExecution permits operation even if the node level memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markCoordinatingOperationStarted(ShardId shardId, LongSupplier bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) { if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution); return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes.getAsLong(), forceExecution);
} else { } else {
return () -> {}; return () -> {};
} }
} }
/**
* Marks the beginning of primary operation for an indexing request. Rejects the operation if memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is not disabled. Else shard level accounting
* is performed.
* @param shardId Shard ID for which the current indexing operation is targeted for
* @param bytes memory bytes to be tracked for the current operation
* @param forceExecution permits operation even if the memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) { if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, forceExecution); return shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, forceExecution);
@ -49,6 +77,15 @@ public class IndexingPressureService {
} }
} }
/**
* Marks the beginning of primary operation for an indexing request, when primary shard is local to the coordinator node.
* Rejects the operation if memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is not disabled. Else shard level accounting
* is performed.
* @param shardId Shard ID for which the current indexing operation is targeted for
* @param bytes memory bytes to be tracked for the current operation
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) { public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) {
if (isShardIndexingPressureEnabled()) { if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, bytes); return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, bytes);
@ -57,6 +94,15 @@ public class IndexingPressureService {
} }
} }
/**
* Marks the beginning of replication operation for an indexing request. Rejects the operation if memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is not disabled. Else shard level accounting
* is performed.
* @param shardId Shard ID for which the current indexing operation is targeted for
* @param bytes memory bytes to be tracked for the current operation
* @param forceExecution permits operation even if the memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) { public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) { if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, forceExecution); return shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, forceExecution);

View File

@ -9,7 +9,14 @@
package org.opensearch.index; package org.opensearch.index;
import org.junit.Before; import org.junit.Before;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Requests;
import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
@ -43,11 +50,18 @@ public class IndexingPressureServiceTests extends OpenSearchTestCase {
IndexingPressureService service = new IndexingPressureService(settings, clusterService); IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID"); Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0); ShardId shardId = new ShardId(index, 0);
BulkItemRequest[] items = new BulkItemRequest[1];
Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false); DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id").source(
Requests.INDEX_CONTENT_TYPE,
"foo",
"bar"
);
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, WriteRequest.RefreshPolicy.NONE, items);
Releasable releasable = service.markCoordinatingOperationStarted(shardId, bulkShardRequest::ramBytesUsed, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentCoordinatingBytes()); assertEquals(bulkShardRequest.ramBytesUsed(), shardStats.getCurrentCoordinatingBytes());
releasable.close(); releasable.close();
} }
@ -64,11 +78,12 @@ public class IndexingPressureServiceTests extends OpenSearchTestCase {
); );
clusterSettings.applySettings(updated.build()); clusterSettings.applySettings(updated.build());
Releasable releasable = service.markCoordinatingOperationStarted(1024, false); BulkRequest bulkRequest = new BulkRequest();
Releasable releasable = service.markCoordinatingOperationStarted(bulkRequest::ramBytesUsed, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId); IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats); assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats(); IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentCoordinatingBytes()); assertEquals(bulkRequest.ramBytesUsed(), nodeStats.getCurrentCoordinatingBytes());
releasable.close(); releasable.close();
} }