diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index d6dd31481db..079bd277750 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -478,6 +478,12 @@ public abstract class Engine implements Closeable { */ public abstract void refresh(String source) throws EngineException; + /** + * Called when our engine is using too much heap and should move buffered indexed/deleted documents to disk. + */ + // NOTE: do NOT rename this to something containing flush or refresh! + public abstract void writeIndexingBuffer() throws EngineException; + /** * Flushes the state of the engine including the transaction log, clearing memory. * diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2aed653d02e..7b46dd3cc9a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -506,6 +506,45 @@ public class InternalEngine extends Engine { mergeScheduler.refreshConfig(); } + @Override + public void writeIndexingBuffer() throws EngineException { + // Moves buffered documents out of heap: either a refresh (which also lets the version map clear) or a cheaper IndexWriter flush. + // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... 
really we should keep two + // searcher managers, one for searching which is only refreshed by the schedule the user asks for, and another for version + // map interactions: + boolean useRefresh = versionMapRefreshPending.get() || (indexWriter.ramBytesUsed()/4 < versionMap.ramBytesUsedForRefresh()); + + // we obtain a read lock here, since we don't want a flush to happen while we are refreshing + // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + if (useRefresh) { + // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears + searcherManager.maybeRefreshBlocking(); + } else { + // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush: + indexWriter.flush(); + } + } catch (AlreadyClosedException e) { + ensureOpen(); + maybeFailEngine("writeIndexingBuffer", e); + } catch (EngineClosedException e) { + throw e; + } catch (Throwable t) { + failEngine("writeIndexingBuffer failed", t); + throw new RefreshFailedEngineException(shardId, t); + } + + // TODO: maybe we should just put a scheduled job in threadPool? + // We check for pruning in each delete request, but we also prune here e.g. 
in case a delete burst comes in and then no more deletes + // for a long time: + if (useRefresh) { + maybePruneDeletedTombstones(); + versionMapRefreshPending.set(false); + mergeScheduler.refreshConfig(); + } + } + @Override public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException { // best effort attempt before we acquire locks diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index dad1c5e09f2..8a013a31792 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -231,4 +231,10 @@ public class ShadowEngine extends Engine { // No IndexWriter nor version map throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } + + @Override + public void writeIndexingBuffer() { + // No indexing buffer: a shadow engine has no IndexWriter, so there is nothing to write + throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fcf204603f5..cf0296ef461 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -537,7 +537,7 @@ public class IndexShard extends AbstractIndexShardComponent { verifyNotClosed(); // nocommit OK to throw EngineClosedExc? 
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed(); - indexingMemoryController.addRefreshingBytes(this, ramBytesUsed); + indexingMemoryController.addWritingBytes(this, ramBytesUsed); try { if (logger.isTraceEnabled()) { logger.trace("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed); @@ -546,7 +546,7 @@ public class IndexShard extends AbstractIndexShardComponent { getEngine().refresh(source); refreshMetric.inc(System.nanoTime() - time); } finally { - indexingMemoryController.removeRefreshingBytes(this, ramBytesUsed); + indexingMemoryController.removeWritingBytes(this, ramBytesUsed); } } @@ -1210,17 +1210,42 @@ public class IndexShard extends AbstractIndexShardComponent { } /** - * Asynchronously refreshes the engine for new search operations to reflect the latest - * changes. + * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk; the write runs asynchronously on the REFRESH thread pool. */ - public void refreshAsync(final String reason) { - engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() { + public void writeIndexingBufferAsync() { + threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { @Override public void run() { try { - refresh(reason); + Engine engine = getEngine(); + long bytes = engine.indexBufferRAMBytesUsed(); + // NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, but this is fine because + // after the writes finish, IMC will poll again and see that there's still up to the 20% being used and continue + // writing if necessary: + indexingMemoryController.addWritingBytes(IndexShard.this, bytes); + try { + engine.writeIndexingBuffer(); + } finally { + indexingMemoryController.removeWritingBytes(IndexShard.this, bytes); + } } catch (EngineClosedException ex) { // ignore + } catch (RefreshFailedEngineException e) { + if (e.getCause() instanceof InterruptedException) { + // ignore, we are being shut down + } else if (e.getCause() instanceof 
ClosedByInterruptException) { + // ignore, we are being shut down + } else if (e.getCause() instanceof ThreadInterruptedException) { + // ignore, we are being shut down + } else { + if (state != IndexShardState.CLOSED) { + logger.warn("Failed to write indexing buffer", e); + } + } + } catch (Exception e) { + if (state != IndexShardState.CLOSED) { + logger.warn("Failed to write indexing buffer", e); + } } } }); diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 53c549efc77..a699dabcfff 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -124,11 +124,14 @@ public class IndexingMemoryController extends AbstractLifecycleComponent indexingBuffer.bytes()/20) { // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is - // typically smaller. But this logic is here only as a safety against thread starvation or too infrequent checking, - // to ensure we are still checking in proportion to bytes processed by indexing: + // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against + // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes + // processed by indexing: System.out.println(((System.currentTimeMillis() - startMS)/1000.0) + ": NOW CHECK xlog=" + bytesWrittenSinceCheck); run(); } @@ -293,7 +298,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent