a few cleanups
This commit is contained in:
parent
cbb6463425
commit
485f4171bb
|
@ -103,9 +103,9 @@ public class InternalEngine extends Engine {
|
|||
|
||||
private final IndexThrottle throttle;
|
||||
|
||||
// How many callers are currently requesting index throttling. Currently there are only two times we do this: when merges are falling
|
||||
// behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling incoming
|
||||
// indexing ops to a single thread:
|
||||
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
|
||||
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
|
||||
// incoming indexing ops to a single thread:
|
||||
private final AtomicInteger throttleRequestCount = new AtomicInteger();
|
||||
|
||||
public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
|
||||
|
@ -515,16 +515,17 @@ public class InternalEngine extends Engine {
|
|||
@Override
|
||||
public void writeIndexingBuffer() throws EngineException {
|
||||
|
||||
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
|
||||
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
|
||||
// refresh API), and another for version map interactions:
|
||||
long versionMapBytes = versionMap.ramBytesUsedForRefresh();
|
||||
long indexingBufferBytes = indexWriter.ramBytesUsed();
|
||||
|
||||
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
|
||||
// we obtain a read lock here, since we don't want a flush to happen while we are writing
|
||||
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
|
||||
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
|
||||
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
|
||||
// refresh API), and another for version map interactions:
|
||||
long versionMapBytes = versionMap.ramBytesUsedForRefresh();
|
||||
long indexingBufferBytes = indexWriter.ramBytesUsed();
|
||||
|
||||
boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
|
||||
if (useRefresh) {
|
||||
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
|
||||
|
|
|
@ -544,8 +544,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
if (canIndex()) {
|
||||
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
|
||||
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
|
||||
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed));
|
||||
try {
|
||||
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed));
|
||||
long time = System.nanoTime();
|
||||
getEngine().refresh(source);
|
||||
refreshMetric.inc(System.nanoTime() - time);
|
||||
|
@ -1019,7 +1019,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
/** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
|
||||
* indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so. */
|
||||
* indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */
|
||||
public void checkIdle(long inactiveTimeNS) {
|
||||
Engine engineOrNull = getEngineOrNull();
|
||||
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
|
||||
|
@ -1254,24 +1254,23 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
* Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
|
||||
*/
|
||||
public void writeIndexingBufferAsync() {
|
||||
if (canIndex() == false) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Engine engine = getEngine();
|
||||
if (canIndex()) {
|
||||
long bytes = engine.indexBufferRAMBytesUsed();
|
||||
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
|
||||
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
|
||||
// there's still up to the 20% being used and continue writing if necessary:
|
||||
indexingMemoryController.addWritingBytes(IndexShard.this, bytes);
|
||||
try {
|
||||
getEngine().writeIndexingBuffer();
|
||||
} finally {
|
||||
indexingMemoryController.removeWritingBytes(IndexShard.this, bytes);
|
||||
}
|
||||
} else {
|
||||
long bytes = engine.indexBufferRAMBytesUsed();
|
||||
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
|
||||
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
|
||||
// there's still up to the 20% being used and continue writing if necessary:
|
||||
indexingMemoryController.addWritingBytes(IndexShard.this, bytes);
|
||||
try {
|
||||
getEngine().writeIndexingBuffer();
|
||||
} finally {
|
||||
indexingMemoryController.removeWritingBytes(IndexShard.this, bytes);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
handleRefreshException(e);
|
||||
|
|
Loading…
Reference in New Issue