remove 'white lie' and tracking refreshing bytes explicitly

This commit is contained in:
Michael McCandless 2015-12-14 15:39:18 -05:00 committed by mikemccand
parent c66b05d9cf
commit 99e328c9bf
3 changed files with 63 additions and 25 deletions

View File

@ -102,8 +102,6 @@ public class InternalEngine extends Engine {
private volatile SegmentInfos lastCommittedSegmentInfos;
private volatile boolean refreshing;
private final IndexThrottle throttle;
public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
@ -489,7 +487,6 @@ public class InternalEngine extends Engine {
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
refreshing = true;
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
ensureOpen();
@ -499,8 +496,6 @@ public class InternalEngine extends Engine {
} catch (Throwable t) {
failEngine("refresh failed", t);
throw new RefreshFailedEngineException(shardId, t);
} finally {
refreshing = false;
}
// TODO: maybe we should just put a scheduled job in threadPool?
@ -759,15 +754,7 @@ public class InternalEngine extends Engine {
@Override
public long indexBufferRAMBytesUsed() {
if (refreshing) {
// We tell a "white lie" here, pretending that we instantaneously moved all
// heap to disk at the start of refresh. We do this so IMC behaves as if we
// are using no heap, else it will just keep asking us when it should be
// asking others:
return 0;
} else {
return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
}
return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
}
@Override

View File

@ -506,12 +506,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public void refresh(String source) {
verifyNotClosed();
if (logger.isTraceEnabled()) {
logger.trace("refresh with source: {}", source);
// nocommit OK to throw EngineClosedExc?
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addRefreshingBytes(shardId, ramBytesUsed);
try {
if (logger.isTraceEnabled()) {
logger.trace("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed);
}
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
} finally {
indexingMemoryController.removeRefreshingBytes(shardId, ramBytesUsed);
}
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
}
public RefreshStats refreshStats() {

View File

@ -58,7 +58,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
/** Hardwired translog buffer size */
public static final ByteSizeValue SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("32kb", "SHARD_TRANSLOG_BUFFER");
public static final ByteSizeValue SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("8kb", "SHARD_TRANSLOG_BUFFER");
private final ThreadPool threadPool;
private final IndicesService indicesService;
@ -75,6 +75,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
private final ShardsIndicesStatusChecker statusChecker;
/** How many bytes we are currently moving to disk by the engine to refresh */
private final AtomicLong bytesRefreshingNow = new AtomicLong();
private final Map<ShardId,Long> refreshingBytes = new ConcurrentHashMap<>();
@Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
@ -117,6 +122,15 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
}
public void addRefreshingBytes(ShardId shardId, long numBytes) {
refreshingBytes.put(shardId, numBytes);
}
public void removeRefreshingBytes(ShardId shardId, long numBytes) {
boolean result = refreshingBytes.remove(shardId);
assert result;
}
@Override
protected void doStart() {
// it's fine to run it on the scheduler thread, no busy work
@ -248,29 +262,59 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
@Override
public synchronized void run() {
// nocommit lower the translog buffer to 8 KB
// nocommit add defensive try/catch-everything here? bad if an errant EngineClosedExc kills off this thread!!
// Fast check to sum up how much heap all shards' indexing buffers are using now:
long totalBytesUsed = 0;
for (ShardId shardId : availableShards()) {
Long refreshingBytes = refreshingBytes.get(shardId);
// Give shard a chance to transition to inactive so sync'd flush can happen:
checkIdle(shardId, inactiveTime.nanos());
totalBytesUsed += getIndexBufferRAMBytesUsed(shardId);
System.out.println("IMC: " + shardId + " using " + (getIndexBufferRAMBytesUsed(shardId)/1024./1024.) + " MB");
// nocommit explain why order is important here!
Long refreshingBytes = refreshingBytes.get(shardId);
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
if (refreshingBytes != null) {
// Only count up bytes not already being refreshed:
shardBytesUsed -= refreshingBytes;
// If the refresh completed just after we pulled refreshingBytes and before we pulled index buffer bytes, then we could
// have a negative value here:
if (shardBytesUsed < 0) {
continue;
}
}
totalBytesUsed += shardBytesUsed;
System.out.println("IMC: " + shardId + " using " + (shardBytesUsed/1024./1024.) + " MB");
}
System.out.println(((System.currentTimeMillis() - startMS)/1000.0) + ": TOT=" + totalBytesUsed + " vs " + indexingBuffer.bytes());
if (totalBytesUsed > indexingBuffer.bytes()) {
if (totalBytesUsed - bytesRefreshingNow.get() > indexingBuffer.bytes()) {
// OK we are using too much; make a queue and ask largest shard(s) to refresh:
logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
for (ShardId shardId : availableShards()) {
// nocommit explain why order is important here!
Long refreshingBytes = refreshingBytes.get(shardId);
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
if (refreshingBytes != null) {
// Only count up bytes not already being refreshed:
shardBytesUsed -= refreshingBytes;
// If the refresh completed just after we pulled refreshingBytes and before we pulled index buffer bytes, then we could
// have a negative value here:
if (shardBytesUsed < 0) {
continue;
}
}
if (shardBytesUsed > 0) {
queue.add(new ShardAndBytesUsed(shardBytesUsed, shardId));
}