switch to registered Settings for all IndexingMemoryController settings
This commit is contained in:
parent
d863cbaa07
commit
36e99c5f8d
|
@ -65,6 +65,7 @@ import org.elasticsearch.http.HttpTransportSettings;
|
||||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||||
|
import org.elasticsearch.indices.IndexingMemoryController;
|
||||||
import org.elasticsearch.indices.IndicesQueryCache;
|
import org.elasticsearch.indices.IndicesQueryCache;
|
||||||
import org.elasticsearch.indices.IndicesRequestCache;
|
import org.elasticsearch.indices.IndicesRequestCache;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
@ -404,6 +405,11 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING,
|
BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING,
|
||||||
BootstrapSettings.MLOCKALL_SETTING,
|
BootstrapSettings.MLOCKALL_SETTING,
|
||||||
BootstrapSettings.SECCOMP_SETTING,
|
BootstrapSettings.SECCOMP_SETTING,
|
||||||
BootstrapSettings.CTRLHANDLER_SETTING
|
BootstrapSettings.CTRLHANDLER_SETTING,
|
||||||
|
IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING,
|
||||||
|
IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING,
|
||||||
|
IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING,
|
||||||
|
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING,
|
||||||
|
IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1404,7 +1404,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
return new EngineConfig(openMode, shardId,
|
return new EngineConfig(openMode, shardId,
|
||||||
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
|
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
|
||||||
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
|
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
|
||||||
indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
|
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Releasable acquirePrimaryOperationLock() {
|
public Releasable acquirePrimaryOperationLock() {
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.indices;
|
package org.elasticsearch.indices;
|
||||||
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
@ -50,22 +52,37 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||||
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
|
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
|
||||||
|
|
||||||
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
|
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
|
||||||
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
|
public static final Setting<String> INDEX_BUFFER_SIZE_SETTING = new Setting<String>("indices.memory.index_buffer_size", (s) -> "10%",
|
||||||
|
(s) -> {
|
||||||
|
if (s.endsWith("%")) {
|
||||||
|
try {
|
||||||
|
Double.parseDouble(s.substring(0, s.length() - 1));
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
throw new IllegalArgumentException("unknown value for [indices.memory.index_buffer_size]: must be X% or a size value (e.g. XMB) but was: " + s);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
ByteSizeValue.parseBytesSizeValue(s, "indices.memory.index_buffer_size");
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new IllegalArgumentException("unknown value for [indices.memory.index_buffer_size]: must be X% or a size value (e.g. XMB) but " + t.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s;
|
||||||
|
},
|
||||||
|
Property.NodeScope);
|
||||||
|
|
||||||
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
|
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
|
||||||
public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size";
|
public static final Setting<ByteSizeValue> MIN_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.min_index_buffer_size", new ByteSizeValue(48, ByteSizeUnit.MB), Property.NodeScope);
|
||||||
|
|
||||||
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
|
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
|
||||||
public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size";
|
public static final Setting<ByteSizeValue> MAX_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.max_index_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
|
||||||
|
|
||||||
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
|
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
|
||||||
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
|
public static final Setting<TimeValue> SHARD_INACTIVE_TIME_SETTING = Setting.timeSetting("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5), Property.NodeScope);
|
||||||
|
|
||||||
/** Default value (5 minutes) for indices.memory.shard_inactive_time */
|
|
||||||
public static final TimeValue SHARD_DEFAULT_INACTIVE_TIME = TimeValue.timeValueMinutes(5);
|
|
||||||
|
|
||||||
/** How frequently we check indexing memory usage (default: 5 seconds). */
|
/** How frequently we check indexing memory usage (default: 5 seconds). */
|
||||||
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
|
public static final Setting<TimeValue> SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting.timeSetting("indices.memory.interval", TimeValue.timeValueSeconds(5), Property.NodeScope);
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
@ -95,34 +112,34 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
||||||
this.indexShards = indexServices;
|
this.indexShards = indexServices;
|
||||||
|
|
||||||
ByteSizeValue indexingBuffer;
|
ByteSizeValue indexingBuffer;
|
||||||
String indexingBufferSetting = this.settings.get(INDEX_BUFFER_SIZE_SETTING, "10%");
|
String indexingBufferSetting = INDEX_BUFFER_SIZE_SETTING.get(settings);
|
||||||
if (indexingBufferSetting.endsWith("%")) {
|
if (indexingBufferSetting.endsWith("%")) {
|
||||||
double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1));
|
double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1));
|
||||||
indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
|
indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
|
||||||
ByteSizeValue minIndexingBuffer = this.settings.getAsBytesSize(MIN_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(48, ByteSizeUnit.MB));
|
ByteSizeValue minIndexingBuffer = MIN_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
|
||||||
ByteSizeValue maxIndexingBuffer = this.settings.getAsBytesSize(MAX_INDEX_BUFFER_SIZE_SETTING, null);
|
ByteSizeValue maxIndexingBuffer = MAX_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
|
||||||
|
|
||||||
if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) {
|
if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) {
|
||||||
indexingBuffer = minIndexingBuffer;
|
indexingBuffer = minIndexingBuffer;
|
||||||
}
|
}
|
||||||
if (maxIndexingBuffer != null && indexingBuffer.bytes() > maxIndexingBuffer.bytes()) {
|
if (maxIndexingBuffer.bytes() != -1 && indexingBuffer.bytes() > maxIndexingBuffer.bytes()) {
|
||||||
indexingBuffer = maxIndexingBuffer;
|
indexingBuffer = maxIndexingBuffer;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING);
|
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING.getKey());
|
||||||
}
|
}
|
||||||
this.indexingBuffer = indexingBuffer;
|
this.indexingBuffer = indexingBuffer;
|
||||||
|
|
||||||
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, SHARD_DEFAULT_INACTIVE_TIME);
|
this.inactiveTime = SHARD_INACTIVE_TIME_SETTING.get(this.settings);
|
||||||
// we need to have this relatively small to free up heap quickly enough
|
// we need to have this relatively small to free up heap quickly enough
|
||||||
this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5));
|
this.interval = SHARD_MEMORY_INTERVAL_TIME_SETTING.get(this.settings);
|
||||||
|
|
||||||
this.statusChecker = new ShardsIndicesStatusChecker();
|
this.statusChecker = new ShardsIndicesStatusChecker();
|
||||||
|
|
||||||
logger.debug("using indexing buffer size [{}] with {} [{}], {} [{}]",
|
logger.debug("using indexing buffer size [{}] with {} [{}], {} [{}]",
|
||||||
this.indexingBuffer,
|
this.indexingBuffer,
|
||||||
SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
|
SHARD_INACTIVE_TIME_SETTING.getKey(), this.inactiveTime,
|
||||||
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
|
SHARD_MEMORY_INTERVAL_TIME_SETTING.getKey(), this.interval);
|
||||||
this.scheduler = scheduleTask(threadPool);
|
this.scheduler = scheduleTask(threadPool);
|
||||||
|
|
||||||
// Need to save this so we can later launch async "write indexing buffer to disk" on shards:
|
// Need to save this so we can later launch async "write indexing buffer to disk" on shards:
|
||||||
|
|
|
@ -181,7 +181,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||||
|
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
|
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "4mb").build());
|
||||||
IndexShard shard0 = test.getShard(0);
|
IndexShard shard0 = test.getShard(0);
|
||||||
controller.simulateIndexing(shard0);
|
controller.simulateIndexing(shard0);
|
||||||
controller.assertBuffer(shard0, 1);
|
controller.assertBuffer(shard0, 1);
|
||||||
|
@ -214,7 +214,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||||
|
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "5mb")
|
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "5mb")
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
IndexShard shard0 = test.getShard(0);
|
IndexShard shard0 = test.getShard(0);
|
||||||
|
@ -248,16 +248,16 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
public void testMinBufferSizes() {
|
public void testMinBufferSizes() {
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
|
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "0.001%")
|
||||||
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
|
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING.getKey(), "6mb").build());
|
||||||
|
|
||||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMaxBufferSizes() {
|
public void testMaxBufferSizes() {
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
|
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "90%")
|
||||||
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
|
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING.getKey(), "6mb").build());
|
||||||
|
|
||||||
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
|
||||||
}
|
}
|
||||||
|
@ -268,7 +268,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||||
|
|
||||||
MockController controller = new MockController(Settings.builder()
|
MockController controller = new MockController(Settings.builder()
|
||||||
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
|
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "4mb").build());
|
||||||
IndexShard shard0 = test.getShard(0);
|
IndexShard shard0 = test.getShard(0);
|
||||||
IndexShard shard1 = test.getShard(1);
|
IndexShard shard1 = test.getShard(1);
|
||||||
IndexShard shard2 = test.getShard(2);
|
IndexShard shard2 = test.getShard(2);
|
||||||
|
@ -347,7 +347,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||||
assertNoFailures(r);
|
assertNoFailures(r);
|
||||||
|
|
||||||
// Make a shell of an IMC to check up on indexing buffer usage:
|
// Make a shell of an IMC to check up on indexing buffer usage:
|
||||||
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
|
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "1kb").build();
|
||||||
|
|
||||||
// TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
|
// TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
|
||||||
IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
|
IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
|
||||||
|
@ -408,7 +408,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
|
||||||
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
|
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
|
||||||
shard.close("simon says", false);
|
shard.close("simon says", false);
|
||||||
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
|
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
|
||||||
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50kb").build();
|
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING.getKey(), "50kb").build();
|
||||||
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
|
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
|
||||||
: Collections.singleton(shardRef.get()).iterator();
|
: Collections.singleton(shardRef.get()).iterator();
|
||||||
AtomicInteger flushes = new AtomicInteger();
|
AtomicInteger flushes = new AtomicInteger();
|
||||||
|
|
Loading…
Reference in New Issue