trigger index throttling if writing indexing buffers to disk can't keep up and we are >= 50% over the allowed budget

Michael McCandless 2015-12-16 06:41:24 -05:00 committed by mikemccand
parent 86a0dd0f28
commit 330bc5b614
5 changed files with 131 additions and 43 deletions
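The trigger this commit adds lives in IndexingMemoryController: the heap held by all shards' indexing buffers, plus the bytes those shards are already moving to disk, is compared against 1.5x the configured indexing buffer budget. A minimal standalone sketch of that arithmetic; the class name and sizes below are illustrative, not part of the commit:

    // Sketch of the throttle trigger: heap held by indexing buffers plus bytes
    // already being written to disk, compared against 150% of the budget.
    public class ThrottleTriggerSketch {
        public static void main(String[] args) {
            long indexingBufferBudget = 100 * 1024 * 1024; // e.g. indices.memory.index_buffer_size of 100mb
            long totalBytesUsed = 90 * 1024 * 1024;        // heap across all shards' indexing buffers
            long totalBytesWriting = 70 * 1024 * 1024;     // bytes currently being moved to disk

            // >= 50% over the allowed budget, so indexing should be throttled:
            boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBufferBudget;
            System.out.println("throttle? " + doThrottle); // true: 160mb > 150mb
        }
    }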

Engine.java

@@ -1087,4 +1087,14 @@ public abstract class Engine implements Closeable {
          */
         void warm(Engine.Searcher searcher, boolean isTopLevelReader);
     }
+
+    /**
+     * Request that this engine throttle incoming indexing requests to one thread.  Must be matched by a later call to {@link #deactivateThrottling}.
+     */
+    public abstract void activateThrottling();
+
+    /**
+     * Reverses a previous {@link #activateThrottling} call.
+     */
+    public abstract void deactivateThrottling();
 }
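The new Engine methods form a paired contract: every activateThrottling() must eventually be balanced by one deactivateThrottling(). A hypothetical helper (not part of this commit) showing how a caller could honor that contract even when the guarded work throws:

    // Hypothetical helper: pair the calls with try/finally so the engine is
    // never left throttled after the guarded work completes or fails.
    static void runThrottled(Engine engine, Runnable work) {
        engine.activateThrottling();
        try {
            work.run();                     // indexing proceeds, limited to one thread
        } finally {
            engine.deactivateThrottling();  // always undone, even on exception
        }
    }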

InternalEngine.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.math.MathUtils;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -510,9 +511,12 @@ public class InternalEngine extends Engine {
     public void writeIndexingBuffer() throws EngineException {
         // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
-        // searcher managers, one for searching which is only refreshed by the schedule the user asks for, and another for version
-        // map interactions:
-        boolean useRefresh = versionMapRefreshPending.get() || (indexWriter.ramBytesUsed()/4 < versionMap.ramBytesUsedForRefresh());
+        // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
+        // refresh API), and another for version map interactions:
+        long versionMapBytes = versionMap.ramBytesUsedForRefresh();
+        long indexingBufferBytes = indexWriter.ramBytesUsed();
+        boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);

         // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
@@ -520,9 +524,13 @@ public class InternalEngine extends Engine {
             ensureOpen();
             if (useRefresh) {
                 // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
-                searcherManager.maybeRefreshBlocking();
+                logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
+                             new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
+                refresh("write indexing buffer");
             } else {
                 // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
+                logger.debug("use flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])",
+                             new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
                 indexWriter.flush();
             }
         } catch (AlreadyClosedException e) {
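The heuristic above chooses between two ways of freeing heap: a full refresh also clears the version map but opens a new searcher, while IndexWriter.flush() only writes segments. The condition, extracted into a standalone method purely for illustration (it mirrors the diff; this method does not exist in the commit):

    // Use the costlier refresh when the version map holds more than 25% as much
    // heap as the IndexWriter's indexing buffer, since only a refresh frees it:
    static boolean shouldUseRefresh(boolean versionMapRefreshPending,
                                    long indexingBufferBytes, long versionMapBytes) {
        return versionMapRefreshPending || (indexingBufferBytes / 4 < versionMapBytes);
    }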
@@ -1043,12 +1051,24 @@ public class InternalEngine extends Engine {
         }
     }

+    private final AtomicInteger throttleRequestCount = new AtomicInteger();
+
+    @Override
     public void activateThrottling() {
-        throttle.activate();
+        int count = throttleRequestCount.incrementAndGet();
+        assert count >= 1;
+        if (count == 1) {
+            throttle.activate();
+        }
     }

+    @Override
     public void deactivateThrottling() {
-        throttle.deactivate();
+        int count = throttleRequestCount.decrementAndGet();
+        assert count >= 0;
+        if (count == 0) {
+            throttle.deactivate();
+        }
     }

     long getGcDeletesInMillis() {
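activateThrottling() and deactivateThrottling() are now reference-counted, so overlapping requests don't fight over the underlying IndexWriter throttle: only the first request activates it and only the last release deactivates it. The same pattern as a standalone sketch (RefCountedSwitch is an illustrative name, not a class in the commit):

    import java.util.concurrent.atomic.AtomicInteger;

    // Only the first acquire turns the switch on; only the last release turns it off.
    class RefCountedSwitch {
        private final AtomicInteger requests = new AtomicInteger();

        void acquire(Runnable turnOn) {
            if (requests.incrementAndGet() == 1) {
                turnOn.run();
            }
        }

        void release(Runnable turnOff) {
            int count = requests.decrementAndGet();
            assert count >= 0 : "unbalanced release";
            if (count == 0) {
                turnOff.run();
            }
        }
    }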

ShadowEngine.java

@@ -237,4 +237,14 @@ public class ShadowEngine extends Engine {
         // No indexing buffer
         throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
     }
+
+    @Override
+    public void activateThrottling() {
+        throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
+    }
+
+    @Override
+    public void deactivateThrottling() {
+        throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
+    }
 }

IndexShard.java

@@ -539,9 +539,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
         indexingMemoryController.addWritingBytes(this, ramBytesUsed);
         try {
-            if (logger.isTraceEnabled()) {
-                logger.trace("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed);
-            }
+            logger.debug("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed);
             long time = System.nanoTime();
             getEngine().refresh(source);
             refreshMetric.inc(System.nanoTime() - time);
@@ -1209,6 +1207,22 @@ public class IndexShard extends AbstractIndexShardComponent {
         return indexEventListener;
     }

+    public void activateThrottling() {
+        try {
+            getEngine().activateThrottling();
+        } catch (EngineClosedException ex) {
+            // ignore
+        }
+    }
+
+    public void deactivateThrottling() {
+        try {
+            getEngine().deactivateThrottling();
+        } catch (EngineClosedException ex) {
+            // ignore
+        }
+    }
+
     /**
      * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
      */
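The try/catch-ignore in both new methods is deliberate: the memory controller acts on shards asynchronously, so an engine can close (relocation, shutdown) between the controller's decision and the call, and a closed engine has nothing left to throttle. An illustrative calling loop under that assumption (not code from the commit):

    // Because IndexShard swallows EngineClosedException itself, a controller can
    // iterate shards without first checking each shard's lifecycle state:
    static void throttleShards(Iterable<IndexShard> shardsOverBudget) {
        for (IndexShard shard : shardsOverBudget) {
            shard.activateThrottling(); // no-op if the engine already closed
        }
    }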

IndexingMemoryController.java

@@ -70,6 +70,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
     private final TimeValue inactiveTime;
     private final TimeValue interval;

+    /** Contains shards currently being throttled because we can't write segments quickly enough */
+    private final Set<IndexShard> throttled = new HashSet<>();
+
     private volatile ScheduledFuture scheduler;

     private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
@@ -77,10 +80,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
     private final ShardsIndicesStatusChecker statusChecker;

-    /** How many bytes we are currently moving to disk by the engine to refresh */
-    private final AtomicLong bytesRefreshingNow = new AtomicLong();
-    private final Map<IndexShard,Long> refreshingBytes = new ConcurrentHashMap<>();
+    /** Maps each shard to how many bytes it is currently, asynchronously, writing to disk */
+    private final Map<IndexShard,Long> writingBytes = new ConcurrentHashMap<>();

     @Inject
     public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
@@ -124,16 +125,18 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
                 SHARD_MEMORY_INTERVAL_TIME_SETTING, this.interval);
     }

-    /** Shard calls this to notify us that this many bytes are being asynchronously moved from RAM to disk */
+    /** Shard calls this when it starts writing its indexing buffer to disk to notify us */
     public void addWritingBytes(IndexShard shard, long numBytes) {
-        refreshingBytes.put(shard, numBytes);
+        writingBytes.put(shard, numBytes);
+        logger.debug("IMC: add writing bytes for {}, {} MB", shard.shardId(), numBytes/1024./1024.);
     }

-    /** Shard calls this to notify us that this many bytes are are done being asynchronously moved from RAM to disk */
+    /** Shard calls this when it's done writing these bytes to disk */
     public void removeWritingBytes(IndexShard shard, long numBytes) {
         // nocommit this can fail, if two refreshes are running "concurrently"
-        Long result = refreshingBytes.remove(shard);
+        Long result = writingBytes.remove(shard);
         assert result != null;
+        logger.debug("IMC: clear writing bytes for {}", shard.shardId());
     }

     @Override
@@ -189,7 +192,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
         return shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());
     }

-    /** check if any shards active status changed, now. */
+    /** used by tests to check if any shards active status changed, now. */
     public void forceCheck() {
         statusChecker.run();
     }
@@ -224,12 +227,11 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
     /** Shard calls this on each indexing/delete op */
     public synchronized void bytesWritten(int bytes) {
         bytesWrittenSinceCheck += bytes;
-        if (bytesWrittenSinceCheck > indexingBuffer.bytes()/20) {
+        if (bytesWrittenSinceCheck > indexingBuffer.bytes()/30) {
             // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
             // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
             // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
             // processed by indexing:
-            System.out.println(((System.currentTimeMillis() - startMS)/1000.0) + ": NOW CHECK xlog=" + bytesWrittenSinceCheck);
             run();
         }
     }
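bytesWritten() is a safety net: the controller normally wakes on a schedule, but it also forces a check whenever roughly 1/30th of the indexing buffer budget has passed through the translog (tightened from 1/20th in this commit). The accounting as a standalone sketch; the class and the reset placement are illustrative, not the controller's actual code:

    // Returns true when enough bytes have flowed through that the caller
    // should run the memory check now rather than wait for the schedule.
    class CheckEveryFraction {
        private final long budgetBytes;
        private long bytesSinceCheck;

        CheckEveryFraction(long budgetBytes) {
            this.budgetBytes = budgetBytes;
        }

        synchronized boolean bytesWritten(int bytes) {
            bytesSinceCheck += bytes;
            if (bytesSinceCheck > budgetBytes / 30) {
                bytesSinceCheck = 0; // the real controller resets its counter elsewhere
                return true;
            }
            return false;
        }
    }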
@@ -237,69 +239,101 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
         @Override
         public synchronized void run() {
-            // nocommit add defensive try/catch-everything here? bad if an errant EngineClosedExc kills off this thread!!
+            // NOTE: even if we hit an errant exc here, our ThreadPool.scheduledWithFixedDelay will log the exception and re-invoke us
+            // again, on schedule

-            // Fast check to sum up how much heap all shards' indexing buffers are using now:
+            // First pass to sum up how much heap all shards' indexing buffers are using now, and how many bytes they are currently moving
+            // to disk:
             long totalBytesUsed = 0;
+            long totalBytesWriting = 0;
             for (IndexShard shard : availableShards()) {

                 // Give shard a chance to transition to inactive so sync'd flush can happen:
                 checkIdle(shard, inactiveTime.nanos());

-                // nocommit explain why order is important here!
-                Long bytes = refreshingBytes.get(shard);
+                // How many bytes this shard is currently (async'd) moving from heap to disk:
+                Long shardWritingBytes = writingBytes.get(shard);

+                // How many heap bytes this shard is currently using
                 long shardBytesUsed = getIndexBufferRAMBytesUsed(shard);

-                if (bytes != null) {
-                    // Only count up bytes not already being refreshed:
-                    shardBytesUsed -= bytes;
+                if (shardWritingBytes != null) {
+                    shardBytesUsed -= shardWritingBytes;
+                    totalBytesWriting += shardWritingBytes;

-                    // If the refresh completed just after we pulled refreshingBytes and before we pulled index buffer bytes, then we could
-                    // have a negative value here:
+                    // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could
+                    // have a negative value here. So we just skip this shard since that means it's now using very little heap:
                     if (shardBytesUsed < 0) {
                         continue;
                     }
                 }

                 totalBytesUsed += shardBytesUsed;
-                System.out.println("IMC: " + shard.shardId() + " using " + (shardBytesUsed/1024./1024.) + " MB");
             }

-            System.out.println(((System.currentTimeMillis() - startMS)/1000.0) + ": TOT=" + totalBytesUsed + " vs " + indexingBuffer.bytes());
+            if (logger.isTraceEnabled()) {
+                logger.trace("total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]",
+                             new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING, indexingBuffer, new ByteSizeValue(totalBytesWriting));
+            }

-            if (totalBytesUsed - bytesRefreshingNow.get() > indexingBuffer.bytes()) {
-                // OK we are using too much; make a queue and ask largest shard(s) to refresh:
-                logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
+            if (totalBytesUsed > indexingBuffer.bytes()) {
+                // OK we are now over-budget; fill the priority queue and ask largest shard(s) to refresh:
+                logger.debug("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]",
+                             new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING, indexingBuffer, new ByteSizeValue(totalBytesWriting));
                 PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
-                for (IndexShard shard : availableShards()) {
-                    // nocommit explain why order is important here!
-                    Long bytes = refreshingBytes.get(shard);
+
+                for (IndexShard shard : availableShards()) {
+                    // How many bytes this shard is currently (async'd) moving from heap to disk:
+                    Long shardWritingBytes = writingBytes.get(shard);
+
+                    // How many heap bytes this shard is currently using
                     long shardBytesUsed = getIndexBufferRAMBytesUsed(shard);
-                    if (bytes != null) {
+
+                    if (shardWritingBytes != null) {
                         // Only count up bytes not already being refreshed:
-                        shardBytesUsed -= bytes;
-                        // If the refresh completed just after we pulled refreshingBytes and before we pulled index buffer bytes, then we could
-                        // have a negative value here:
+                        shardBytesUsed -= shardWritingBytes;
+
+                        // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could
+                        // have a negative value here. So we just skip this shard since that means it's now using very little heap:
                         if (shardBytesUsed < 0) {
                             continue;
                         }
                     }

                     if (shardBytesUsed > 0) {
+                        if (logger.isTraceEnabled()) {
+                            if (shardWritingBytes != null) {
+                                logger.trace("shard [{}] is using [{}] heap, writing [{}] heap", shard.shardId(), shardBytesUsed, shardWritingBytes);
+                            } else {
+                                logger.trace("shard [{}] is using [{}] heap, not writing any bytes", shard.shardId(), shardBytesUsed);
+                            }
+                        }
                         queue.add(new ShardAndBytesUsed(shardBytesUsed, shard));
                     }
                 }

+                // If we are using more than 50% of our budget across both indexing buffer and bytes we are moving to disk, then we now
+                // throttle the top shards to give back-pressure:
+                boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBuffer.bytes();
+
                 while (totalBytesUsed > indexingBuffer.bytes() && queue.isEmpty() == false) {
                     ShardAndBytesUsed largest = queue.poll();
-                    System.out.println("IMC: write " + largest.shard.shardId() + ": " + (largest.bytesUsed/1024./1024.) + " MB");
-                    logger.debug("refresh shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed));
+                    logger.debug("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed));
                     writeIndexingBufferAsync(largest.shard);
                     totalBytesUsed -= largest.bytesUsed;
+                    if (doThrottle && throttled.contains(largest.shard) == false) {
+                        logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId());
+                        throttled.add(largest.shard);
+                        largest.shard.activateThrottling();
+                    }
+                }
+
+                if (doThrottle == false) {
+                    for (IndexShard shard : throttled) {
+                        logger.info("stop throttling indexing for shard [{}]", shard.shardId());
+                        shard.deactivateThrottling();
+                    }
+                    throttled.clear();
                 }
             }
         }
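run() frees heap largest-shard-first: shards go into a priority queue ordered by indexing buffer size, and the controller polls until the running total drops back under budget. A simplified, runnable sketch of that selection; this ShardAndBytesUsed is a stand-in for the controller's private class, holding a string id instead of an IndexShard:

    import java.util.PriorityQueue;

    class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
        final long bytesUsed;
        final String shardId;

        ShardAndBytesUsed(long bytesUsed, String shardId) {
            this.bytesUsed = bytesUsed;
            this.shardId = shardId;
        }

        @Override
        public int compareTo(ShardAndBytesUsed other) {
            // Reverse order so the queue hands back the largest shard first:
            return Long.compare(other.bytesUsed, bytesUsed);
        }

        public static void main(String[] args) {
            PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
            queue.add(new ShardAndBytesUsed(10, "shard1"));
            queue.add(new ShardAndBytesUsed(70, "shard2"));
            queue.add(new ShardAndBytesUsed(40, "shard3"));
            long totalBytesUsed = 120;
            long budget = 40;
            while (totalBytesUsed > budget && queue.isEmpty() == false) {
                ShardAndBytesUsed largest = queue.poll();
                System.out.println("write indexing buffer for " + largest.shardId);
                totalBytesUsed -= largest.bytesUsed; // prints shard2, then shard3
            }
        }
    }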