diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 079bd277750..410d2af8d0b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1087,4 +1087,14 @@ public abstract class Engine implements Closeable { */ void warm(Engine.Searcher searcher, boolean isTopLevelReader); } + + /** + * Request that this engine throttle incoming indexing requests to one thread. Must be matched by a later call to {@link deactivateThrottling}. + */ + public abstract void activateThrottling(); + + /** + * Reverses a previous {@link #activateThrottling} call. + */ + public abstract void deactivateThrottling(); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7b46dd3cc9a..486920dc750 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.math.MathUtils; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -510,9 +511,12 @@ public class InternalEngine extends Engine { public void writeIndexingBuffer() throws EngineException { // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two - // searcher managers, one for searching which is only refreshed by the schedule the user asks for, and another for version - // map interactions: - boolean useRefresh = versionMapRefreshPending.get() || (indexWriter.ramBytesUsed()/4 < versionMap.ramBytesUsedForRefresh()); + // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking + // refresh API), and another for version map interactions: + long versionMapBytes = versionMap.ramBytesUsedForRefresh(); + long indexingBufferBytes = indexWriter.ramBytesUsed(); + + boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes); // we obtain a read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) @@ -520,9 +524,13 @@ public class InternalEngine extends Engine { ensureOpen(); if (useRefresh) { // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears - searcherManager.maybeRefreshBlocking(); + logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])", + new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); + refresh("write indexing buffer"); } else { // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush: + logger.debug("use flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])", + new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); indexWriter.flush(); } } catch (AlreadyClosedException e) { @@ -1043,12 +1051,24 @@ public class InternalEngine extends Engine { } } + private final AtomicInteger throttleRequestCount = new AtomicInteger(); + + @Override public void activateThrottling() { - throttle.activate(); + int count = throttleRequestCount.incrementAndGet(); + assert count >= 1; + if (count == 1) { + throttle.activate(); + } } + @Override public void deactivateThrottling() { - throttle.deactivate(); + int count = throttleRequestCount.decrementAndGet(); + assert count >= 0; + if (count == 0) { + throttle.deactivate(); + } } long getGcDeletesInMillis() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 8a013a31792..286b06e69f9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -237,4 +237,14 @@ public class ShadowEngine extends Engine { // No indexing buffer throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } + + @Override + public void activateThrottling() { + throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); + } + + @Override + public void deactivateThrottling() { + throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cf0296ef461..3ac6d1ceefa 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -539,9 +539,7 @@ public class IndexShard extends AbstractIndexShardComponent { long ramBytesUsed = getEngine().indexBufferRAMBytesUsed(); indexingMemoryController.addWritingBytes(this, ramBytesUsed); try { - if (logger.isTraceEnabled()) { - logger.trace("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed); - } + logger.debug("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed); long time = System.nanoTime(); getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); @@ -1209,6 +1207,22 @@ public class IndexShard extends AbstractIndexShardComponent { return indexEventListener; } + public void activateThrottling() { + try { + getEngine().activateThrottling(); + } catch (EngineClosedException ex) { + // ignore + } + } + + public void deactivateThrottling() { + try { + getEngine().deactivateThrottling(); + } catch (EngineClosedException ex) { + // ignore + } + } + /** * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk. */ diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index a699dabcfff..c3199426b13 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -70,6 +70,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent throttled = new HashSet<>(); + private volatile ScheduledFuture scheduler; private static final EnumSet CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of( @@ -77,10 +80,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent refreshingBytes = new ConcurrentHashMap<>(); + /** Maps each shard to how many bytes it is currently, asynchronously, writing to disk */ + private final Map writingBytes = new ConcurrentHashMap<>(); @Inject public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { @@ -124,16 +125,18 @@ public class IndexingMemoryController extends AbstractLifecycleComponent indexingBuffer.bytes()/20) { + if (bytesWrittenSinceCheck > indexingBuffer.bytes()/30) { // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes // processed by indexing: - System.out.println(((System.currentTimeMillis() - startMS)/1000.0) + ": NOW CHECK xlog=" + bytesWrittenSinceCheck); run(); } } @@ -237,69 +239,101 @@ public class IndexingMemoryController extends AbstractLifecycleComponent indexingBuffer.bytes()) { - // OK we are using too much; make a queue and ask largest shard(s) to refresh: - logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer); + if (totalBytesUsed > indexingBuffer.bytes()) { + // OK we are now over-budget; fill the priority queue and ask largest shard(s) to refresh: + logger.debug("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]", + new ByteSizeValue(totalBytesUsed), INDEX_BUFFER_SIZE_SETTING, indexingBuffer, new ByteSizeValue(totalBytesWriting)); PriorityQueue queue = new PriorityQueue<>(); - for (IndexShard shard : availableShards()) { - // nocommit explain why order is important here! - Long bytes = refreshingBytes.get(shard); + for (IndexShard shard : availableShards()) { + // How many bytes this shard is currently (async'd) moving from heap to disk: + Long shardWritingBytes = writingBytes.get(shard); + + // How many heap bytes this shard is currently using long shardBytesUsed = getIndexBufferRAMBytesUsed(shard); - if (bytes != null) { + if (shardWritingBytes != null) { // Only count up bytes not already being refreshed: - shardBytesUsed -= bytes; + shardBytesUsed -= shardWritingBytes; - // If the refresh completed just after we pulled refreshingBytes and before we pulled index buffer bytes, then we could - // have a negative value here: + // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could + // have a negative value here. So we just skip this shard since that means it's now using very little heap: if (shardBytesUsed < 0) { continue; } } if (shardBytesUsed > 0) { + if (logger.isTraceEnabled()) { + if (shardWritingBytes != null) { + logger.trace("shard [{}] is using [{}] heap, writing [{}] heap", shard.shardId(), shardBytesUsed, shardWritingBytes); + } else { + logger.trace("shard [{}] is using [{}] heap, not writing any bytes", shard.shardId(), shardBytesUsed); + } + } queue.add(new ShardAndBytesUsed(shardBytesUsed, shard)); } } + // If we are using more than 50% of our budget across both indexing buffer and bytes we are moving to disk, then we now + // throttle the top shards to give back-pressure: + boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBuffer.bytes(); + while (totalBytesUsed > indexingBuffer.bytes() && queue.isEmpty() == false) { ShardAndBytesUsed largest = queue.poll(); - System.out.println("IMC: write " + largest.shard.shardId() + ": " + (largest.bytesUsed/1024./1024.) + " MB"); - logger.debug("refresh shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed)); + logger.debug("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed)); writeIndexingBufferAsync(largest.shard); totalBytesUsed -= largest.bytesUsed; + if (doThrottle && throttled.contains(largest.shard) == false) { + logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId()); + throttled.add(largest.shard); + largest.shard.activateThrottling(); + } + } + + if (doThrottle == false) { + for(IndexShard shard : throttled) { + logger.info("stop throttling indexing for shard [{}]", shard.shardId()); + shard.deactivateThrottling(); + } + throttled.clear(); } }