diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index f54020802b5..8703ab2b10a 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.memory; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -48,7 +49,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponentindices.memory.index_buffer_size is a %, to set a floor on the actual size in bytes (default: 48 MB). */ public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size"; @@ -101,8 +102,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of( IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); + private final ShardsIndicesStatusChecker statusChecker; + @Inject public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { + this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); + } + + // for testing + protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) { super(settings); this.threadPool = threadPool; this.indicesService = indicesService; @@ -111,7 +119,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() { + ArrayList list = new ArrayList<>(); + + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (shardAvailable(indexShard)) { + list.add(indexShard.shardId()); + } + } + } + return list; + } + + /** returns true if shard exists and is availabe for updates */ + protected boolean shardAvailable(ShardId shardId) { + return shardAvailable(getShard(shardId)); + } + + /** returns true if shard exists and is availabe for updates */ + protected boolean shardAvailable(@Nullable IndexShard shard) { + // shadow replica doesn't have an indexing buffer + return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state()); + } + + /** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */ + protected IndexShard getShard(ShardId shardId) { + IndexService indexService = indicesService.indexService(shardId.index().name()); + if (indexService != null) { + IndexShard indexShard = indexService.shard(shardId.id()); + return indexShard; + } + return null; + } + + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + final IndexShard shard = getShard(shardId); + if (shard != null) { + try { + shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize); + } catch (EngineClosedException e) { + // ignore + } catch (FlushNotAllowedEngineException e) { + // ignore + } catch (Exception e) { + logger.warn("failed to set shard {} index buffer to [{}]", shardId, shardIndexingBufferSize); + } + } + } + + + /** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */ + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + final IndexShard indexShard = getShard(shardId); + if (indexShard == null) { + return null; + } + final Translog translog; + try { + translog = indexShard.engine().getTranslog(); + } catch (EngineClosedException e) { + // not ready yet to be checked for activity + return null; + } + + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translog.currentFileGeneration(); + status.translogNumberOfOperations = translog.totalOperations(); + return status; + } + + // used for tests + void forceCheck() { + statusChecker.run(); + } + class ShardsIndicesStatusChecker implements Runnable { private final Map shardsIndicesStatus = new HashMap<>(); @@ -194,19 +289,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent changes = purgeDeletedAndClosedShards(); - final List activeToInactiveIndexingShards = new ArrayList<>(); + final List activeToInactiveIndexingShards = new ArrayList<>(); final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards); - for (IndexShard indexShard : activeToInactiveIndexingShards) { - // update inactive indexing buffer size - try { - indexShard.markAsInactive(); - } catch (EngineClosedException e) { - // ignore - logger.trace("ignore EngineClosedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id()); - } catch (FlushNotAllowedEngineException e) { - // ignore - logger.trace("ignore FlushNotAllowedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id()); - } + for (ShardId indexShard : activeToInactiveIndexingShards) { + markShardAsInactive(indexShard); } if (changes.isEmpty() == false) { @@ -220,70 +306,42 @@ public class IndexingMemoryController extends AbstractLifecycleComponent changes, List activeToInactiveIndexingShards) { + private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { int activeShards = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { + for (ShardId shardId : availableShards()) { - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) { - // not ready to be updated yet - continue; - } + final ShardIndexingStatus currentStatus = getTranslogStatus(shardId); - if (indexShard.canIndex() == false) { - // shadow replica doesn't have an indexing buffer - continue; - } + if (currentStatus == null) { + // shard was closed.. + continue; + } - final Translog translog; - try { - translog = indexShard.engine().getTranslog(); - } catch (EngineClosedException e) { - // not ready yet to be checked for activity - continue; - } - - final long timeMS = threadPool.estimatedTimeInMillis(); - - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null) { - status = new ShardIndexingStatus(); - shardsIndicesStatus.put(indexShard.shardId(), status); - changes.add(ShardStatusChangeType.ADDED); - } - - // consider shard inactive if it has same translogFileGeneration and no operations for a long time - if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == status.translogNumberOfOperations) { - if (status.timeMS == -1) { - // first time we noticed the shard become idle - status.timeMS = timeMS; - } - // mark it as inactive only if enough time has passed - if (status.activeIndexing && (timeMS - status.timeMS) > inactiveTime.millis()) { - // inactive for this amount of time, mark it - activeToInactiveIndexingShards.add(indexShard); - status.activeIndexing = false; - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", - indexShard.shardId().index().name(), indexShard.shardId().id(), - inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); - } - } else { - if (!status.activeIndexing) { - status.activeIndexing = true; - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); - } - status.timeMS = -1; - } - status.translogId = translog.currentFileGeneration(); - status.translogNumberOfOperations = translog.totalOperations(); - - if (status.activeIndexing) { - activeShards++; + ShardIndexingStatus status = shardsIndicesStatus.get(shardId); + if (status == null) { + status = currentStatus; + shardsIndicesStatus.put(shardId, status); + changes.add(ShardStatusChangeType.ADDED); + } else { + final boolean lastActiveIndexing = status.activeIndexing; + status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos()); + if (lastActiveIndexing && (status.activeIndexing == false)) { + activeToInactiveIndexingShards.add(shardId); + changes.add(ShardStatusChangeType.BECAME_INACTIVE); + logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", + shardId, + inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); + } else if ((lastActiveIndexing == false) && status.activeIndexing) { + changes.add(ShardStatusChangeType.BECAME_ACTIVE); + logger.debug("marking shard {} as active indexing wise", shardId); } } + + if (status.activeIndexing) { + activeShards++; + } } + return activeShards; } @@ -297,20 +355,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent statusShardIdIterator = shardsIndicesStatus.keySet().iterator(); while (statusShardIdIterator.hasNext()) { - ShardId statusShardId = statusShardIdIterator.next(); - IndexService indexService = indicesService.indexService(statusShardId.getIndex()); - boolean remove; - if (indexService == null) { - remove = true; - } else { - IndexShard indexShard = indexService.shard(statusShardId.id()); - if (indexShard == null) { - remove = true; - } else { - remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state()); - } - } - if (remove) { + ShardId shardId = statusShardIdIterator.next(); + if (shardAvailable(shardId) == false) { changes.add(ShardStatusChangeType.DELETED); statusShardIdIterator.remove(); } @@ -340,40 +386,80 @@ public class IndexingMemoryController extends AbstractLifecycleComponent inactiveNanoInterval) { + // shard is inactive. mark it as such. + activeIndexing = false; + } + } else if (activeIndexing == false // we weren't indexing before + && idle == false // but we do now + && current.translogNumberOfOperations > 0 // but only if we're really sure - see note bellow + ) { + // since we sync flush once a shard becomes inactive, the translog id can change, however that + // doesn't mean the an indexing operation has happened. Note that if we're really unlucky and a flush happens + // immediately after an indexing operation we may not become active immediately. The following + // indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay + // inactive + + activeIndexing = true; + idleSinceNanoTime = -1; + } + + translogId = current.translogId; + translogNumberOfOperations = current.translogNumberOfOperations; + } } } diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java index 6c0f041b2b8..aaca771853b 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java @@ -22,115 +22,24 @@ package org.elasticsearch.indices.memory; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.test.ESIntegTestCase; import org.junit.Test; -import java.util.concurrent.ExecutionException; - @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndexingMemoryControllerIT extends ESIntegTestCase { - @Test - public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException { - - createNode(Settings.EMPTY); - - prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); - - prepareCreate("test2").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - final IndexShard shard2 = internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0); - final long expected1ShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes(); - final long expected2ShardsSize = expected1ShardSize / 2; - - boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize && - shard2.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize - ); - - if (!success) { - fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "] shard2 [" + - shard2.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - client().admin().indices().prepareDelete("test2").get(); - success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() >= expected1ShardSize); - - if (!success) { - fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - } - - @Test - public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException, ExecutionException { - - createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build()); - - prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); - - if (randomBoolean()) { - logger.info("--> indexing some pending operations"); - indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0")); - } - - boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); - if (!success) { - fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - index("test1", "type", "1", "f", 1); - - success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); - if (!success) { - fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - if (randomBoolean()) { - logger.info("--> flushing translogs"); - flush(); // clean translogs - } - - success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); - if (!success) { - fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - // Make sure we also pushed the tiny indexing buffer down to the underlying IndexWriter: - assertEquals(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(), getIWBufferSize("test1")); - } - private long getIWBufferSize(String indexName) { return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes(); } @Test - public void testIndexBufferSizeTwoShards() throws InterruptedException { + public void testIndexBufferPushedToEngine() throws InterruptedException { createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h", IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb", IndexShard.INDEX_REFRESH_INTERVAL, "-1").build()); @@ -151,14 +60,32 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase { if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) { fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4")); } + + client().admin().indices().prepareDelete("test4").get(); + if (awaitBusy(() -> getIWBufferSize("test3") == 32 * 1024 * 1024) == false) { + fail("failed to update shard indexing buffer size for test3 index to 32 MB; got: " + getIWBufferSize("test4")); + } + } @Test - public void testIndexBufferNotPercent() throws InterruptedException { - // #13487: Make sure you can specify non-percent sized index buffer and not hit NPE - createNode(Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb").build()); - // ... and that it took: - assertEquals(32*1024*1024, internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes()); + public void testInactivePushedToShard() throws InterruptedException { + createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms", + IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms", + IndexShard.INDEX_REFRESH_INTERVAL, "-1").build()); + + // Create two active indices, sharing 32 MB indexing buffer: + prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + index("test1", "type", "1", "f", 1); + + // make shard the shard buffer was set to inactive size + final ByteSizeValue inactiveBuffer = EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER; + if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) { + fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1")); + } } private void createNode(Settings settings) { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java new file mode 100644 index 00000000000..f6e21db396a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -0,0 +1,285 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.memory; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class IndexingMemoryControllerTests extends ESTestCase { + + static class MockController extends IndexingMemoryController { + + final static ByteSizeValue INACTIVE = new ByteSizeValue(-1); + + final Map translogIds = new HashMap<>(); + final Map translogOps = new HashMap<>(); + + final Map indexingBuffers = new HashMap<>(); + final Map translogBuffers = new HashMap<>(); + + long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); + + public MockController(Settings settings) { + super(Settings.builder() + .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it + .put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate + .put(settings) + .build(), + null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb + } + + public void incTranslog(ShardId shard1, int id, int ops) { + setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops); + } + + public void setTranslog(ShardId id, long translogId, long ops) { + translogIds.put(id, translogId); + translogOps.put(id, ops); + } + + public void deleteShard(ShardId id) { + translogIds.remove(id); + translogOps.remove(id); + indexingBuffers.remove(id); + translogBuffers.remove(id); + } + + public void assertActive(ShardId id) { + assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE))); + assertThat(translogBuffers.get(id), not(equalTo(INACTIVE))); + } + + public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { + assertThat(indexingBuffers.get(id), equalTo(indexing)); + assertThat(translogBuffers.get(id), equalTo(translog)); + } + + public void assertInActive(ShardId id) { + assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); + assertThat(translogBuffers.get(id), equalTo(INACTIVE)); + } + + @Override + protected long currentTimeInNanos() { + return TimeValue.timeValueSeconds(currentTimeSec).nanos(); + } + + @Override + protected List availableShards() { + return new ArrayList<>(translogIds.keySet()); + } + + @Override + protected boolean shardAvailable(ShardId shardId) { + return translogIds.containsKey(shardId); + } + + @Override + protected void markShardAsInactive(ShardId shardId) { + indexingBuffers.put(shardId, INACTIVE); + translogBuffers.put(shardId, INACTIVE); + } + + @Override + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + if (!shardAvailable(shardId)) { + return null; + } + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translogIds.get(shardId); + status.translogNumberOfOperations = translogOps.get(shardId); + return status; + } + + @Override + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + indexingBuffers.put(shardId, shardIndexingBufferSize); + translogBuffers.put(shardId, shardTranslogBufferSize); + } + + public void incrementTimeSec(int sec) { + currentTimeSec += sec; + } + + public void simulateFlush(ShardId shard) { + setTranslog(shard, translogIds.get(shard) + 1, 0); + } + } + + public void testShardAdditionAndRemoval() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + + // add another shard + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // remove first shard + controller.deleteShard(shard1); + controller.forceCheck(); + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + + // remove second shard + controller.deleteShard(shard2); + controller.forceCheck(); + + // add a new one + final ShardId shard3 = new ShardId("test", 3); + controller.setTranslog(shard3, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + } + + public void testActiveInactive() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") + .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s") + .build()); + + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, 0, 0); + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, 0, 0); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // index into both shards, move the clock and see that they are still active + controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1); + controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2)); + // the controller doesn't know when the ops happened, so even if this is more + // than the inactive time the shard is still marked as active + controller.incrementTimeSec(10); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // index into one shard only, see other shard is made inactive correctly + controller.incTranslog(shard1, randomInt(2), randomInt(2) + 1); + controller.forceCheck(); // register what happened with the controller (shard is still active) + controller.incrementTimeSec(3); // increment but not enough + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + controller.incrementTimeSec(3); // increment some more + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInActive(shard2); + + if (randomBoolean()) { + // once a shard gets inactive it will be synced flushed and a new translog generation will be made + controller.simulateFlush(shard2); + controller.forceCheck(); + controller.assertInActive(shard2); + } + + // index some and shard becomes immediately active + controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0 + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + } + + public void testMinShardBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); + + assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB)); + } + + public void testMaxShardBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") + .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); + + assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB)); + } + + public void testRelativeBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") + .build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + + public void testMinBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + public void testMaxBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, 0, 0); + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, 0, 0); + controller.forceCheck(); + controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); + controller.assertBuffers(shard2, indexBufferSize, translogBufferSize); + + } + +}