mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
fix IndexingMemoryController to use existing Settings API support to parse % or byte size, and to enforce min/max values; don't try to share string constants for settings in tests
This commit is contained in:
parent
36e99c5f8d
commit
529087301a
core/src
main/java/org/elasticsearch
test/java/org/elasticsearch/indices
@ -523,9 +523,9 @@ public class Setting<T> extends ToXContentToBytes {
|
|||||||
return new Setting<>(key, defaultValue, (s) -> ByteSizeValue.parseBytesSizeValue(s, key), properties);
|
return new Setting<>(key, defaultValue, (s) -> ByteSizeValue.parseBytesSizeValue(s, key), properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Setting<ByteSizeValue> byteSizeSetting(String key, ByteSizeValue value, ByteSizeValue minValue, ByteSizeValue maxValue,
|
public static Setting<ByteSizeValue> byteSizeSetting(String key, ByteSizeValue defaultValue, ByteSizeValue minValue, ByteSizeValue maxValue,
|
||||||
Property... properties) {
|
Property... properties) {
|
||||||
return byteSizeSetting(key, (s) -> value.toString(), minValue, maxValue, properties);
|
return byteSizeSetting(key, (s) -> defaultValue.toString(), minValue, maxValue, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Setting<ByteSizeValue> byteSizeSetting(String key, Function<Settings, String> defaultValue,
|
public static Setting<ByteSizeValue> byteSizeSetting(String key, Function<Settings, String> defaultValue,
|
||||||
|
@ -52,37 +52,27 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
|
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
|
||||||
|
|
||||||
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
|
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
|
||||||
public static final Setting<String> INDEX_BUFFER_SIZE_SETTING = new Setting<String>("indices.memory.index_buffer_size", (s) -> "10%",
|
public static final Setting<ByteSizeValue> INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.index_buffer_size", "10%", Property.NodeScope);
|
||||||
(s) -> {
|
|
||||||
if (s.endsWith("%")) {
|
|
||||||
try {
|
|
||||||
Double.parseDouble(s.substring(0, s.length() - 1));
|
|
||||||
} catch (NumberFormatException nfe) {
|
|
||||||
throw new IllegalArgumentException("unknown value for [indices.memory.index_buffer_size]: must be X% or a size value (e.g. XMB) but was: " + s);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
ByteSizeValue.parseBytesSizeValue(s, "indices.memory.index_buffer_size");
|
|
||||||
} catch (Throwable t) {
|
|
||||||
throw new IllegalArgumentException("unknown value for [indices.memory.index_buffer_size]: must be X% or a size value (e.g. XMB) but " + t.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return s;
|
|
||||||
},
|
|
||||||
Property.NodeScope);
|
|
||||||
|
|
||||||
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
|
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
|
||||||
public static final Setting<ByteSizeValue> MIN_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.min_index_buffer_size", new ByteSizeValue(48, ByteSizeUnit.MB), Property.NodeScope);
|
public static final Setting<ByteSizeValue> MIN_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.min_index_buffer_size",
|
||||||
|
new ByteSizeValue(48, ByteSizeUnit.MB),
|
||||||
|
new ByteSizeValue(0, ByteSizeUnit.BYTES),
|
||||||
|
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
|
||||||
|
Property.NodeScope);
|
||||||
|
|
||||||
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
|
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
|
||||||
public static final Setting<ByteSizeValue> MAX_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.max_index_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
|
public static final Setting<ByteSizeValue> MAX_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.max_index_buffer_size",
|
||||||
|
new ByteSizeValue(-1),
|
||||||
|
new ByteSizeValue(-1),
|
||||||
|
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
|
||||||
|
Property.NodeScope);
|
||||||
|
|
||||||
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
|
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
|
||||||
public static final Setting<TimeValue> SHARD_INACTIVE_TIME_SETTING = Setting.timeSetting("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5), Property.NodeScope);
|
public static final Setting<TimeValue> SHARD_INACTIVE_TIME_SETTING = Setting.positiveTimeSetting("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5), Property.NodeScope);
|
||||||
|
|
||||||
/** How frequently we check indexing memory usage (default: 5 seconds). */
|
/** How frequently we check indexing memory usage (default: 5 seconds). */
|
||||||
public static final Setting<TimeValue> SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting.timeSetting("indices.memory.interval", TimeValue.timeValueSeconds(5), Property.NodeScope);
|
public static final Setting<TimeValue> SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting.positiveTimeSetting("indices.memory.interval", TimeValue.timeValueSeconds(5), Property.NodeScope);
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
@ -111,22 +101,20 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||||||
super(settings);
|
super(settings);
|
||||||
this.indexShards = indexServices;
|
this.indexShards = indexServices;
|
||||||
|
|
||||||
ByteSizeValue indexingBuffer;
|
ByteSizeValue indexingBuffer = INDEX_BUFFER_SIZE_SETTING.get(settings);
|
||||||
String indexingBufferSetting = INDEX_BUFFER_SIZE_SETTING.get(settings);
|
|
||||||
if (indexingBufferSetting.endsWith("%")) {
|
String indexingBufferSetting = settings.get(INDEX_BUFFER_SIZE_SETTING.getKey());
|
||||||
double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1));
|
// null means we used the default (10%)
|
||||||
indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
|
if (indexingBufferSetting == null || indexingBufferSetting.endsWith("%")) {
|
||||||
|
// We only apply the min/max when % value was used for the index buffer:
|
||||||
ByteSizeValue minIndexingBuffer = MIN_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
|
ByteSizeValue minIndexingBuffer = MIN_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
|
||||||
ByteSizeValue maxIndexingBuffer = MAX_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
|
ByteSizeValue maxIndexingBuffer = MAX_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
|
||||||
|
|
||||||
if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) {
|
if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) {
|
||||||
indexingBuffer = minIndexingBuffer;
|
indexingBuffer = minIndexingBuffer;
|
||||||
}
|
}
|
||||||
if (maxIndexingBuffer.bytes() != -1 && indexingBuffer.bytes() > maxIndexingBuffer.bytes()) {
|
if (maxIndexingBuffer.bytes() != -1 && indexingBuffer.bytes() > maxIndexingBuffer.bytes()) {
|
||||||
indexingBuffer = maxIndexingBuffer;
|
indexingBuffer = maxIndexingBuffer;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING.getKey());
|
|
||||||
}
|
}
|
||||||
this.indexingBuffer = indexingBuffer;
|
this.indexingBuffer = indexingBuffer;
|
||||||
|
|
||||||
|
@ -56,8 +56,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
@ -76,7 +74,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
|
|
||||||
public MockController(Settings settings) {
|
public MockController(Settings settings) {
|
||||||
super(Settings.builder()
|
super(Settings.builder()
|
||||||
.put(SHARD_MEMORY_INTERVAL_TIME_SETTING, "200h") // disable it
|
.put("indices.memory.interval", "200h") // disable it
|
||||||
.put(settings)
|
.put(settings)
|
||||||
.build(),
|
.build(),
|
||||||
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
||||||
@ -176,12 +174,12 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void testShardAdditionAndRemoval() {
|
public void testShardAdditionAndRemoval() {
|
||||||
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build());
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||||
|
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "4mb").build());
|
.put("indices.memory.index_buffer_size", "4mb").build());
|
||||||
IndexShard shard0 = test.getShard(0);
|
IndexShard shard0 = test.getShard(0);
|
||||||
controller.simulateIndexing(shard0);
|
controller.simulateIndexing(shard0);
|
||||||
controller.assertBuffer(shard0, 1);
|
controller.assertBuffer(shard0, 1);
|
||||||
@ -209,12 +207,12 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
|
|
||||||
public void testActiveInactive() {
|
public void testActiveInactive() {
|
||||||
|
|
||||||
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
createIndex("test", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build());
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||||
|
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "5mb")
|
.put("indices.memory.index_buffer_size", "5mb")
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
IndexShard shard0 = test.getShard(0);
|
IndexShard shard0 = test.getShard(0);
|
||||||
@ -248,27 +246,59 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
|
|
||||||
public void testMinBufferSizes() {
|
public void testMinBufferSizes() {
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "0.001%")
|
.put("indices.memory.index_buffer_size", "0.001%")
|
||||||
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING.getKey(), "6mb").build());
|
.put("indices.memory.min_index_buffer_size", "6mb").build());
|
||||||
|
|
||||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNegativeMinIndexBufferSize() {
|
||||||
|
Exception e = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> new MockController(Settings.builder()
|
||||||
|
.put("indices.memory.min_index_buffer_size", "-6mb").build()));
|
||||||
|
assertEquals("Failed to parse value [-6mb] for setting [indices.memory.min_index_buffer_size] must be >= 0b", e.getMessage());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNegativeInterval() {
|
||||||
|
Exception e = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> new MockController(Settings.builder()
|
||||||
|
.put("indices.memory.interval", "-42s").build()));
|
||||||
|
assertEquals("Failed to parse value [-42s] for setting [indices.memory.interval] must be >= 0s", e.getMessage());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNegativeShardInactiveTime() {
|
||||||
|
Exception e = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> new MockController(Settings.builder()
|
||||||
|
.put("indices.memory.shard_inactive_time", "-42s").build()));
|
||||||
|
assertEquals("Failed to parse value [-42s] for setting [indices.memory.shard_inactive_time] must be >= 0s", e.getMessage());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNegative() {
|
||||||
|
Exception e = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> new MockController(Settings.builder()
|
||||||
|
.put("indices.memory.max_index_buffer_size", "-6mb").build()));
|
||||||
|
assertEquals("Failed to parse value [-6mb] for setting [indices.memory.max_index_buffer_size] must be >= -1b", e.getMessage());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void testMaxBufferSizes() {
|
public void testMaxBufferSizes() {
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "90%")
|
.put("indices.memory.index_buffer_size", "90%")
|
||||||
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING.getKey(), "6mb").build());
|
.put("indices.memory.max_index_buffer_size", "6mb").build());
|
||||||
|
|
||||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThrottling() throws Exception {
|
public void testThrottling() throws Exception {
|
||||||
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
|
createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build());
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||||
|
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "4mb").build());
|
.put("indices.memory.index_buffer_size", "4mb").build());
|
||||||
IndexShard shard0 = test.getShard(0);
|
IndexShard shard0 = test.getShard(0);
|
||||||
IndexShard shard1 = test.getShard(1);
|
IndexShard shard1 = test.getShard(1);
|
||||||
IndexShard shard2 = test.getShard(2);
|
IndexShard shard2 = test.getShard(2);
|
||||||
@ -326,8 +356,8 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
// #10312
|
// #10312
|
||||||
public void testDeletesAloneCanTriggerRefresh() throws Exception {
|
public void testDeletesAloneCanTriggerRefresh() throws Exception {
|
||||||
createIndex("index",
|
createIndex("index",
|
||||||
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
|
Settings.builder().put("index.number_of_shards", 1)
|
||||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
.put("index.number_of_replicas", 0)
|
||||||
.put("index.refresh_interval", -1)
|
.put("index.refresh_interval", -1)
|
||||||
.build());
|
.build());
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
@ -347,7 +377,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
assertNoFailures(r);
|
assertNoFailures(r);
|
||||||
|
|
||||||
// Make a shell of an IMC to check up on indexing buffer usage:
|
// Make a shell of an IMC to check up on indexing buffer usage:
|
||||||
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "1kb").build();
|
Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "1kb").build();
|
||||||
|
|
||||||
// TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
|
// TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
|
||||||
IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
|
IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
|
||||||
@ -408,7 +438,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
|||||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
|
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
|
||||||
shard.close("simon says", false);
|
shard.close("simon says", false);
|
||||||
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
|
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
|
||||||
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "50kb").build();
|
Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build();
|
||||||
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
|
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
|
||||||
: Collections.singleton(shardRef.get()).iterator();
|
: Collections.singleton(shardRef.get()).iterator();
|
||||||
AtomicInteger flushes = new AtomicInteger();
|
AtomicInteger flushes = new AtomicInteger();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user