From 934cc091e6ff0440fa78e6ace8c5b20700141ca1 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Sat, 3 Oct 2015 17:15:59 -0400 Subject: [PATCH] fix tests; pull out translog buffer size constant --- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/translog/TranslogConfig.java | 4 +- .../memory/IndexingMemoryControllerTests.java | 47 ++++++++++++------- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f50bcb8ce70..fe3541f3f4c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1000,7 +1000,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett } /** Returns true if the indexing buffer size did change */ - public boolean updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { final EngineConfig config = engineConfig; final ByteSizeValue preValue = config.getIndexingBufferSize(); @@ -1010,7 +1010,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett Engine engine = engineUnsafe(); if (engine == null) { logger.debug("updateBufferSize: engine is closed; skipping"); - return false; + return; } // update engine if it is already started. diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index 4d74961619c..f18da8283ca 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -44,6 +44,8 @@ public final class TranslogConfig { public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER"); + public static final ByteSizeValue DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE = ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE); + private final TimeValue syncInterval; private final BigArrays bigArrays; private final ThreadPool threadPool; @@ -73,7 +75,7 @@ public final class TranslogConfig { this.threadPool = threadPool; this.bigArrays = bigArrays; this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name())); - this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE)).bytes(); // Not really interesting, updated by IndexingMemoryController... + this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE).bytes(); // Not really interesting, updated by IndexingMemoryController... syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); if (syncInterval.millis() > 0 && threadPool != null) { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java index 4e11c7d2cbb..61c265aa9c7 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -22,13 +22,17 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -43,13 +47,14 @@ public class IndexingMemoryControllerTests extends ESTestCase { final Map translogBuffers = new HashMap<>(); final Map lastIndexTimeNanos = new HashMap<>(); + final Set activeShards = new HashSet<>(); long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); public MockController(Settings settings) { super(Settings.builder() .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it - .put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate + .put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate .put(settings) .build(), null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb @@ -60,11 +65,6 @@ public class IndexingMemoryControllerTests extends ESTestCase { translogBuffers.remove(id); } - public void assertActive(ShardId id) { - assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE))); - assertThat(translogBuffers.get(id), not(equalTo(INACTIVE))); - } - public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { assertThat(indexingBuffers.get(id), equalTo(indexing)); assertThat(translogBuffers.get(id), equalTo(translog)); @@ -94,11 +94,12 @@ public class IndexingMemoryControllerTests extends ESTestCase { protected void markShardAsInactive(ShardId shardId) { indexingBuffers.put(shardId, INACTIVE); translogBuffers.put(shardId, INACTIVE); + activeShards.remove(shardId); } @Override protected Boolean getShardActive(ShardId shardId) { - return INACTIVE.equals(indexingBuffers.get(shardId)); + return activeShards.contains(shardId); } @Override @@ -118,6 +119,12 @@ public class IndexingMemoryControllerTests extends ESTestCase { public void simulateIndexing(ShardId shardId) { lastIndexTimeNanos.put(shardId, currentTimeInNanos()); + if (indexingBuffers.containsKey(shardId) == false) { + // First time we are indexing into this shard; start it off with default indexing buffer: + indexingBuffers.put(shardId, EngineConfig.DEFAULT_INDEX_BUFFER_SIZE); + translogBuffers.put(shardId, TranslogConfig.DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE); + } + activeShards.add(shardId); } } @@ -171,31 +178,35 @@ public class IndexingMemoryControllerTests extends ESTestCase { // index into both shards, move the clock and see that they are still active controller.simulateIndexing(shard1); controller.simulateIndexing(shard2); - // the controller doesn't know when the ops happened, so even if this is more - // than the inactive time the shard is still marked as active + controller.incrementTimeSec(10); controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - // index into one shard only, see other shard is made inactive correctly + // both shards now inactive + controller.assertInActive(shard1); + controller.assertInActive(shard2); + + // index into one shard only, see it becomes active controller.simulateIndexing(shard1); controller.forceCheck(); // register what happened with the controller (shard is still active) - controller.incrementTimeSec(3); // increment but not enough + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInActive(shard2); + + controller.incrementTimeSec(3); // increment but not enough to become inactive controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInActive(shard2); controller.incrementTimeSec(3); // increment some more controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInActive(shard1); controller.assertInActive(shard2); // index some and shard becomes immediately active controller.simulateIndexing(shard2); controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertInActive(shard1); + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); } public void testMinShardBufferSizes() {