Merge pull request #17778 from mikemccand/imc_registered_settings

Switch to registered Settings for all IndexingMemoryController settings
This commit is contained in:
Michael McCandless 2016-04-15 13:14:12 -04:00
commit b55368b39d
5 changed files with 85 additions and 44 deletions

View File

@ -65,6 +65,7 @@ import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
@ -404,6 +405,11 @@ public final class ClusterSettings extends AbstractScopedSettings {
BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING,
BootstrapSettings.MLOCKALL_SETTING,
BootstrapSettings.SECCOMP_SETTING,
BootstrapSettings.CTRLHANDLER_SETTING
BootstrapSettings.CTRLHANDLER_SETTING,
IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING,
IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING,
IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING,
IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING
)));
}

View File

@ -523,9 +523,9 @@ public class Setting<T> extends ToXContentToBytes {
return new Setting<>(key, defaultValue, (s) -> ByteSizeValue.parseBytesSizeValue(s, key), properties);
}
public static Setting<ByteSizeValue> byteSizeSetting(String key, ByteSizeValue value, ByteSizeValue minValue, ByteSizeValue maxValue,
public static Setting<ByteSizeValue> byteSizeSetting(String key, ByteSizeValue defaultValue, ByteSizeValue minValue, ByteSizeValue maxValue,
Property... properties) {
return byteSizeSetting(key, (s) -> value.toString(), minValue, maxValue, properties);
return byteSizeSetting(key, (s) -> defaultValue.toString(), minValue, maxValue, properties);
}
public static Setting<ByteSizeValue> byteSizeSetting(String key, Function<Settings, String> defaultValue,

View File

@ -1404,7 +1404,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()));
}
public Releasable acquirePrimaryOperationLock() {

View File

@ -20,6 +20,8 @@
package org.elasticsearch.indices;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -50,22 +52,27 @@ import java.util.concurrent.locks.ReentrantLock;
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
public static final Setting<ByteSizeValue> INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.index_buffer_size", "10%", Property.NodeScope);
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size";
public static final Setting<ByteSizeValue> MIN_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.min_index_buffer_size",
new ByteSizeValue(48, ByteSizeUnit.MB),
new ByteSizeValue(0, ByteSizeUnit.BYTES),
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
Property.NodeScope);
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size";
public static final Setting<ByteSizeValue> MAX_INDEX_BUFFER_SIZE_SETTING = Setting.byteSizeSetting("indices.memory.max_index_buffer_size",
new ByteSizeValue(-1),
new ByteSizeValue(-1),
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
Property.NodeScope);
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
/** Default value (5 minutes) for indices.memory.shard_inactive_time */
public static final TimeValue SHARD_DEFAULT_INACTIVE_TIME = TimeValue.timeValueMinutes(5);
public static final Setting<TimeValue> SHARD_INACTIVE_TIME_SETTING = Setting.positiveTimeSetting("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5), Property.NodeScope);
/** How frequently we check indexing memory usage (default: 5 seconds). */
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
public static final Setting<TimeValue> SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting.positiveTimeSetting("indices.memory.interval", TimeValue.timeValueSeconds(5), Property.NodeScope);
private final ThreadPool threadPool;
@ -94,35 +101,33 @@ public class IndexingMemoryController extends AbstractComponent implements Index
super(settings);
this.indexShards = indexServices;
ByteSizeValue indexingBuffer;
String indexingBufferSetting = this.settings.get(INDEX_BUFFER_SIZE_SETTING, "10%");
if (indexingBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1));
indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100)));
ByteSizeValue minIndexingBuffer = this.settings.getAsBytesSize(MIN_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(48, ByteSizeUnit.MB));
ByteSizeValue maxIndexingBuffer = this.settings.getAsBytesSize(MAX_INDEX_BUFFER_SIZE_SETTING, null);
ByteSizeValue indexingBuffer = INDEX_BUFFER_SIZE_SETTING.get(settings);
String indexingBufferSetting = settings.get(INDEX_BUFFER_SIZE_SETTING.getKey());
// null means we used the default (10%)
if (indexingBufferSetting == null || indexingBufferSetting.endsWith("%")) {
// We only apply the min/max when % value was used for the index buffer:
ByteSizeValue minIndexingBuffer = MIN_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
ByteSizeValue maxIndexingBuffer = MAX_INDEX_BUFFER_SIZE_SETTING.get(this.settings);
if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) {
indexingBuffer = minIndexingBuffer;
}
if (maxIndexingBuffer != null && indexingBuffer.bytes() > maxIndexingBuffer.bytes()) {
if (maxIndexingBuffer.bytes() != -1 && indexingBuffer.bytes() > maxIndexingBuffer.bytes()) {
indexingBuffer = maxIndexingBuffer;
}
} else {
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING);
}
this.indexingBuffer = indexingBuffer;
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, SHARD_DEFAULT_INACTIVE_TIME);
this.inactiveTime = SHARD_INACTIVE_TIME_SETTING.get(this.settings);
// we need to have this relatively small to free up heap quickly enough
this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5));
this.interval = SHARD_MEMORY_INTERVAL_TIME_SETTING.get(this.settings);
this.statusChecker = new ShardsIndicesStatusChecker();
logger.debug("using indexing buffer size [{}] with {} [{}], {} [{}]",
this.indexingBuffer,
SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
SHARD_INACTIVE_TIME_SETTING.getKey(), this.inactiveTime,
SHARD_MEMORY_INTERVAL_TIME_SETTING.getKey(), this.interval);
this.scheduler = scheduleTask(threadPool);
// Need to save this so we can later launch async "write indexing buffer to disk" on shards:

