fix tests; pull out translog buffer size constant
This commit is contained in:
parent
f27c0adb0b
commit
934cc091e6
|
@ -1000,7 +1000,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns true if the indexing buffer size did change */
|
/** Returns true if the indexing buffer size did change */
|
||||||
public boolean updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||||
|
|
||||||
final EngineConfig config = engineConfig;
|
final EngineConfig config = engineConfig;
|
||||||
final ByteSizeValue preValue = config.getIndexingBufferSize();
|
final ByteSizeValue preValue = config.getIndexingBufferSize();
|
||||||
|
@ -1010,7 +1010,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
||||||
Engine engine = engineUnsafe();
|
Engine engine = engineUnsafe();
|
||||||
if (engine == null) {
|
if (engine == null) {
|
||||||
logger.debug("updateBufferSize: engine is closed; skipping");
|
logger.debug("updateBufferSize: engine is closed; skipping");
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update engine if it is already started.
|
// update engine if it is already started.
|
||||||
|
|
|
@ -44,6 +44,8 @@ public final class TranslogConfig {
|
||||||
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
||||||
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
|
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
|
||||||
|
|
||||||
|
public static final ByteSizeValue DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE = ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE);
|
||||||
|
|
||||||
private final TimeValue syncInterval;
|
private final TimeValue syncInterval;
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
@ -73,7 +75,7 @@ public final class TranslogConfig {
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.bigArrays = bigArrays;
|
this.bigArrays = bigArrays;
|
||||||
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name()));
|
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name()));
|
||||||
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE)).bytes(); // Not really interesting, updated by IndexingMemoryController...
|
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE).bytes(); // Not really interesting, updated by IndexingMemoryController...
|
||||||
|
|
||||||
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
|
||||||
if (syncInterval.millis() > 0 && threadPool != null) {
|
if (syncInterval.millis() > 0 && threadPool != null) {
|
||||||
|
|
|
@ -22,13 +22,17 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.translog.TranslogConfig;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
|
@ -43,13 +47,14 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
|
final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
|
||||||
|
|
||||||
final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
|
final Map<ShardId, Long> lastIndexTimeNanos = new HashMap<>();
|
||||||
|
final Set<ShardId> activeShards = new HashSet<>();
|
||||||
|
|
||||||
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
|
long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
|
||||||
|
|
||||||
public MockController(Settings settings) {
|
public MockController(Settings settings) {
|
||||||
super(Settings.builder()
|
super(Settings.builder()
|
||||||
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
|
.put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
|
||||||
.put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate
|
.put(SHARD_INACTIVE_TIME_SETTING, "1ms") // nearly immediate
|
||||||
.put(settings)
|
.put(settings)
|
||||||
.build(),
|
.build(),
|
||||||
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
|
||||||
|
@ -60,11 +65,6 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
translogBuffers.remove(id);
|
translogBuffers.remove(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void assertActive(ShardId id) {
|
|
||||||
assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE)));
|
|
||||||
assertThat(translogBuffers.get(id), not(equalTo(INACTIVE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
|
public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
|
||||||
assertThat(indexingBuffers.get(id), equalTo(indexing));
|
assertThat(indexingBuffers.get(id), equalTo(indexing));
|
||||||
assertThat(translogBuffers.get(id), equalTo(translog));
|
assertThat(translogBuffers.get(id), equalTo(translog));
|
||||||
|
@ -94,11 +94,12 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
protected void markShardAsInactive(ShardId shardId) {
|
protected void markShardAsInactive(ShardId shardId) {
|
||||||
indexingBuffers.put(shardId, INACTIVE);
|
indexingBuffers.put(shardId, INACTIVE);
|
||||||
translogBuffers.put(shardId, INACTIVE);
|
translogBuffers.put(shardId, INACTIVE);
|
||||||
|
activeShards.remove(shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Boolean getShardActive(ShardId shardId) {
|
protected Boolean getShardActive(ShardId shardId) {
|
||||||
return INACTIVE.equals(indexingBuffers.get(shardId));
|
return activeShards.contains(shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -118,6 +119,12 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
|
|
||||||
public void simulateIndexing(ShardId shardId) {
|
public void simulateIndexing(ShardId shardId) {
|
||||||
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
|
lastIndexTimeNanos.put(shardId, currentTimeInNanos());
|
||||||
|
if (indexingBuffers.containsKey(shardId) == false) {
|
||||||
|
// First time we are indexing into this shard; start it off with default indexing buffer:
|
||||||
|
indexingBuffers.put(shardId, EngineConfig.DEFAULT_INDEX_BUFFER_SIZE);
|
||||||
|
translogBuffers.put(shardId, TranslogConfig.DEFAULT_SHARD_TRANSLOG_BUFFER_SIZE);
|
||||||
|
}
|
||||||
|
activeShards.add(shardId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,31 +178,35 @@ public class IndexingMemoryControllerTests extends ESTestCase {
|
||||||
// index into both shards, move the clock and see that they are still active
|
// index into both shards, move the clock and see that they are still active
|
||||||
controller.simulateIndexing(shard1);
|
controller.simulateIndexing(shard1);
|
||||||
controller.simulateIndexing(shard2);
|
controller.simulateIndexing(shard2);
|
||||||
// the controller doesn't know when the ops happened, so even if this is more
|
|
||||||
// than the inactive time the shard is still marked as active
|
|
||||||
controller.incrementTimeSec(10);
|
controller.incrementTimeSec(10);
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
|
||||||
|
|
||||||
// index into one shard only, see other shard is made inactive correctly
|
// both shards now inactive
|
||||||
|
controller.assertInActive(shard1);
|
||||||
|
controller.assertInActive(shard2);
|
||||||
|
|
||||||
|
// index into one shard only, see it becomes active
|
||||||
controller.simulateIndexing(shard1);
|
controller.simulateIndexing(shard1);
|
||||||
controller.forceCheck(); // register what happened with the controller (shard is still active)
|
controller.forceCheck(); // register what happened with the controller (shard is still active)
|
||||||
controller.incrementTimeSec(3); // increment but not enough
|
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||||
|
controller.assertInActive(shard2);
|
||||||
|
|
||||||
|
controller.incrementTimeSec(3); // increment but not enough to become inactive
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertInActive(shard2);
|
||||||
|
|
||||||
controller.incrementTimeSec(3); // increment some more
|
controller.incrementTimeSec(3); // increment some more
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
controller.assertInActive(shard1);
|
||||||
controller.assertInActive(shard2);
|
controller.assertInActive(shard2);
|
||||||
|
|
||||||
// index some and shard becomes immediately active
|
// index some and shard becomes immediately active
|
||||||
controller.simulateIndexing(shard2);
|
controller.simulateIndexing(shard2);
|
||||||
controller.forceCheck();
|
controller.forceCheck();
|
||||||
controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertInActive(shard1);
|
||||||
controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
|
controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMinShardBufferSizes() {
|
public void testMinShardBufferSizes() {
|
||||||
|
|
Loading…
Reference in New Issue