From 485f4171bb9797b6dd81237bd0e87b52abe88261 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Thu, 17 Dec 2015 13:28:59 -0500 Subject: [PATCH] a few cleanups --- .../index/engine/InternalEngine.java | 21 ++++++++------- .../elasticsearch/index/shard/IndexShard.java | 27 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 098868e2b3f..ed8046a2676 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -103,9 +103,9 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; - // How many callers are currently requesting index throttling. Currently there are only two times we do this: when merges are falling - // behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling incoming - // indexing ops to a single thread: + // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges + // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling + // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException { @@ -515,16 +515,17 @@ public class InternalEngine extends Engine { @Override public void writeIndexingBuffer() throws EngineException { - // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two - // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking - // refresh API), and another for version map interactions: - long versionMapBytes = versionMap.ramBytesUsedForRefresh(); - long indexingBufferBytes = indexWriter.ramBytesUsed(); - - // we obtain a read lock here, since we don't want a flush to happen while we are refreshing + // we obtain a read lock here, since we don't want a flush to happen while we are writing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); + + // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two + // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking + // refresh API), and another for version map interactions: + long versionMapBytes = versionMap.ramBytesUsedForRefresh(); + long indexingBufferBytes = indexWriter.ramBytesUsed(); + boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes); if (useRefresh) { // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4f4e46c91f3..da7ebfb4ad8 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -544,8 +544,8 @@ public class IndexShard extends AbstractIndexShardComponent { if (canIndex()) { long ramBytesUsed = getEngine().indexBufferRAMBytesUsed(); indexingMemoryController.addWritingBytes(this, ramBytesUsed); - logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed)); try { + logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed)); long time = System.nanoTime(); getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); @@ -1019,7 +1019,7 @@ public class IndexShard extends AbstractIndexShardComponent { } /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last - * indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. */ + * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */ public void checkIdle(long inactiveTimeNS) { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { @@ -1254,24 +1254,23 @@ public class IndexShard extends AbstractIndexShardComponent { * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk. */ public void writeIndexingBufferAsync() { + if (canIndex() == false) { + throw new UnsupportedOperationException(); + } threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { @Override public void run() { try { Engine engine = getEngine(); - if (canIndex()) { - long bytes = engine.indexBufferRAMBytesUsed(); - // NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map - // memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that - // there's still up to the 20% being used and continue writing if necessary: - indexingMemoryController.addWritingBytes(IndexShard.this, bytes); - try { - getEngine().writeIndexingBuffer(); - } finally { - indexingMemoryController.removeWritingBytes(IndexShard.this, bytes); - } - } else { + long bytes = engine.indexBufferRAMBytesUsed(); + // NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map + // memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that + // there's still up to the 20% being used and continue writing if necessary: + indexingMemoryController.addWritingBytes(IndexShard.this, bytes); + try { getEngine().writeIndexingBuffer(); + } finally { + indexingMemoryController.removeWritingBytes(IndexShard.this, bytes); } } catch (Exception e) { handleRefreshException(e);