Add Indexing Pressure Service which acts as orchestrator for indexing pressure interfaces. (#1084)

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
Saurabh Singh 2021-08-20 01:07:15 +05:30 committed by Rabi Panda
parent 7fbeb87f95
commit ac3f2af026
2 changed files with 234 additions and 0 deletions

View File

@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.index;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
/**
* Sets up classes for node/shard level indexing pressure.
* Provides abstraction and orchestration for indexing pressure interfaces when called from Transport Actions or for Stats.
*/
public class IndexingPressureService {
private final ShardIndexingPressure shardIndexingPressure;
public IndexingPressureService(Settings settings, ClusterService clusterService) {
shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
}
public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution);
} else {
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
}
}
public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, forceExecution);
} else {
return shardIndexingPressure.markPrimaryOperationStarted(bytes, forceExecution);
}
}
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, bytes);
} else {
return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(bytes);
}
}
public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, forceExecution);
} else {
return shardIndexingPressure.markReplicaOperationStarted(bytes, forceExecution);
}
}
public IndexingPressureStats nodeStats() {
return shardIndexingPressure.stats();
}
public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) {
return shardIndexingPressure.shardStats(statsFlags);
}
private boolean isShardIndexingPressureEnabled() {
return shardIndexingPressure.isShardIndexingPressureEnabled();
}
}

View File

@ -0,0 +1,163 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index;
import org.junit.Before;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.stats.IndexingPressurePerShardStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.test.OpenSearchTestCase;
public class IndexingPressureServiceTests extends OpenSearchTestCase {
private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS.getKey(), 1)
.put(ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT.getKey(), "20ms")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 100)
.build();
private ClusterSettings clusterSettings;
private ClusterService clusterService;
@Before
public void beforeTest() {
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterService = new ClusterService(settings, clusterSettings, null);
}
public void testCoordinatingOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentCoordinatingBytes());
releasable.close();
}
public void testCoordinatingOperationForIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Settings.Builder updated = Settings.builder();
clusterSettings.updateDynamicSettings(Settings.builder()
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(),
Settings.builder().put(settings), updated, getTestClass().getName());
clusterSettings.applySettings(updated.build());
Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentCoordinatingBytes());
releasable.close();
}
public void testPrimaryOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Releasable releasable = service.markPrimaryOperationStarted(shardId, 1024, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentPrimaryBytes());
releasable.close();
}
public void testPrimaryOperationForIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Settings.Builder updated = Settings.builder();
clusterSettings.updateDynamicSettings(Settings.builder()
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(),
Settings.builder().put(settings), updated, getTestClass().getName());
clusterSettings.applySettings(updated.build());
Releasable releasable = service.markPrimaryOperationStarted(shardId, 1024, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentPrimaryBytes());
releasable.close();
}
public void testLocalPrimaryOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Releasable releasable = service.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentPrimaryBytes());
releasable.close();
}
public void testLocalPrimaryOperationForIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Settings.Builder updated = Settings.builder();
clusterSettings.updateDynamicSettings(Settings.builder()
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(),
Settings.builder().put(settings), updated, getTestClass().getName());
clusterSettings.applySettings(updated.build());
Releasable releasable = service.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, 1024);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentPrimaryBytes());
releasable.close();
}
public void testReplicaOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Releasable releasable = service.markReplicaOperationStarted(shardId, 1024, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentReplicaBytes());
releasable.close();
}
public void testReplicaOperationForIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
Settings.Builder updated = Settings.builder();
clusterSettings.updateDynamicSettings(Settings.builder()
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false).build(),
Settings.builder().put(settings), updated, getTestClass().getName());
clusterSettings.applySettings(updated.build());
Releasable releasable = service.markReplicaOperationStarted(shardId, 1024, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentReplicaBytes());
releasable.close();
}
}