From 5bbbad34d28fbdb1eb3e972fa6ab90cd1a3cb551 Mon Sep 17 00:00:00 2001 From: Saurabh Singh Date: Wed, 7 Jul 2021 10:26:21 +0530 Subject: [PATCH] Add Shard Indexing Pressure Store (#478) (#838) * Add Shard Indexing Pressure Store (#478) Signed-off-by: Saurabh Singh * Added comments and shard allocation based on compute in hot store. Signed-off-by: Saurabh Singh Co-authored-by: Saurabh Singh --- .../common/settings/ClusterSettings.java | 9 +- .../index/ShardIndexingPressureStore.java | 125 ++++++++++++ .../ShardIndexingPressureStoreTests.java | 190 ++++++++++++++++++ 3 files changed, 323 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java create mode 100644 server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 669ec43dca6..41a5f8587ed 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -104,6 +104,8 @@ import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressure; +import org.opensearch.index.ShardIndexingPressureSettings; +import org.opensearch.index.ShardIndexingPressureStore; import org.opensearch.indices.IndexingMemoryController; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; @@ -585,7 +587,12 @@ public final class ClusterSettings extends AbstractScopedSettings { IndexingPressure.MAX_INDEXING_BYTES, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, - NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING))); + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, + ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED, + ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED, + ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW, + ShardIndexingPressureSettings.SHARD_MIN_LIMIT, + ShardIndexingPressureStore.MAX_COLD_STORE_SIZE))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java b/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java new file mode 100644 index 00000000000..9b75c99a675 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/ShardIndexingPressureStore.java @@ -0,0 +1,125 @@ +/* + * Copyright OpenSearch Contributors. + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.index; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.index.shard.ShardId; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BooleanSupplier; + +import static java.util.Objects.isNull; + +/** + * Shard indexing pressure store acts as a central repository for all the shard-level tracker objects currently being + * used at the Node level, for tracking indexing pressure requests. + * Store manages the tracker lifecycle, from creation, access, until it is evicted to be collected. + * + * Trackers are maintained at two levels for access simplicity and better memory management: + * + * 1. shardIndexingPressureHotStore : As the name suggests, it is hot store for tracker objects which are currently live i.e. being used + * to track an ongoing request. + * + * 2. shardIndexingPressureColdStore : This acts as the store for all the shard tracking objects which are currently being used + * by the framework. In addition to hot trackers, the recently used trackers which are although not currently live, but again can be used + * in near future, are also part of this store. To limit any memory implications, this store has an upper limit on the maximum number of + * trackers its can hold at any given time, which is a configurable dynamic setting. + * + * Tracking objects when created are part of both the hot store as well as cold store. However, once the object + * is no more live it is removed from the hot store. Objects in the cold store are evicted once the cold store + * reaches its maximum limit. Think of it like a periodic purge when upper limit is hit. + * During get if tracking object is not present in the hot store, a lookup is made into the cache store. If found, + * object is brought into the hot store again, until it remains active. If not present in the either store, a fresh + * object is instantiated and registered in both the stores for concurrent accesses. + * + * Note: The implementation of shardIndexingPressureColdStore methods is such that get, + * update and evict operations can be abstracted out to support any other strategy such as LRU, if + * discovered a need later. + * + */ +public class ShardIndexingPressureStore { + + // This represents the maximum value for the cold store size. + public static final Setting MAX_COLD_STORE_SIZE = + Setting.intSetting("shard_indexing_pressure.cache_store.max_size", 200, 100, 1000, + Setting.Property.NodeScope, Setting.Property.Dynamic); + + private final Map shardIndexingPressureHotStore = + ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map shardIndexingPressureColdStore = new HashMap<>(); + private final ShardIndexingPressureSettings shardIndexingPressureSettings; + + private volatile int maxColdStoreSize; + + public ShardIndexingPressureStore(ShardIndexingPressureSettings shardIndexingPressureSettings, + ClusterSettings clusterSettings, Settings settings) { + this.shardIndexingPressureSettings = shardIndexingPressureSettings; + this.maxColdStoreSize = MAX_COLD_STORE_SIZE.get(settings).intValue(); + clusterSettings.addSettingsUpdateConsumer(MAX_COLD_STORE_SIZE, this::setMaxColdStoreSize); + } + + public ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) { + ShardIndexingPressureTracker tracker = shardIndexingPressureHotStore.get(shardId); + if (isNull(tracker)) { + // Attempt from Indexing pressure cold store + tracker = shardIndexingPressureColdStore.get(shardId); + // If not already present in cold store instantiate a new one + if (isNull(tracker)) { + tracker = shardIndexingPressureHotStore.computeIfAbsent(shardId, (k) -> + new ShardIndexingPressureTracker(shardId, + this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(), + this.shardIndexingPressureSettings.getShardReplicaBaseLimits()) + ); + // Write through into the cold store for future reference + updateShardIndexingPressureColdStore(tracker); + } else { + // Attempt update tracker to the hot store and return the tracker which finally made to the hot store to avoid any race + ShardIndexingPressureTracker newTracker = shardIndexingPressureHotStore.putIfAbsent(shardId, tracker); + tracker = newTracker == null ? tracker : newTracker; + } + } + return tracker; + } + + public Map getShardIndexingPressureHotStore() { + return Collections.unmodifiableMap(shardIndexingPressureHotStore); + } + + public Map getShardIndexingPressureColdStore() { + return Collections.unmodifiableMap(shardIndexingPressureColdStore); + } + + public void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, BooleanSupplier condition) { + if (condition.getAsBoolean()) { + // Try inserting into cold store again in case there was an eviction triggered + shardIndexingPressureColdStore.putIfAbsent(tracker.getShardId(), tracker); + // Remove from the hot store + shardIndexingPressureHotStore.remove(tracker.getShardId(), tracker); + } + } + + /** + * This is used to update the reference of tracker in cold store, to be re-used later of tracker is removed from hot store upon request + * completion. When the cold store size reaches maximum, all the tracker objects in cold store are flushed. Flush is a less frequent + * (periodic) operation, can be sized based on workload. It is okay to not to synchronize counters being flushed, as + * objects in the cold store are only empty references, and can be re-initialized if needed. + */ + private void updateShardIndexingPressureColdStore(ShardIndexingPressureTracker tracker) { + if (shardIndexingPressureColdStore.size() > maxColdStoreSize) { + shardIndexingPressureColdStore.clear(); + } + shardIndexingPressureColdStore.put(tracker.getShardId(), tracker); + } + + private void setMaxColdStoreSize(int maxColdStoreSize) { + this.maxColdStoreSize = maxColdStoreSize; + } +} diff --git a/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java b/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java new file mode 100644 index 00000000000..7310fc54f2c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/ShardIndexingPressureStoreTests.java @@ -0,0 +1,190 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.junit.Before; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Map; + +public class ShardIndexingPressureStoreTests extends OpenSearchTestCase { + + private final Settings settings = Settings.builder() + .put(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.getKey(), 200) + .build(); + private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final ShardIndexingPressureSettings shardIndexingPressureSettings = + new ShardIndexingPressureSettings(new ClusterService(settings, clusterSettings, null), settings, + IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes()); + private ShardIndexingPressureStore store; + private ShardId testShardId; + + @Before + public void beforeTest() { + store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings); + testShardId = new ShardId("index", "uuid", 0); + } + + public void testShardIndexingPressureStoreGet() { + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId); + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(testShardId); + assertEquals(tracker1, tracker2); + } + + public void testGetVerifyTrackerInHotStore() { + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId); + + Map hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(1, hostStoreTrackers.size()); + ShardIndexingPressureTracker hotStoreTracker = hostStoreTrackers.get(testShardId); + assertEquals(tracker, hotStoreTracker); + } + + public void testTrackerCleanupFromHotStore() { + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId); + Map hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(1, hostStoreTrackers.size()); + + store.tryTrackerCleanupFromHotStore(tracker, () -> true); + + hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(0, hostStoreTrackers.size()); + + Map coldStoreTrackers = store.getShardIndexingPressureColdStore(); + assertEquals(1, coldStoreTrackers.size()); + ShardIndexingPressureTracker coldStoreTracker = coldStoreTrackers.get(testShardId); + assertEquals(tracker, coldStoreTracker); + } + + public void testTrackerCleanupSkippedFromHotStore() { + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId); + Map hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(1, hostStoreTrackers.size()); + + store.tryTrackerCleanupFromHotStore(tracker, () -> false); + + hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(1, hostStoreTrackers.size()); + ShardIndexingPressureTracker coldStoreTracker = hostStoreTrackers.get(testShardId); + assertEquals(tracker, coldStoreTracker); + } + + public void testTrackerRestoredToHotStorePostCleanup() { + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId); + Map hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(1, hostStoreTrackers.size()); + + store.tryTrackerCleanupFromHotStore(tracker1, () -> true); + + hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(0, hostStoreTrackers.size()); + + ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(testShardId); + hostStoreTrackers = store.getShardIndexingPressureHotStore(); + assertEquals(1, hostStoreTrackers.size()); + assertEquals(tracker1, tracker2); + } + + public void testTrackerEvictedFromColdStore() { + for (int i = 0; i <= ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings); i++) { + ShardId shardId = new ShardId("index", "uuid", i); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId); + store.tryTrackerCleanupFromHotStore(tracker, () -> true); + assertEquals(i + 1, store.getShardIndexingPressureColdStore().size()); + } + + // Verify cold store size is maximum + assertEquals(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings) + 1, + store.getShardIndexingPressureColdStore().size()); + + // get and remove one more tracker object from hot store + ShardId shardId = new ShardId("index", "uuid", + ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings) + 1); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId); + store.tryTrackerCleanupFromHotStore(tracker, () -> true); + + // Verify all trackers objects purged from cold store except the last + assertEquals(1, store.getShardIndexingPressureColdStore().size()); + assertEquals(tracker, store.getShardIndexingPressureColdStore().get(shardId)); + } + + public void testShardIndexingPressureStoreConcurrentGet() throws Exception { + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId); + final int NUM_THREADS = scaledRandomIntBetween(100, 500); + final Thread[] threads = new Thread[NUM_THREADS]; + final ShardIndexingPressureTracker[] trackers = new ShardIndexingPressureTracker[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + int counter = i; + threads[i] = new Thread(() -> { + trackers[counter] = store.getShardIndexingPressureTracker(testShardId); + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + for (int i = 0; i < NUM_THREADS; i++) { + assertEquals(tracker, trackers[i]); + } + assertEquals(1, store.getShardIndexingPressureHotStore().size()); + assertEquals(1, store.getShardIndexingPressureColdStore().size()); + assertEquals(tracker, store.getShardIndexingPressureHotStore().get(testShardId)); + assertEquals(tracker, store.getShardIndexingPressureColdStore().get(testShardId)); + } + + public void testShardIndexingPressureStoreConcurrentGetAndCleanup() throws Exception { + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId); + final int NUM_THREADS = scaledRandomIntBetween(100, 500); + final Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(() -> { + ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId); + assertEquals(tracker, tracker1); + store.tryTrackerCleanupFromHotStore(tracker, () -> true); + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(0, store.getShardIndexingPressureHotStore().size()); + assertEquals(1, store.getShardIndexingPressureColdStore().size()); + assertEquals(tracker, store.getShardIndexingPressureColdStore().get(testShardId)); + } + + public void testTrackerConcurrentEvictionFromColdStore() throws Exception { + int maxColdStoreSize = ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings); + final int NUM_THREADS = scaledRandomIntBetween(maxColdStoreSize * 2, maxColdStoreSize * 8); + final Thread[] threads = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + int counter = i; + threads[i] = new Thread(() -> { + ShardId shardId = new ShardId("index", "uuid", counter); + ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId); + store.tryTrackerCleanupFromHotStore(tracker, () -> true); + }); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertEquals(0, store.getShardIndexingPressureHotStore().size()); + assertTrue(store.getShardIndexingPressureColdStore().size() <= maxColdStoreSize + 1); + } +}