mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-07 13:38:49 +00:00
* Add Shard Indexing Pressure Store (#478) Signed-off-by: Saurabh Singh <sisurab@amazon.com> * Added comments and shard allocation based on compute in hot store. Signed-off-by: Saurabh Singh <sisurab@amazon.com> Co-authored-by: Saurabh Singh <sisurab@amazon.com>
This commit is contained in:
parent
90000a3f53
commit
5bbbad34d2
@ -104,6 +104,8 @@ import org.opensearch.http.HttpTransportSettings;
|
||||
import org.opensearch.index.IndexModule;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.IndexingPressure;
|
||||
import org.opensearch.index.ShardIndexingPressureSettings;
|
||||
import org.opensearch.index.ShardIndexingPressureStore;
|
||||
import org.opensearch.indices.IndexingMemoryController;
|
||||
import org.opensearch.indices.IndicesQueryCache;
|
||||
import org.opensearch.indices.IndicesRequestCache;
|
||||
@ -585,7 +587,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||
IndexingPressure.MAX_INDEXING_BYTES,
|
||||
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
|
||||
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
|
||||
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING)));
|
||||
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
|
||||
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
|
||||
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
|
||||
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,
|
||||
ShardIndexingPressureSettings.SHARD_MIN_LIMIT,
|
||||
ShardIndexingPressureStore.MAX_COLD_STORE_SIZE)));
|
||||
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
|
||||
|
@ -0,0 +1,125 @@
|
||||
/*
|
||||
* Copyright OpenSearch Contributors.
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package org.opensearch.index;
|
||||
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Setting;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.BooleanSupplier;
|
||||
|
||||
import static java.util.Objects.isNull;
|
||||
|
||||
/**
|
||||
* Shard indexing pressure store acts as a central repository for all the shard-level tracker objects currently being
|
||||
* used at the Node level, for tracking indexing pressure requests.
|
||||
* Store manages the tracker lifecycle, from creation, access, until it is evicted to be collected.
|
||||
*
|
||||
* Trackers are maintained at two levels for access simplicity and better memory management:
|
||||
*
|
||||
* 1. shardIndexingPressureHotStore : As the name suggests, it is hot store for tracker objects which are currently live i.e. being used
|
||||
* to track an ongoing request.
|
||||
*
|
||||
* 2. shardIndexingPressureColdStore : This acts as the store for all the shard tracking objects which are currently being used
|
||||
* by the framework. In addition to hot trackers, the recently used trackers which are although not currently live, but again can be used
|
||||
* in near future, are also part of this store. To limit any memory implications, this store has an upper limit on the maximum number of
|
||||
* trackers its can hold at any given time, which is a configurable dynamic setting.
|
||||
*
|
||||
* Tracking objects when created are part of both the hot store as well as cold store. However, once the object
|
||||
* is no more live it is removed from the hot store. Objects in the cold store are evicted once the cold store
|
||||
* reaches its maximum limit. Think of it like a periodic purge when upper limit is hit.
|
||||
* During get if tracking object is not present in the hot store, a lookup is made into the cache store. If found,
|
||||
* object is brought into the hot store again, until it remains active. If not present in the either store, a fresh
|
||||
* object is instantiated and registered in both the stores for concurrent accesses.
|
||||
*
|
||||
* Note: The implementation of shardIndexingPressureColdStore methods is such that get,
|
||||
* update and evict operations can be abstracted out to support any other strategy such as LRU, if
|
||||
* discovered a need later.
|
||||
*
|
||||
*/
|
||||
public class ShardIndexingPressureStore {
|
||||
|
||||
// This represents the maximum value for the cold store size.
|
||||
public static final Setting<Integer> MAX_COLD_STORE_SIZE =
|
||||
Setting.intSetting("shard_indexing_pressure.cache_store.max_size", 200, 100, 1000,
|
||||
Setting.Property.NodeScope, Setting.Property.Dynamic);
|
||||
|
||||
private final Map<ShardId, ShardIndexingPressureTracker> shardIndexingPressureHotStore =
|
||||
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
||||
private final Map<ShardId, ShardIndexingPressureTracker> shardIndexingPressureColdStore = new HashMap<>();
|
||||
private final ShardIndexingPressureSettings shardIndexingPressureSettings;
|
||||
|
||||
private volatile int maxColdStoreSize;
|
||||
|
||||
public ShardIndexingPressureStore(ShardIndexingPressureSettings shardIndexingPressureSettings,
|
||||
ClusterSettings clusterSettings, Settings settings) {
|
||||
this.shardIndexingPressureSettings = shardIndexingPressureSettings;
|
||||
this.maxColdStoreSize = MAX_COLD_STORE_SIZE.get(settings).intValue();
|
||||
clusterSettings.addSettingsUpdateConsumer(MAX_COLD_STORE_SIZE, this::setMaxColdStoreSize);
|
||||
}
|
||||
|
||||
public ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) {
|
||||
ShardIndexingPressureTracker tracker = shardIndexingPressureHotStore.get(shardId);
|
||||
if (isNull(tracker)) {
|
||||
// Attempt from Indexing pressure cold store
|
||||
tracker = shardIndexingPressureColdStore.get(shardId);
|
||||
// If not already present in cold store instantiate a new one
|
||||
if (isNull(tracker)) {
|
||||
tracker = shardIndexingPressureHotStore.computeIfAbsent(shardId, (k) ->
|
||||
new ShardIndexingPressureTracker(shardId,
|
||||
this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(),
|
||||
this.shardIndexingPressureSettings.getShardReplicaBaseLimits())
|
||||
);
|
||||
// Write through into the cold store for future reference
|
||||
updateShardIndexingPressureColdStore(tracker);
|
||||
} else {
|
||||
// Attempt update tracker to the hot store and return the tracker which finally made to the hot store to avoid any race
|
||||
ShardIndexingPressureTracker newTracker = shardIndexingPressureHotStore.putIfAbsent(shardId, tracker);
|
||||
tracker = newTracker == null ? tracker : newTracker;
|
||||
}
|
||||
}
|
||||
return tracker;
|
||||
}
|
||||
|
||||
public Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureHotStore() {
|
||||
return Collections.unmodifiableMap(shardIndexingPressureHotStore);
|
||||
}
|
||||
|
||||
public Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureColdStore() {
|
||||
return Collections.unmodifiableMap(shardIndexingPressureColdStore);
|
||||
}
|
||||
|
||||
public void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, BooleanSupplier condition) {
|
||||
if (condition.getAsBoolean()) {
|
||||
// Try inserting into cold store again in case there was an eviction triggered
|
||||
shardIndexingPressureColdStore.putIfAbsent(tracker.getShardId(), tracker);
|
||||
// Remove from the hot store
|
||||
shardIndexingPressureHotStore.remove(tracker.getShardId(), tracker);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used to update the reference of tracker in cold store, to be re-used later of tracker is removed from hot store upon request
|
||||
* completion. When the cold store size reaches maximum, all the tracker objects in cold store are flushed. Flush is a less frequent
|
||||
* (periodic) operation, can be sized based on workload. It is okay to not to synchronize counters being flushed, as
|
||||
* objects in the cold store are only empty references, and can be re-initialized if needed.
|
||||
*/
|
||||
private void updateShardIndexingPressureColdStore(ShardIndexingPressureTracker tracker) {
|
||||
if (shardIndexingPressureColdStore.size() > maxColdStoreSize) {
|
||||
shardIndexingPressureColdStore.clear();
|
||||
}
|
||||
shardIndexingPressureColdStore.put(tracker.getShardId(), tracker);
|
||||
}
|
||||
|
||||
private void setMaxColdStoreSize(int maxColdStoreSize) {
|
||||
this.maxColdStoreSize = maxColdStoreSize;
|
||||
}
|
||||
}
|
@ -0,0 +1,190 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.index;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.opensearch.cluster.service.ClusterService;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.test.OpenSearchTestCase;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class ShardIndexingPressureStoreTests extends OpenSearchTestCase {
|
||||
|
||||
private final Settings settings = Settings.builder()
|
||||
.put(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.getKey(), 200)
|
||||
.build();
|
||||
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
private final ShardIndexingPressureSettings shardIndexingPressureSettings =
|
||||
new ShardIndexingPressureSettings(new ClusterService(settings, clusterSettings, null), settings,
|
||||
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());
|
||||
private ShardIndexingPressureStore store;
|
||||
private ShardId testShardId;
|
||||
|
||||
@Before
|
||||
public void beforeTest() {
|
||||
store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings);
|
||||
testShardId = new ShardId("index", "uuid", 0);
|
||||
}
|
||||
|
||||
public void testShardIndexingPressureStoreGet() {
|
||||
ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId);
|
||||
ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(testShardId);
|
||||
assertEquals(tracker1, tracker2);
|
||||
}
|
||||
|
||||
public void testGetVerifyTrackerInHotStore() {
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
|
||||
|
||||
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(1, hostStoreTrackers.size());
|
||||
ShardIndexingPressureTracker hotStoreTracker = hostStoreTrackers.get(testShardId);
|
||||
assertEquals(tracker, hotStoreTracker);
|
||||
}
|
||||
|
||||
public void testTrackerCleanupFromHotStore() {
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
|
||||
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(1, hostStoreTrackers.size());
|
||||
|
||||
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
|
||||
|
||||
hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(0, hostStoreTrackers.size());
|
||||
|
||||
Map<ShardId, ShardIndexingPressureTracker> coldStoreTrackers = store.getShardIndexingPressureColdStore();
|
||||
assertEquals(1, coldStoreTrackers.size());
|
||||
ShardIndexingPressureTracker coldStoreTracker = coldStoreTrackers.get(testShardId);
|
||||
assertEquals(tracker, coldStoreTracker);
|
||||
}
|
||||
|
||||
public void testTrackerCleanupSkippedFromHotStore() {
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
|
||||
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(1, hostStoreTrackers.size());
|
||||
|
||||
store.tryTrackerCleanupFromHotStore(tracker, () -> false);
|
||||
|
||||
hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(1, hostStoreTrackers.size());
|
||||
ShardIndexingPressureTracker coldStoreTracker = hostStoreTrackers.get(testShardId);
|
||||
assertEquals(tracker, coldStoreTracker);
|
||||
}
|
||||
|
||||
public void testTrackerRestoredToHotStorePostCleanup() {
|
||||
ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId);
|
||||
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(1, hostStoreTrackers.size());
|
||||
|
||||
store.tryTrackerCleanupFromHotStore(tracker1, () -> true);
|
||||
|
||||
hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(0, hostStoreTrackers.size());
|
||||
|
||||
ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(testShardId);
|
||||
hostStoreTrackers = store.getShardIndexingPressureHotStore();
|
||||
assertEquals(1, hostStoreTrackers.size());
|
||||
assertEquals(tracker1, tracker2);
|
||||
}
|
||||
|
||||
public void testTrackerEvictedFromColdStore() {
|
||||
for (int i = 0; i <= ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings); i++) {
|
||||
ShardId shardId = new ShardId("index", "uuid", i);
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId);
|
||||
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
|
||||
assertEquals(i + 1, store.getShardIndexingPressureColdStore().size());
|
||||
}
|
||||
|
||||
// Verify cold store size is maximum
|
||||
assertEquals(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings) + 1,
|
||||
store.getShardIndexingPressureColdStore().size());
|
||||
|
||||
// get and remove one more tracker object from hot store
|
||||
ShardId shardId = new ShardId("index", "uuid",
|
||||
ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings) + 1);
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId);
|
||||
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
|
||||
|
||||
// Verify all trackers objects purged from cold store except the last
|
||||
assertEquals(1, store.getShardIndexingPressureColdStore().size());
|
||||
assertEquals(tracker, store.getShardIndexingPressureColdStore().get(shardId));
|
||||
}
|
||||
|
||||
public void testShardIndexingPressureStoreConcurrentGet() throws Exception {
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
|
||||
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
|
||||
final Thread[] threads = new Thread[NUM_THREADS];
|
||||
final ShardIndexingPressureTracker[] trackers = new ShardIndexingPressureTracker[NUM_THREADS];
|
||||
for (int i = 0; i < NUM_THREADS; i++) {
|
||||
int counter = i;
|
||||
threads[i] = new Thread(() -> {
|
||||
trackers[counter] = store.getShardIndexingPressureTracker(testShardId);
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (Thread t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
for (int i = 0; i < NUM_THREADS; i++) {
|
||||
assertEquals(tracker, trackers[i]);
|
||||
}
|
||||
assertEquals(1, store.getShardIndexingPressureHotStore().size());
|
||||
assertEquals(1, store.getShardIndexingPressureColdStore().size());
|
||||
assertEquals(tracker, store.getShardIndexingPressureHotStore().get(testShardId));
|
||||
assertEquals(tracker, store.getShardIndexingPressureColdStore().get(testShardId));
|
||||
}
|
||||
|
||||
public void testShardIndexingPressureStoreConcurrentGetAndCleanup() throws Exception {
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
|
||||
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
|
||||
final Thread[] threads = new Thread[NUM_THREADS];
|
||||
for (int i = 0; i < NUM_THREADS; i++) {
|
||||
threads[i] = new Thread(() -> {
|
||||
ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId);
|
||||
assertEquals(tracker, tracker1);
|
||||
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (Thread t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
assertEquals(0, store.getShardIndexingPressureHotStore().size());
|
||||
assertEquals(1, store.getShardIndexingPressureColdStore().size());
|
||||
assertEquals(tracker, store.getShardIndexingPressureColdStore().get(testShardId));
|
||||
}
|
||||
|
||||
public void testTrackerConcurrentEvictionFromColdStore() throws Exception {
|
||||
int maxColdStoreSize = ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings);
|
||||
final int NUM_THREADS = scaledRandomIntBetween(maxColdStoreSize * 2, maxColdStoreSize * 8);
|
||||
final Thread[] threads = new Thread[NUM_THREADS];
|
||||
for (int i = 0; i < NUM_THREADS; i++) {
|
||||
int counter = i;
|
||||
threads[i] = new Thread(() -> {
|
||||
ShardId shardId = new ShardId("index", "uuid", counter);
|
||||
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId);
|
||||
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (Thread t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
assertEquals(0, store.getShardIndexingPressureHotStore().size());
|
||||
assertTrue(store.getShardIndexingPressureColdStore().size() <= maxColdStoreSize + 1);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user