move async-ness upwards

This commit is contained in:
Michael McCandless 2016-01-05 11:27:05 -05:00 committed by mikemccand
parent 99d6ec53fa
commit 1d46a00d43
2 changed files with 26 additions and 25 deletions

View File

@ -1264,33 +1264,29 @@ public class IndexShard extends AbstractIndexShardComponent {
/** /**
* Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk. * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
*/ */
public void writeIndexingBufferAsync() { public void writeIndexingBuffer() {
if (canIndex() == false) { if (canIndex() == false) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { try {
@Override Engine engine = getEngine();
public void run() { long bytes = engine.getIndexBufferRAMBytesUsed();
try {
Engine engine = getEngine(); // NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
long bytes = engine.getIndexBufferRAMBytesUsed(); // memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map // there's still up to the 20% being used and continue writing if necessary:
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
// there's still up to the 20% being used and continue writing if necessary: writingBytes.addAndGet(bytes);
writingBytes.addAndGet(bytes); try {
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); engine.writeIndexingBuffer();
try { } finally {
getEngine().writeIndexingBuffer(); // nocommit but we don't promptly stop index throttling anymore?
} finally { writingBytes.addAndGet(-bytes);
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId()); logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
// nocommit but we don't promptly stop index throttling anymore? }
writingBytes.addAndGet(-bytes); } catch (Exception e) {
} handleRefreshException(e);
} catch (Exception e) { };
handleRefreshException(e);
}
}
});
} }
final class EngineRefresher implements Runnable { final class EngineRefresher implements Runnable {

View File

@ -174,7 +174,12 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** ask this shard to refresh, in the background, to free up heap */ /** ask this shard to refresh, in the background, to free up heap */
protected void writeIndexingBufferAsync(IndexShard shard) { protected void writeIndexingBufferAsync(IndexShard shard) {
shard.writeIndexingBufferAsync(); threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
@Override
public void run() {
shard.writeIndexingBuffer();
}
});
} }
/** force checker to run now */ /** force checker to run now */