add throttling test case

This commit is contained in:
Michael McCandless 2015-12-17 06:37:52 -05:00 committed by mikemccand
parent 319dc8c8ed
commit cbb6463425
3 changed files with 177 additions and 62 deletions

View File

@ -541,15 +541,22 @@ public class IndexShard extends AbstractIndexShardComponent {
/** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
public void refresh(String source) {
verifyNotClosed();
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
try {
if (canIndex()) {
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed));
try {
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
indexingMemoryController.removeWritingBytes(this, ramBytesUsed);
}
} else {
logger.debug("refresh with source [{}]", source);
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
indexingMemoryController.removeWritingBytes(this, ramBytesUsed);
}
}
@ -1252,15 +1259,19 @@ public class IndexShard extends AbstractIndexShardComponent {
public void run() {
try {
Engine engine = getEngine();
long bytes = engine.indexBufferRAMBytesUsed();
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, but this is fine because
// after the writes finish, IMC will poll again and see that there's still up to the 20% being used and continue
// writing if necessary:
indexingMemoryController.addWritingBytes(IndexShard.this, bytes);
try {
if (canIndex()) {
long bytes = engine.indexBufferRAMBytesUsed();
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
// there's still up to the 20% being used and continue writing if necessary:
indexingMemoryController.addWritingBytes(IndexShard.this, bytes);
try {
getEngine().writeIndexingBuffer();
} finally {
indexingMemoryController.removeWritingBytes(IndexShard.this, bytes);
}
} else {
getEngine().writeIndexingBuffer();
} finally {
indexingMemoryController.removeWritingBytes(IndexShard.this, bytes);
}
} catch (Exception e) {
handleRefreshException(e);

View File

@ -76,7 +76,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private volatile ScheduledFuture scheduler;
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
private static final EnumSet<IndexShardState> CAN_WRITE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private final ShardsIndicesStatusChecker statusChecker;
@ -129,13 +129,16 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** Shard calls this when it starts writing its indexing buffer to disk to notify us */
public void addWritingBytes(IndexShard shard, long numBytes) {
writingBytes.put(shard, numBytes);
logger.debug("IMC: add writing bytes for {}, {} MB", shard.shardId(), numBytes/1024./1024.);
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(numBytes), shard.shardId());
}
/** Shard calls when it's done writing these bytes to disk */
public void removeWritingBytes(IndexShard shard, long numBytes) {
writingBytes.remove(shard);
logger.debug("IMC: clear writing bytes for {}", shard.shardId());
logger.debug("clear [{}] writing bytes for shard [{}]", new ByteSizeValue(numBytes), shard.shardId());
// Since some bytes just freed up, now we check again to give throttling a chance to stop:
forceCheck();
}
@Override
@ -167,7 +170,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
if (shardAvailable(shard)) {
// shadow replica doesn't have an indexing buffer
if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
availableShards.add(shard);
}
}
@ -185,12 +189,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
shard.writeIndexingBufferAsync();
}
/** returns true if shard exists and is available for updates */
protected boolean shardAvailable(IndexShard shard) {
// shadow replica doesn't have an indexing buffer
return shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
}
/** used by tests to check if any shards active status changed, now. */
public void forceCheck() {
statusChecker.run();
@ -201,6 +199,16 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
statusChecker.bytesWritten(bytes);
}
/** Asks this shard to throttle indexing to one thread */
protected void activateThrottling(IndexShard shard) {
shard.activateThrottling();
}
/** Asks this shard to stop throttling indexing to one thread */
protected void deactivateThrottling(IndexShard shard) {
shard.deactivateThrottling();
}
static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
final long bytesUsed;
final IndexShard shard;
@ -273,6 +281,10 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING, indexingBuffer, new ByteSizeValue(totalBytesWriting));
}
// If we are using more than 50% of our budget across both indexing buffer and bytes we are still moving to disk, then we now
// throttle the top shards to send back-pressure to ongoing indexing:
boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBuffer.bytes();
if (totalBytesUsed > indexingBuffer.bytes()) {
// OK we are now over-budget; fill the priority queue and ask largest shard(s) to refresh:
logger.debug("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]",
@ -309,10 +321,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
}
// If we are using more than 50% of our budget across both indexing buffer and bytes we are moving to disk, then we now
// throttle the top shards to give back-pressure:
boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBuffer.bytes();
while (totalBytesUsed > indexingBuffer.bytes() && queue.isEmpty() == false) {
ShardAndBytesUsed largest = queue.poll();
logger.debug("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed));
@ -321,17 +329,17 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
if (doThrottle && throttled.contains(largest.shard) == false) {
logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId());
throttled.add(largest.shard);
largest.shard.activateThrottling();
activateThrottling(largest.shard);
}
}
}
if (doThrottle == false) {
for(IndexShard shard : throttled) {
logger.info("stop throttling indexing for shard [{}]", shard.shardId());
shard.deactivateThrottling();
}
throttled.clear();
if (doThrottle == false) {
for(IndexShard shard : throttled) {
logger.info("stop throttling indexing for shard [{}]", shard.shardId());
deactivateThrottling(shard);
}
throttled.clear();
}
bytesWrittenSinceCheck = 0;
@ -339,8 +347,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
/**
* ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so.
* return false if the shard is not idle, otherwise true
* ask this shard to check now whether it is inactive, and reduces its indexing buffer if so.
*/
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {

View File

@ -41,8 +41,15 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
static class MockController extends IndexingMemoryController {
// Size of each shard's indexing buffer
final Map<IndexShard, Long> indexBufferRAMBytesUsed = new HashMap<>();
// How many bytes this shard is currently moving to disk
final Map<IndexShard, Long> writingBytes = new HashMap<>();
// Shards that are currently throttled
final Set<IndexShard> throttled = new HashSet<>();
public MockController(Settings settings) {
super(Settings.builder()
.put(SHARD_MEMORY_INTERVAL_TIME_SETTING, "200h") // disable it
@ -53,6 +60,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
public void deleteShard(IndexShard shard) {
indexBufferRAMBytesUsed.remove(shard);
writingBytes.remove(shard);
}
@Override
@ -60,19 +68,9 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
return new ArrayList<>(indexBufferRAMBytesUsed.keySet());
}
@Override
protected boolean shardAvailable(IndexShard shard) {
return indexBufferRAMBytesUsed.containsKey(shard);
}
@Override
protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
Long used = indexBufferRAMBytesUsed.get(shard);
if (used == null) {
return 0;
} else {
return used;
}
return indexBufferRAMBytesUsed.get(shard) + writingBytes.get(shard);
}
@Override
@ -81,18 +79,57 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
@Override
public void writeIndexingBufferAsync(IndexShard shard) {
long bytes = indexBufferRAMBytesUsed.put(shard, 0L);
writingBytes.put(shard, writingBytes.get(shard) + bytes);
addWritingBytes(shard, bytes);
indexBufferRAMBytesUsed.put(shard, 0L);
}
public void assertBuffer(IndexShard shard, ByteSizeValue expected) {
@Override
public void activateThrottling(IndexShard shard) {
assertTrue(throttled.add(shard));
}
@Override
public void deactivateThrottling(IndexShard shard) {
assertTrue(throttled.remove(shard));
}
public void doneWriting(IndexShard shard) {
long bytes = writingBytes.put(shard, 0L);
removeWritingBytes(shard, bytes);
}
public void assertBuffer(IndexShard shard, int expectedMB) {
Long actual = indexBufferRAMBytesUsed.get(shard);
assertEquals(expected.bytes(), actual.longValue());
if (actual == null) {
actual = 0L;
}
assertEquals(expectedMB * 1024 * 1024, actual.longValue());
}
public void assertThrottled(IndexShard shard) {
assertTrue(throttled.contains(shard));
}
public void assertNotThrottled(IndexShard shard) {
assertFalse(throttled.contains(shard));
}
public void assertWriting(IndexShard shard, int expectedMB) {
Long actual = writingBytes.get(shard);
if (actual == null) {
actual = 0L;
}
assertEquals(expectedMB * 1024 * 1024, actual.longValue());
}
public void simulateIndexing(IndexShard shard) {
Long bytes = indexBufferRAMBytesUsed.get(shard);
if (bytes == null) {
bytes = 0L;
// First time we are seeing this shard:
writingBytes.put(shard, 0L);
}
// Each doc we index takes up a megabyte!
bytes += 1024*1024;
@ -110,18 +147,18 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
IndexShard shard0 = test.getShard(0);
controller.simulateIndexing(shard0);
controller.assertBuffer(shard0, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard0, 1);
// add another shard
IndexShard shard1 = test.getShard(1);
controller.simulateIndexing(shard1);
controller.assertBuffer(shard0, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard0, 1);
controller.assertBuffer(shard1, 1);
// remove first shard
controller.deleteShard(shard0);
controller.forceCheck();
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard1, 1);
// remove second shard
controller.deleteShard(shard1);
@ -130,7 +167,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
// add a new one
IndexShard shard2 = test.getShard(2);
controller.simulateIndexing(shard2);
controller.assertBuffer(shard2, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard2, 1);
}
public void testActiveInactive() {
@ -148,28 +185,28 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
IndexShard shard1 = test.getShard(1);
controller.simulateIndexing(shard1);
controller.assertBuffer(shard0, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard1, new ByteSizeValue(1, ByteSizeUnit.MB));
controller.assertBuffer(shard0, 1);
controller.assertBuffer(shard1, 1);
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard1);
controller.assertBuffer(shard0, new ByteSizeValue(2, ByteSizeUnit.MB));
controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB));
controller.assertBuffer(shard0, 2);
controller.assertBuffer(shard1, 2);
// index into one shard only, crosses the 5mb limit, so shard1 is refreshed
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard0);
controller.assertBuffer(shard0, new ByteSizeValue(0, ByteSizeUnit.MB));
controller.assertBuffer(shard1, new ByteSizeValue(2, ByteSizeUnit.MB));
controller.assertBuffer(shard0, 0);
controller.assertBuffer(shard1, 2);
controller.simulateIndexing(shard1);
controller.simulateIndexing(shard1);
controller.assertBuffer(shard1, new ByteSizeValue(4, ByteSizeUnit.MB));
controller.assertBuffer(shard1, 4);
controller.simulateIndexing(shard1);
controller.simulateIndexing(shard1);
// shard1 crossed 5 mb and is now cleared:
controller.assertBuffer(shard1, new ByteSizeValue(0, ByteSizeUnit.MB));
controller.assertBuffer(shard1, 0);
}
public void testMinBufferSizes() {
@ -188,6 +225,66 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
}
public void testThrottling() throws Exception {
createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
MockController controller = new MockController(Settings.builder()
.put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "4mb").build());
IndexShard shard0 = test.getShard(0);
IndexShard shard1 = test.getShard(1);
IndexShard shard2 = test.getShard(2);
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard0);
controller.assertBuffer(shard0, 3);
controller.simulateIndexing(shard1);
controller.simulateIndexing(shard1);
// We are now using 5 MB, so we should be writing shard0 since it's using the most heap:
controller.assertWriting(shard0, 3);
controller.assertWriting(shard1, 0);
controller.assertBuffer(shard0, 0);
controller.assertBuffer(shard1, 2);
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard1);
controller.simulateIndexing(shard1);
// Now we are still writing 3 MB (shard0), and using 5 MB index buffers, so we should now 1) be writing shard1, and 2) be throttling shard1:
controller.assertWriting(shard0, 3);
controller.assertWriting(shard1, 4);
controller.assertBuffer(shard0, 1);
controller.assertBuffer(shard1, 0);
controller.assertNotThrottled(shard0);
controller.assertThrottled(shard1);
System.out.println("TEST: now index more");
// More indexing to shard0
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard0);
controller.simulateIndexing(shard0);
// Now we are using 5 MB again, so shard0 should also be writing and now also be throttled:
controller.assertWriting(shard0, 8);
controller.assertWriting(shard1, 4);
controller.assertBuffer(shard0, 0);
controller.assertBuffer(shard1, 0);
controller.assertThrottled(shard0);
controller.assertThrottled(shard1);
// Both shards finally finish writing, and throttling should stop:
controller.doneWriting(shard0);
controller.doneWriting(shard1);
controller.assertNotThrottled(shard0);
controller.assertNotThrottled(shard1);
}
// #10312
public void testDeletesAloneCanTriggerRefresh() throws Exception {
createIndex("index",