Merge pull request #13548 from mikemccand/indexing_memory_controller

Improve IndexingMemoryController a bit:
      - promptly push indexing buffer changes to IndexWriter, instead of waiting for next refresh/flush
      - don't wait for merges to finish before dropping a shards's indexing buffer to 512 KB once it's inactive
      - fix NPE if indices.memory.index_buffer_size is in node's settings with a bytes (not %) unit
      - add some more logger.debug
This commit is contained in:
Michael McCandless 2015-09-15 10:39:10 -04:00
commit 028acdd06f
7 changed files with 173 additions and 81 deletions

View File

@ -94,8 +94,11 @@ public final class EngineConfig {
public static final String INDEX_GC_DELETES_SETTING = "index.gc_deletes";
/**
* Index setting to control the initial index buffer size.
* This setting is <b>not</b> realtime updateable.
* Index setting to control the initial index buffer size. NOTE: this setting is somewhat
* useless, since IndexingMemoryController will take over quickly and partition the
* indices.memory.index_buffer_size for this node across all shards.
*
* <p>This setting is <b>not</b> realtime updateable.
*/
public static final String INDEX_BUFFER_SIZE_SETTING = "index.buffer_size";

View File

@ -666,7 +666,6 @@ public class InternalEngine extends Engine {
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
updateIndexWriterSettings();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
ensureOpen();
@ -736,7 +735,6 @@ public class InternalEngine extends Engine {
*/
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
updateIndexWriterSettings();
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing) {
@ -954,7 +952,6 @@ public class InternalEngine extends Engine {
}
}
/**
* Closes the engine without acquiring the write lock. This should only be
* called while the write lock is hold or in a disaster condition ie. if the engine
@ -1168,8 +1165,6 @@ public class InternalEngine extends Engine {
return indexWriter.getConfig();
}
private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();
@ -1245,11 +1240,14 @@ public class InternalEngine extends Engine {
public void onSettingsChanged() {
mergeScheduler.refreshConfig();
updateIndexWriterSettings();
// config().getVersionMapSize() may have changed:
checkVersionMapRefresh();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletedTombstones();
}
public MergeStats getMergeStats() {
return mergeScheduler.stats();
}
}

View File

@ -115,9 +115,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class IndexShard extends AbstractIndexShardComponent {
private final ThreadPool threadPool;
@ -985,15 +982,27 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final EngineConfig config = engineConfig;
final ByteSizeValue preValue = config.getIndexingBufferSize();
config.setIndexingBufferSize(shardIndexingBufferSize);
Engine engine = engineUnsafe();
if (engine == null) {
logger.debug("updateBufferSize: engine is closed; skipping");
return;
}
// update engine if it is already started.
if (preValue.bytes() != shardIndexingBufferSize.bytes() && engineUnsafe() != null) {
// its inactive, make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER && preValue != EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// so we push changes these changes down to IndexWriter:
engine.onSettingsChanged();
if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
// it's inactive: make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize);
try {
refresh("update index buffer");
@ -1004,10 +1013,8 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);
}
}
Engine engine = engineUnsafe();
if (engine != null) {
engine.getTranslog().updateBuffer(shardTranslogBufferSize);
}
engine.getTranslog().updateBuffer(shardTranslogBufferSize);
}
public void markAsInactive() {
@ -1129,7 +1136,7 @@ public class IndexShard extends AbstractIndexShardComponent {
searchService.onRefreshSettings(settings);
indexingService.onRefreshSettings(settings);
if (change) {
refresh("apply settings");
engine().onSettingsChanged();
}
}
}
@ -1267,6 +1274,8 @@ public class IndexShard extends AbstractIndexShardComponent {
return engine;
}
/** NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed. */
protected Engine engineUnsafe() {
return this.currentEngineReference.get();
}

View File

@ -51,6 +51,42 @@ import java.util.concurrent.ScheduledFuture;
*/
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size";
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size";
/** Sets a floor on the per-shard index buffer size (default: 4 MB). */
public static final String MIN_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_shard_index_buffer_size";
/** Sets a ceiling on the per-shard index buffer size (default: 512 MB). */
public static final String MAX_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_shard_index_buffer_size";
/** How much heap (% or bytes) we will share across all actively indexing shards for the translog buffer (default: 1%). */
public static final String TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.translog_buffer_size";
/** Only applies when <code>indices.memory.translog_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 256 KB). */
public static final String MIN_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_translog_buffer_size";
/** Only applies when <code>indices.memory.translog_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
public static final String MAX_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_translog_buffer_size";
/** Sets a floor on the per-shard translog buffer size (default: 2 KB). */
public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size";
/** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */
public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size";
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
/** How frequently we check shards to find inactive ones (default: 30 seconds). */
public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval";
private final ThreadPool threadPool;
private final IndicesService indicesService;
@ -77,12 +113,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
this.indicesService = indicesService;
ByteSizeValue indexingBuffer;
String indexingBufferSetting = this.settings.get("indices.memory.index_buffer_size", "10%");
String indexingBufferSetting = this.settings.get(INDEX_BUFFER_SIZE_SETTING, "10%");
if (indexingBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1));
indexingBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) * (percent / 100)));
ByteSizeValue minIndexingBuffer = this.settings.getAsBytesSize("indices.memory.min_index_buffer_size", new ByteSizeValue(48, ByteSizeUnit.MB));
ByteSizeValue maxIndexingBuffer = this.settings.getAsBytesSize("indices.memory.max_index_buffer_size", null);
ByteSizeValue minIndexingBuffer = this.settings.getAsBytesSize(MIN_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(48, ByteSizeUnit.MB));
ByteSizeValue maxIndexingBuffer = this.settings.getAsBytesSize(MAX_INDEX_BUFFER_SIZE_SETTING, null);
if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) {
indexingBuffer = minIndexingBuffer;
@ -91,20 +127,20 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
indexingBuffer = maxIndexingBuffer;
}
} else {
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, null);
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, INDEX_BUFFER_SIZE_SETTING);
}
this.indexingBuffer = indexingBuffer;
this.minShardIndexBufferSize = this.settings.getAsBytesSize("indices.memory.min_shard_index_buffer_size", new ByteSizeValue(4, ByteSizeUnit.MB));
this.minShardIndexBufferSize = this.settings.getAsBytesSize(MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(4, ByteSizeUnit.MB));
// LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense: https://issues.apache.org/jira/browse/LUCENE-2324?focusedCommentId=13005155&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13005155
this.maxShardIndexBufferSize = this.settings.getAsBytesSize("indices.memory.max_shard_index_buffer_size", new ByteSizeValue(512, ByteSizeUnit.MB));
this.maxShardIndexBufferSize = this.settings.getAsBytesSize(MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(512, ByteSizeUnit.MB));
ByteSizeValue translogBuffer;
String translogBufferSetting = this.settings.get("indices.memory.translog_buffer_size", "1%");
String translogBufferSetting = this.settings.get(TRANSLOG_BUFFER_SIZE_SETTING, "1%");
if (translogBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(translogBufferSetting.substring(0, translogBufferSetting.length() - 1));
translogBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().getMem().getHeapMax().bytes()) * (percent / 100)));
ByteSizeValue minTranslogBuffer = this.settings.getAsBytesSize("indices.memory.min_translog_buffer_size", new ByteSizeValue(256, ByteSizeUnit.KB));
ByteSizeValue maxTranslogBuffer = this.settings.getAsBytesSize("indices.memory.max_translog_buffer_size", null);
ByteSizeValue minTranslogBuffer = this.settings.getAsBytesSize(MIN_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.KB));
ByteSizeValue maxTranslogBuffer = this.settings.getAsBytesSize(MAX_TRANSLOG_BUFFER_SIZE_SETTING, null);
if (translogBuffer.bytes() < minTranslogBuffer.bytes()) {
translogBuffer = minTranslogBuffer;
@ -116,15 +152,19 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, null);
}
this.translogBuffer = translogBuffer;
this.minShardTranslogBufferSize = this.settings.getAsBytesSize("indices.memory.min_shard_translog_buffer_size", new ByteSizeValue(2, ByteSizeUnit.KB));
this.maxShardTranslogBufferSize = this.settings.getAsBytesSize("indices.memory.max_shard_translog_buffer_size", new ByteSizeValue(64, ByteSizeUnit.KB));
this.minShardTranslogBufferSize = this.settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB));
this.maxShardTranslogBufferSize = this.settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB));
this.inactiveTime = this.settings.getAsTime("indices.memory.shard_inactive_time", TimeValue.timeValueMinutes(5));
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
// we need to have this relatively small to move a shard from inactive to active fast (enough)
this.interval = this.settings.getAsTime("indices.memory.interval", TimeValue.timeValueSeconds(30));
logger.debug("using index_buffer_size [{}], with min_shard_index_buffer_size [{}], max_shard_index_buffer_size [{}], shard_inactive_time [{}]", this.indexingBuffer, this.minShardIndexBufferSize, this.maxShardIndexBufferSize, this.inactiveTime);
this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30));
logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]",
this.indexingBuffer,
MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize,
SHARD_INACTIVE_TIME_SETTING, this.inactiveTime,
SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval);
}
@Override
@ -155,12 +195,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final Map<ShardId, ShardIndexingStatus> shardsIndicesStatus = new HashMap<>();
@Override
public void run() {
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);
changes.addAll(purgeDeletedAndClosedShards());
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();
final List<IndexShard> activeToInactiveIndexingShards = new ArrayList<>();
final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards);
@ -170,11 +207,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
indexShard.markAsInactive();
} catch (EngineClosedException e) {
// ignore
logger.trace("ignore EngineClosedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id());
} catch (FlushNotAllowedEngineException e) {
// ignore
logger.trace("ignore FlushNotAllowedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id());
}
}
if (!changes.isEmpty()) {
if (changes.isEmpty() == false) {
// Something changed: recompute indexing buffers:
calcAndSetShardBuffers(activeShards, "[" + changes + "]");
}
}
@ -190,23 +231,24 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
for (IndexShard indexShard : indexService) {
if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) {
// not ready to be updated yet.
// not ready to be updated yet
continue;
}
if (indexShard.canIndex() == false) {
// not relevant for memory related issues.
// shadow replica doesn't have an indexing buffer
continue;
}
final Translog translog;
try {
translog = indexShard.engine().getTranslog();
} catch (EngineClosedException e) {
// not ready yet to be checked for in activity
// not ready yet to be checked for activity
continue;
}
final long time = threadPool.estimatedTimeInMillis();
final long timeMS = threadPool.estimatedTimeInMillis();
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null) {
@ -214,21 +256,22 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
shardsIndicesStatus.put(indexShard.shardId(), status);
changes.add(ShardStatusChangeType.ADDED);
}
// check if it is deemed to be inactive (sam translogFileGeneration and numberOfOperations over a long period of time)
// consider shard inactive if it has same translogFileGeneration and no operations for a long time
if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == 0) {
if (status.time == -1) { // first time
status.time = time;
if (status.timeMS == -1) {
// first time we noticed the shard become idle
status.timeMS = timeMS;
}
// inactive?
if (status.activeIndexing) {
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
// inactive for this amount of time, mark it
activeToInactiveIndexingShards.add(indexShard);
status.activeIndexing = false;
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
}
// mark it as inactive only if enough time has passed
if (status.activeIndexing && (timeMS - status.timeMS) > inactiveTime.millis()) {
// inactive for this amount of time, mark it
activeToInactiveIndexingShards.add(indexShard);
status.activeIndexing = false;
changes.add(ShardStatusChangeType.BECAME_INACTIVE);
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]",
indexShard.shardId().index().name(), indexShard.shardId().id(),
inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
}
} else {
if (!status.activeIndexing) {
@ -236,10 +279,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
changes.add(ShardStatusChangeType.BECAME_ACTIVE);
logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id());
}
status.time = -1;
status.timeMS = -1;
}
status.translogId = translog.currentFileGeneration();
status.translogNumberOfOperations = translog.totalOperations();
if (status.activeIndexing) {
activeShards++;
@ -261,31 +303,28 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
while (statusShardIdIterator.hasNext()) {
ShardId statusShardId = statusShardIdIterator.next();
IndexService indexService = indicesService.indexService(statusShardId.getIndex());
boolean remove = false;
try {
if (indexService == null) {
remove = true;
continue;
}
boolean remove;
if (indexService == null) {
remove = true;
} else {
IndexShard indexShard = indexService.shard(statusShardId.id());
if (indexShard == null) {
remove = true;
continue;
}
remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state());
} finally {
if (remove) {
changes.add(ShardStatusChangeType.DELETED);
statusShardIdIterator.remove();
} else {
remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state());
}
}
if (remove) {
changes.add(ShardStatusChangeType.DELETED);
statusShardIdIterator.remove();
}
}
return changes;
}
private void calcAndSetShardBuffers(int activeShards, String reason) {
if (activeShards == 0) {
logger.debug("no active shards (reason={})", reason);
return;
}
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / activeShards);
@ -335,11 +374,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
}
static class ShardIndexingStatus {
private static class ShardIndexingStatus {
long translogId = -1;
int translogNumberOfOperations = -1;
boolean activeIndexing = true;
long time = -1; // contains the first time we saw this shard with no operations done on it
long timeMS = -1; // contains the first time we saw this shard with no operations done on it
}
}

