use IW.flush to move bytes to disk

This commit is contained in:
Michael McCandless 2015-12-15 06:29:45 -05:00 committed by mikemccand
parent 27d8509f0e
commit 86a0dd0f28
6 changed files with 96 additions and 15 deletions

View File

@ -478,6 +478,12 @@ public abstract class Engine implements Closeable {
*/
public abstract void refresh(String source) throws EngineException;
/**
* Called when our engine is using too much heap and should move buffered indexed/deleted documents to disk.
*/
// NOTE: do NOT rename this to something containing flush or refresh!
public abstract void writeIndexingBuffer() throws EngineException;
/**
* Flushes the state of the engine including the transaction log, clearing memory.
*

View File

@ -506,6 +506,45 @@ public class InternalEngine extends Engine {
mergeScheduler.refreshConfig();
}
@Override
public void writeIndexingBuffer() throws EngineException {
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
// searcher managers, one for searching which is only refreshed by the schedule the user asks for, and another for version
// map interactions:
boolean useRefresh = versionMapRefreshPending.get() || (indexWriter.ramBytesUsed()/4 < versionMap.ramBytesUsedForRefresh());
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (useRefresh) {
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
searcherManager.maybeRefreshBlocking();
} else {
// Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
indexWriter.flush();
}
} catch (AlreadyClosedException e) {
ensureOpen();
maybeFailEngine("writeIndexingBuffer", e);
} catch (EngineClosedException e) {
throw e;
} catch (Throwable t) {
failEngine("writeIndexingBuffer failed", t);
throw new RefreshFailedEngineException(shardId, t);
}
// TODO: maybe we should just put a scheduled job in threadPool?
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
// for a long time:
if (useRefresh) {
maybePruneDeletedTombstones();
versionMapRefreshPending.set(false);
mergeScheduler.refreshConfig();
}
}
@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
// best effort attempt before we acquire locks

View File

@ -231,4 +231,10 @@ public class ShadowEngine extends Engine {
// No IndexWriter nor version map
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
@Override
public void writeIndexingBuffer() {
// No indexing buffer
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
}

View File

@ -537,7 +537,7 @@ public class IndexShard extends AbstractIndexShardComponent {
verifyNotClosed();
// nocommit OK to throw EngineClosedExc?
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addRefreshingBytes(this, ramBytesUsed);
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
try {
if (logger.isTraceEnabled()) {
logger.trace("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed);
@ -546,7 +546,7 @@ public class IndexShard extends AbstractIndexShardComponent {
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
indexingMemoryController.removeRefreshingBytes(this, ramBytesUsed);
indexingMemoryController.removeWritingBytes(this, ramBytesUsed);
}
}
@ -1210,17 +1210,42 @@ public class IndexShard extends AbstractIndexShardComponent {
}
/**
* Asynchronously refreshes the engine for new search operations to reflect the latest
* changes.
* Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
*/
public void refreshAsync(final String reason) {
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
public void writeIndexingBufferAsync() {
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
@Override
public void run() {
try {
refresh(reason);
Engine engine = getEngine();
long bytes = engine.indexBufferRAMBytesUsed();
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, but this is fine because
// after the writes finish, IMC will poll again and see that there's still up to the 20% being used and continue
// writing if necessary:
indexingMemoryController.addWritingBytes(IndexShard.this, bytes);
try {
getEngine().writeIndexingBuffer();
} finally {
indexingMemoryController.removeWritingBytes(IndexShard.this, bytes);
}
} catch (EngineClosedException ex) {
// ignore
} catch (RefreshFailedEngineException e) {
if (e.getCause() instanceof InterruptedException) {
// ignore, we are being shutdown
} else if (e.getCause() instanceof ClosedByInterruptException) {
// ignore, we are being shutdown
} else if (e.getCause() instanceof ThreadInterruptedException) {
// ignore, we are being shutdown
} else {
if (state != IndexShardState.CLOSED) {
logger.warn("Failed to perform scheduled engine refresh", e);
}
}
} catch (Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("Failed to perform scheduled engine refresh", e);
}
}
}
});

View File

@ -124,11 +124,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
}
public void addRefreshingBytes(IndexShard shard, long numBytes) {
/** Shard calls this to notify us that this many bytes are being asynchronously moved from RAM to disk */
public void addWritingBytes(IndexShard shard, long numBytes) {
refreshingBytes.put(shard, numBytes);
}
public void removeRefreshingBytes(IndexShard shard, long numBytes) {
/** Shard calls this to notify us that this many bytes are are done being asynchronously moved from RAM to disk */
public void removeWritingBytes(IndexShard shard, long numBytes) {
// nocommit this can fail, if two refreshes are running "concurrently"
Long result = refreshingBytes.remove(shard);
assert result != null;
}
@ -176,8 +179,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
/** ask this shard to refresh, in the background, to free up heap */
protected void refreshShardAsync(IndexShard shard) {
shard.refreshAsync("memory");
protected void writeIndexingBufferAsync(IndexShard shard) {
shard.writeIndexingBufferAsync();
}
/** returns true if shard exists and is availabe for updates */
@ -218,12 +221,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
long bytesWrittenSinceCheck;
/** Shard calls this on each indexing/delete op */
public synchronized void bytesWritten(int bytes) {
bytesWrittenSinceCheck += bytes;
if (bytesWrittenSinceCheck > indexingBuffer.bytes()/20) {
// NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
// typically smaller. But this logic is here only as a safety against thread starvation or too infrequent checking,
// to ensure we are still checking in proportion to bytes processed by indexing:
// typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
// thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
// processed by indexing:
System.out.println(((System.currentTimeMillis() - startMS)/1000.0) + ": NOW CHECK xlog=" + bytesWrittenSinceCheck);
run();
}
@ -293,7 +298,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
ShardAndBytesUsed largest = queue.poll();
System.out.println("IMC: write " + largest.shard.shardId() + ": " + (largest.bytesUsed/1024./1024.) + " MB");
logger.debug("refresh shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed));
refreshShardAsync(largest.shard);
writeIndexingBufferAsync(largest.shard);
totalBytesUsed -= largest.bytesUsed;
}
}

View File

@ -76,7 +76,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
}
@Override
public void refreshShardAsync(IndexShard shard) {
public void writeIndexingBufferAsync(IndexShard shard) {
indexBufferRAMBytesUsed.put(shard, 0L);
}