View File

@ -56,8 +56,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
@ -76,7 +74,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
public MockController(Settings settings) {
super(Settings.builder()
.put(SHARD_MEMORY_INTERVAL_TIME_SETTING, "200h") // disable it
.put("indices.memory.interval", "200h") // disable it
.put(settings)
.build(),
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
@ -176,12 +174,12 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
}
public void testShardAdditionAndRemoval() {
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build());
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
.put("indices.memory.index_buffer_size", "4mb").build());
IndexShard shard0 = test.getShard(0);
controller.simulateIndexing(shard0);
controller.assertBuffer(shard0, 1);
@ -209,12 +207,12 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
public void testActiveInactive() {
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
createIndex("test", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build());
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "5mb")
.put("indices.memory.index_buffer_size", "5mb")
.build());
IndexShard shard0 = test.getShard(0);
@ -248,27 +246,59 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
public void testMinBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
.put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
.put("indices.memory.index_buffer_size", "0.001%")
.put("indices.memory.min_index_buffer_size", "6mb").build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
}
public void testNegativeMinIndexBufferSize() {
Exception e = expectThrows(IllegalArgumentException.class,
() -> new MockController(Settings.builder()
.put("indices.memory.min_index_buffer_size", "-6mb").build()));
assertEquals("Failed to parse value [-6mb] for setting [indices.memory.min_index_buffer_size] must be >= 0b", e.getMessage());
}
public void testNegativeInterval() {
Exception e = expectThrows(IllegalArgumentException.class,
() -> new MockController(Settings.builder()
.put("indices.memory.interval", "-42s").build()));
assertEquals("Failed to parse value [-42s] for setting [indices.memory.interval] must be >= 0s", e.getMessage());
}
public void testNegativeShardInactiveTime() {
Exception e = expectThrows(IllegalArgumentException.class,
() -> new MockController(Settings.builder()
.put("indices.memory.shard_inactive_time", "-42s").build()));
assertEquals("Failed to parse value [-42s] for setting [indices.memory.shard_inactive_time] must be >= 0s", e.getMessage());
}
public void testNegativeMaxIndexBufferSize() {
Exception e = expectThrows(IllegalArgumentException.class,
() -> new MockController(Settings.builder()
.put("indices.memory.max_index_buffer_size", "-6mb").build()));
assertEquals("Failed to parse value [-6mb] for setting [indices.memory.max_index_buffer_size] must be >= -1b", e.getMessage());
}
public void testMaxBufferSizes() {
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
.put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build());
.put("indices.memory.index_buffer_size", "90%")
.put("indices.memory.max_index_buffer_size", "6mb").build());
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
}
public void testThrottling() throws Exception {
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build());
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
.put("indices.memory.index_buffer_size", "4mb").build());
IndexShard shard0 = test.getShard(0);
IndexShard shard1 = test.getShard(1);
IndexShard shard2 = test.getShard(2);
@ -326,8 +356,8 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
// #10312
public void testDeletesAloneCanTriggerRefresh() throws Exception {
createIndex("index",
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", -1)
.build());
ensureGreen();
@ -347,7 +377,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertNoFailures(r);
// Make a shell of an IMC to check up on indexing buffer usage:
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "1kb").build();
// TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
@ -408,7 +438,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
shard.close("simon says", false);
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50kb").build();
Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build();
Iterable<IndexShard> iterable = () -> (shardRef.get() == null) ? Collections.<IndexShard>emptyList().iterator()
: Collections.singleton(shardRef.get()).iterator();
AtomicInteger flushes = new AtomicInteger();