View File

@ -315,6 +315,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engine.config().setCompoundOnFlush(false);
engine.onSettingsChanged();
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
engine.create(new Engine.Create(newUid("3"), doc3));
@ -363,6 +364,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(segments.get(1).isCompound(), equalTo(false));
engine.config().setCompoundOnFlush(true);
engine.onSettingsChanged();
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
engine.create(new Engine.Create(newUid("4"), doc4));
engine.refresh("test");

View File

@ -338,6 +338,7 @@ public class ShadowEngineTests extends ESTestCase {
primaryEngine.config().setCompoundOnFlush(false);
primaryEngine.onSettingsChanged();
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
primaryEngine.create(new Engine.Create(newUid("3"), doc3));
@ -410,6 +411,8 @@ public class ShadowEngineTests extends ESTestCase {
replicaEngine.refresh("test");
primaryEngine.config().setCompoundOnFlush(true);
primaryEngine.onSettingsChanged();
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
primaryEngine.create(new Engine.Create(newUid("4"), doc4));
primaryEngine.refresh("test");

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
@ -78,7 +79,7 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
@Test
public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException {
createNode(Settings.builder().put("indices.memory.shard_inactive_time", "100ms").build());
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build());
prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
@ -109,6 +110,45 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
// Make sure we also pushed the tiny indexing buffer down to the underlying IndexWriter:
assertEquals(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(), getIWBufferSize("test1"));
}
private long getIWBufferSize(String indexName) {
return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes();
}
@Test
public void testIndexBufferSizeTwoShards() throws InterruptedException {
createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h",
IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb",
IndexShard.INDEX_REFRESH_INTERVAL, "-1").build());
// Create two active indices, sharing 32 MB indexing buffer:
prepareCreate("test3").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
prepareCreate("test4").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
index("test3", "type", "1", "f", 1);
index("test4", "type", "1", "f", 1);
// .. then make sure we really pushed the update (16 MB for each) down to the IndexWriter, even if refresh nor flush occurs:
if (awaitBusy(() -> getIWBufferSize("test3") == 16*1024*1024) == false) {
fail("failed to update shard indexing buffer size for test3 index to 16 MB; got: " + getIWBufferSize("test3"));
}
if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) {
fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4"));
}
}
@Test
public void testIndexBufferNotPercent() throws InterruptedException {
// #13487: Make sure you can specify non-percent sized index buffer and not hit NPE
createNode(Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb").build());
// ... and that it took:
assertEquals(32*1024*1024, internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes());
}
private void createNode(Settings settings) {
@ -120,7 +160,7 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase {
.put(EsExecutors.PROCESSORS, 1) // limit the number of threads created
.put("http.enabled", false)
.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true) // make sure we get what we set :)
.put("indices.memory.interval", "100ms")
.put(IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms")
.put(settings)
);
}