diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 49fe481a896..83e8507da27 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; @@ -79,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -385,8 +387,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public void create(Create create) throws EngineException { + final IndexWriter writer; try (InternalLock _ = readLock.acquire()) { - IndexWriter writer = this.indexWriter; + writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); } @@ -400,7 +403,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin maybeFailEngine(t); throw new CreateFailedEngineException(shardId, create, t); } - checkVersionMapRefresh(); + checkVersionMapRefresh(writer); } private void maybeFailEngine(Throwable t) { @@ -480,8 +483,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public void index(Index index) throws EngineException { + final IndexWriter writer; try (InternalLock _ = readLock.acquire()) { - IndexWriter writer = this.indexWriter; + writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); } @@ -495,19 +499,30 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin maybeFailEngine(t); throw new IndexFailedEngineException(shardId, index, t); } - checkVersionMapRefresh(); + checkVersionMapRefresh(writer); } - /** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). */ - private void checkVersionMapRefresh() { + /** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). + * */ + private void checkVersionMapRefresh(final IndexWriter indexWriter) { // TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable? - if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25*this.indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) { - // Now refresh to clear versionMap: - threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { - public void run() { - refresh(new Refresh("version_table_full")); - } - }); + if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25 * indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) { + if (!closed) { + try { + // Now refresh to clear versionMap: + threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { + public void run() { + try { + refresh(new Refresh("version_table_full")); + } catch (EngineClosedException ex) { + // ignore + } + } + }); + } catch (EsRejectedExecutionException ex) { + // that is fine too.. we might be shutting down + } + } } }