From 148265bd164cd5a614cd020fb480d5974f523d81 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Fri, 25 Sep 2015 08:27:47 +0200
Subject: [PATCH] Internal: an inactive shard is temporarily activated by the synced flush it triggers

When a shard becomes inactive we trigger a synced flush in order to speed up future recoveries. The synced flush causes a new translog generation to be made, which in turn confuses the IndexingMemoryController into thinking that the shard is active again. If no documents come along in the next 5m, the shard is marked inactive once more, triggering another synced flush, and so forth.

To avoid this loop, the IndexingMemoryController is changed to ignore empty translogs when checking whether a shard became active. This comes at the price of potentially missing indexing operations that are immediately followed by a flush. That is acceptable because, if no further indexing operations come in, it is fine to leave the shard inactive. A condensed sketch of the new activity check follows the patch.

A new unit test is introduced and the comparable integration tests are removed.

Closes #13802
---
 .../memory/IndexingMemoryController.java      | 318 +++++++++++-------
 .../memory/IndexingMemoryControllerIT.java    | 123 ++-----
 .../memory/IndexingMemoryControllerTests.java | 285 ++++++++++++++++
 3 files changed, 512 insertions(+), 214 deletions(-)
 create mode 100644 core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java

diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
index f54020802b5..8703ab2b10a 100644
--- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
+++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java
@@ -19,6 +19,7 @@ package org.elasticsearch.indices.memory;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -48,7 +49,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
     /** Only applies when indices.memory.index_buffer_size is a %, to set a floor on the actual size in bytes (default: 48 MB).
*/ public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size"; @@ -101,8 +102,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of( IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); + private final ShardsIndicesStatusChecker statusChecker; + @Inject public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { + this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); + } + + // for testing + protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) { super(settings); this.threadPool = threadPool; this.indicesService = indicesService; @@ -111,7 +119,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() { + ArrayList list = new ArrayList<>(); + + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (shardAvailable(indexShard)) { + list.add(indexShard.shardId()); + } + } + } + return list; + } + + /** returns true if shard exists and is availabe for updates */ + protected boolean shardAvailable(ShardId shardId) { + return shardAvailable(getShard(shardId)); + } + + /** returns true if shard exists and is availabe for updates */ + protected boolean shardAvailable(@Nullable IndexShard shard) { + // shadow replica doesn't have an indexing buffer + return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state()); + } + + /** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */ + protected IndexShard getShard(ShardId shardId) { + IndexService indexService = indicesService.indexService(shardId.index().name()); + if (indexService != null) { + IndexShard indexShard = indexService.shard(shardId.id()); + return indexShard; + } + return null; + } + + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + final IndexShard shard = getShard(shardId); + if (shard != null) { + try { + shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize); + } catch (EngineClosedException e) { + // ignore + } catch (FlushNotAllowedEngineException e) { + // ignore + } catch (Exception e) { + logger.warn("failed to set shard {} index buffer to [{}]", shardId, shardIndexingBufferSize); + } + } + } + + + /** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. 
*/ + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + final IndexShard indexShard = getShard(shardId); + if (indexShard == null) { + return null; + } + final Translog translog; + try { + translog = indexShard.engine().getTranslog(); + } catch (EngineClosedException e) { + // not ready yet to be checked for activity + return null; + } + + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translog.currentFileGeneration(); + status.translogNumberOfOperations = translog.totalOperations(); + return status; + } + + // used for tests + void forceCheck() { + statusChecker.run(); + } + class ShardsIndicesStatusChecker implements Runnable { private final Map shardsIndicesStatus = new HashMap<>(); @@ -194,19 +289,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent changes = purgeDeletedAndClosedShards(); - final List activeToInactiveIndexingShards = new ArrayList<>(); + final List activeToInactiveIndexingShards = new ArrayList<>(); final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards); - for (IndexShard indexShard : activeToInactiveIndexingShards) { - // update inactive indexing buffer size - try { - indexShard.markAsInactive(); - } catch (EngineClosedException e) { - // ignore - logger.trace("ignore EngineClosedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id()); - } catch (FlushNotAllowedEngineException e) { - // ignore - logger.trace("ignore FlushNotAllowedException while marking shard [{}][{}] as inactive", indexShard.shardId().index().name(), indexShard.shardId().id()); - } + for (ShardId indexShard : activeToInactiveIndexingShards) { + markShardAsInactive(indexShard); } if (changes.isEmpty() == false) { @@ -220,70 +306,42 @@ public class IndexingMemoryController extends AbstractLifecycleComponent changes, List activeToInactiveIndexingShards) { + private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { int activeShards = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { + for (ShardId shardId : availableShards()) { - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) { - // not ready to be updated yet - continue; - } + final ShardIndexingStatus currentStatus = getTranslogStatus(shardId); - if (indexShard.canIndex() == false) { - // shadow replica doesn't have an indexing buffer - continue; - } + if (currentStatus == null) { + // shard was closed.. 
+ continue; + } - final Translog translog; - try { - translog = indexShard.engine().getTranslog(); - } catch (EngineClosedException e) { - // not ready yet to be checked for activity - continue; - } - - final long timeMS = threadPool.estimatedTimeInMillis(); - - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null) { - status = new ShardIndexingStatus(); - shardsIndicesStatus.put(indexShard.shardId(), status); - changes.add(ShardStatusChangeType.ADDED); - } - - // consider shard inactive if it has same translogFileGeneration and no operations for a long time - if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == status.translogNumberOfOperations) { - if (status.timeMS == -1) { - // first time we noticed the shard become idle - status.timeMS = timeMS; - } - // mark it as inactive only if enough time has passed - if (status.activeIndexing && (timeMS - status.timeMS) > inactiveTime.millis()) { - // inactive for this amount of time, mark it - activeToInactiveIndexingShards.add(indexShard); - status.activeIndexing = false; - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", - indexShard.shardId().index().name(), indexShard.shardId().id(), - inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); - } - } else { - if (!status.activeIndexing) { - status.activeIndexing = true; - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); - } - status.timeMS = -1; - } - status.translogId = translog.currentFileGeneration(); - status.translogNumberOfOperations = translog.totalOperations(); - - if (status.activeIndexing) { - activeShards++; + ShardIndexingStatus status = shardsIndicesStatus.get(shardId); + if (status == null) { + status = currentStatus; + shardsIndicesStatus.put(shardId, status); + changes.add(ShardStatusChangeType.ADDED); + } else { + final boolean lastActiveIndexing = status.activeIndexing; + status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos()); + if (lastActiveIndexing && (status.activeIndexing == false)) { + activeToInactiveIndexingShards.add(shardId); + changes.add(ShardStatusChangeType.BECAME_INACTIVE); + logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", + shardId, + inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); + } else if ((lastActiveIndexing == false) && status.activeIndexing) { + changes.add(ShardStatusChangeType.BECAME_ACTIVE); + logger.debug("marking shard {} as active indexing wise", shardId); } } + + if (status.activeIndexing) { + activeShards++; + } } + return activeShards; } @@ -297,20 +355,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent statusShardIdIterator = shardsIndicesStatus.keySet().iterator(); while (statusShardIdIterator.hasNext()) { - ShardId statusShardId = statusShardIdIterator.next(); - IndexService indexService = indicesService.indexService(statusShardId.getIndex()); - boolean remove; - if (indexService == null) { - remove = true; - } else { - IndexShard indexShard = indexService.shard(statusShardId.id()); - if (indexShard == null) { - remove = true; - } else { - remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state()); - } - } - if (remove) { + ShardId shardId = statusShardIdIterator.next(); + if 
(shardAvailable(shardId) == false) { changes.add(ShardStatusChangeType.DELETED); statusShardIdIterator.remove(); } @@ -340,40 +386,80 @@ public class IndexingMemoryController extends AbstractLifecycleComponent inactiveNanoInterval) { + // shard is inactive. mark it as such. + activeIndexing = false; + } + } else if (activeIndexing == false // we weren't indexing before + && idle == false // but we do now + && current.translogNumberOfOperations > 0 // but only if we're really sure - see note bellow + ) { + // since we sync flush once a shard becomes inactive, the translog id can change, however that + // doesn't mean the an indexing operation has happened. Note that if we're really unlucky and a flush happens + // immediately after an indexing operation we may not become active immediately. The following + // indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay + // inactive + + activeIndexing = true; + idleSinceNanoTime = -1; + } + + translogId = current.translogId; + translogNumberOfOperations = current.translogNumberOfOperations; + } } } diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java index 6c0f041b2b8..aaca771853b 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java @@ -22,115 +22,24 @@ package org.elasticsearch.indices.memory; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.test.ESIntegTestCase; import org.junit.Test; -import java.util.concurrent.ExecutionException; - @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndexingMemoryControllerIT extends ESIntegTestCase { - @Test - public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException { - - createNode(Settings.EMPTY); - - prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); - - prepareCreate("test2").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - final IndexShard shard2 = internalCluster().getInstance(IndicesService.class).indexService("test2").shard(0); - final long expected1ShardSize = internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes(); - final long expected2ShardsSize = expected1ShardSize / 2; - - boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize && - shard2.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize - ); - - if (!success) { - fail("failed to update shard indexing buffer size. 
expected [" + expected2ShardsSize + "] shard1 [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "] shard2 [" + - shard2.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - client().admin().indices().prepareDelete("test2").get(); - success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() >= expected1ShardSize); - - if (!success) { - fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - } - - @Test - public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException, ExecutionException { - - createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build()); - - prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); - - ensureGreen(); - - final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0); - - if (randomBoolean()) { - logger.info("--> indexing some pending operations"); - indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0")); - } - - boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); - if (!success) { - fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - index("test1", "type", "1", "f", 1); - - success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); - if (!success) { - fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - if (randomBoolean()) { - logger.info("--> flushing translogs"); - flush(); // clean translogs - } - - success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); - if (!success) { - fail("failed to update shard indexing buffer size due to inactive state. 
expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().config().getIndexingBufferSize().bytes() + "]" - ); - } - - // Make sure we also pushed the tiny indexing buffer down to the underlying IndexWriter: - assertEquals(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(), getIWBufferSize("test1")); - } - private long getIWBufferSize(String indexName) { return client().admin().indices().prepareStats(indexName).get().getTotal().getSegments().getIndexWriterMaxMemoryInBytes(); } @Test - public void testIndexBufferSizeTwoShards() throws InterruptedException { + public void testIndexBufferPushedToEngine() throws InterruptedException { createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100000h", IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb", IndexShard.INDEX_REFRESH_INTERVAL, "-1").build()); @@ -151,14 +60,32 @@ public class IndexingMemoryControllerIT extends ESIntegTestCase { if (awaitBusy(() -> getIWBufferSize("test4") == 16*1024*1024) == false) { fail("failed to update shard indexing buffer size for test4 index to 16 MB; got: " + getIWBufferSize("test4")); } + + client().admin().indices().prepareDelete("test4").get(); + if (awaitBusy(() -> getIWBufferSize("test3") == 32 * 1024 * 1024) == false) { + fail("failed to update shard indexing buffer size for test3 index to 32 MB; got: " + getIWBufferSize("test4")); + } + } @Test - public void testIndexBufferNotPercent() throws InterruptedException { - // #13487: Make sure you can specify non-percent sized index buffer and not hit NPE - createNode(Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "32mb").build()); - // ... and that it took: - assertEquals(32*1024*1024, internalCluster().getInstance(IndexingMemoryController.class).indexingBufferSize().bytes()); + public void testInactivePushedToShard() throws InterruptedException { + createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms", + IndexingMemoryController.SHARD_INACTIVE_INTERVAL_TIME_SETTING, "100ms", + IndexShard.INDEX_REFRESH_INTERVAL, "-1").build()); + + // Create two active indices, sharing 32 MB indexing buffer: + prepareCreate("test1").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get(); + + ensureGreen(); + + index("test1", "type", "1", "f", 1); + + // make shard the shard buffer was set to inactive size + final ByteSizeValue inactiveBuffer = EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER; + if (awaitBusy(() -> getIWBufferSize("test1") == inactiveBuffer.bytes()) == false) { + fail("failed to update shard indexing buffer size for test1 index to [" + inactiveBuffer + "]; got: " + getIWBufferSize("test1")); + } } private void createNode(Settings settings) { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java new file mode 100644 index 00000000000..f6e21db396a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -0,0 +1,285 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.memory; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class IndexingMemoryControllerTests extends ESTestCase { + + static class MockController extends IndexingMemoryController { + + final static ByteSizeValue INACTIVE = new ByteSizeValue(-1); + + final Map translogIds = new HashMap<>(); + final Map translogOps = new HashMap<>(); + + final Map indexingBuffers = new HashMap<>(); + final Map translogBuffers = new HashMap<>(); + + long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); + + public MockController(Settings settings) { + super(Settings.builder() + .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it + .put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate + .put(settings) + .build(), + null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb + } + + public void incTranslog(ShardId shard1, int id, int ops) { + setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops); + } + + public void setTranslog(ShardId id, long translogId, long ops) { + translogIds.put(id, translogId); + translogOps.put(id, ops); + } + + public void deleteShard(ShardId id) { + translogIds.remove(id); + translogOps.remove(id); + indexingBuffers.remove(id); + translogBuffers.remove(id); + } + + public void assertActive(ShardId id) { + assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE))); + assertThat(translogBuffers.get(id), not(equalTo(INACTIVE))); + } + + public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { + assertThat(indexingBuffers.get(id), equalTo(indexing)); + assertThat(translogBuffers.get(id), equalTo(translog)); + } + + public void assertInActive(ShardId id) { + assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); + assertThat(translogBuffers.get(id), equalTo(INACTIVE)); + } + + @Override + protected long currentTimeInNanos() { + return TimeValue.timeValueSeconds(currentTimeSec).nanos(); + } + + @Override + protected List availableShards() { + return new ArrayList<>(translogIds.keySet()); + } + + @Override + protected boolean shardAvailable(ShardId shardId) { + return translogIds.containsKey(shardId); + } + + @Override + protected void markShardAsInactive(ShardId shardId) { + indexingBuffers.put(shardId, INACTIVE); + translogBuffers.put(shardId, INACTIVE); + } + + @Override + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + if (!shardAvailable(shardId)) { + return null; + } + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translogIds.get(shardId); + status.translogNumberOfOperations = translogOps.get(shardId); + return status; + } + + 
@Override + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + indexingBuffers.put(shardId, shardIndexingBufferSize); + translogBuffers.put(shardId, shardTranslogBufferSize); + } + + public void incrementTimeSec(int sec) { + currentTimeSec += sec; + } + + public void simulateFlush(ShardId shard) { + setTranslog(shard, translogIds.get(shard) + 1, 0); + } + } + + public void testShardAdditionAndRemoval() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + + // add another shard + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // remove first shard + controller.deleteShard(shard1); + controller.forceCheck(); + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + + // remove second shard + controller.deleteShard(shard2); + controller.forceCheck(); + + // add a new one + final ShardId shard3 = new ShardId("test", 3); + controller.setTranslog(shard3, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + } + + public void testActiveInactive() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") + .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s") + .build()); + + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, 0, 0); + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, 0, 0); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // index into both shards, move the clock and see that they are still active + controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1); + controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2)); + // the controller doesn't know when the ops happened, so even if this is more + // than the inactive time the shard is still marked as active + controller.incrementTimeSec(10); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // index into one shard only, see other shard is made inactive correctly + controller.incTranslog(shard1, randomInt(2), 
randomInt(2) + 1); + controller.forceCheck(); // register what happened with the controller (shard is still active) + controller.incrementTimeSec(3); // increment but not enough + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + controller.incrementTimeSec(3); // increment some more + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInActive(shard2); + + if (randomBoolean()) { + // once a shard gets inactive it will be synced flushed and a new translog generation will be made + controller.simulateFlush(shard2); + controller.forceCheck(); + controller.assertInActive(shard2); + } + + // index some and shard becomes immediately active + controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0 + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + } + + public void testMinShardBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); + + assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB)); + } + + public void testMaxShardBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") + .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); + + assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB)); + } + + public void testRelativeBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") + .build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + + public void testMinBufferSizes() { + MockController controller = new MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + public void testMaxBufferSizes() { + MockController controller = new 
MockController(Settings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, 0, 0); + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, 0, 0); + controller.forceCheck(); + controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); + controller.assertBuffers(shard2, indexBufferSize, translogBufferSize); + + } + +}
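
Note (not part of the patch): for readers skimming the diff, the following is a condensed, self-contained restatement of the activity check this patch adds in ShardIndexingStatus#updateWith. Field names are simplified and the controller plumbing is omitted; treat it as a sketch of the intended behavior, not as the literal code above.

    /** Sketch of the per-shard activity tracking added by this patch (simplified). */
    class ShardActivitySketch {
        long lastTranslogGeneration = -1;
        long lastTranslogOps = -1;
        boolean activeIndexing = true;   // shards start out considered active
        long idleSinceNanos = -1;

        void update(long nowNanos, long translogGeneration, long translogOps, long inactiveNanos) {
            // "idle" means nothing in the translog changed since the last check
            final boolean idle = translogGeneration == lastTranslogGeneration && translogOps == lastTranslogOps;
            if (activeIndexing) {
                if (idle) {
                    if (idleSinceNanos == -1) {
                        idleSinceNanos = nowNanos;                    // first time the shard is seen idle
                    } else if (nowNanos - idleSinceNanos > inactiveNanos) {
                        activeIndexing = false;                       // idle long enough -> mark inactive
                    }
                } else {
                    idleSinceNanos = -1;                              // activity observed, restart the idle clock
                }
            } else if (idle == false && translogOps > 0) {
                // A fresh translog generation with zero operations (e.g. the one produced by the
                // synced flush that fired when the shard went inactive) is ignored; only real
                // indexing operations flip the shard back to active.
                activeIndexing = true;
                idleSinceNanos = -1;
            }
            lastTranslogGeneration = translogGeneration;
            lastTranslogOps = translogOps;
        }
    }

This is the property the new unit test asserts: simulateFlush() bumps the translog generation with zero operations and the shard stays inactive, while incTranslog() with at least one operation makes it active again.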
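
Also not part of the patch: the buffer-size assertions in the tests follow from how the controller splits its total budget across the currently active shards and then clamps the result to the per-shard min/max settings. A minimal sketch of that arithmetic, using a hypothetical helper name (perShardBudget is not a method of IndexingMemoryController):

    import org.elasticsearch.common.unit.ByteSizeValue;

    final class BufferBudgetSketch {
        /** Total budget divided by the number of active shards, clamped to [minPerShard, maxPerShard]. */
        static ByteSizeValue perShardBudget(ByteSizeValue total, int activeShards,
                                            ByteSizeValue minPerShard, ByteSizeValue maxPerShard) {
            long perShard = total.bytes() / Math.max(1, activeShards);
            perShard = Math.min(maxPerShard.bytes(), Math.max(minPerShard.bytes(), perShard));
            return new ByteSizeValue(perShard);
        }
    }

With the settings used in testShardAdditionAndRemoval (10mb index buffer, 100kb translog buffer, and the default 64kb per-shard translog cap), a single active shard therefore gets 10mb/64kb and two active shards get 5mb/50kb each, which is what the assertBuffers calls check.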