From 57cd8f765ff74cc6a620f23f8c658bba6a3e1eee Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 8 Jul 2014 21:46:01 +0200 Subject: [PATCH] [ENGINE] Prevent NPE if engine is closed while version map is checked We check if the version map needs to be refreshed after we released the readlock which can cause the the engine being closed before we read the value from the volatile `indexWriter` field which can cause an NPE on the indexing thread. This commit also fixes a potential uncaught exception if the refresh failed due to the engine being already closed. Relates to #6443 Closes #6786 --- .../index/engine/internal/InternalEngine.java | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 49fe481a896..83e8507da27 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; @@ -79,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -385,8 +387,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public void create(Create create) throws EngineException { + final IndexWriter writer; try (InternalLock _ = readLock.acquire()) { - IndexWriter writer = this.indexWriter; + writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); } @@ -400,7 +403,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin maybeFailEngine(t); throw new CreateFailedEngineException(shardId, create, t); } - checkVersionMapRefresh(); + checkVersionMapRefresh(writer); } private void maybeFailEngine(Throwable t) { @@ -480,8 +483,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public void index(Index index) throws EngineException { + final IndexWriter writer; try (InternalLock _ = readLock.acquire()) { - IndexWriter writer = this.indexWriter; + writer = this.indexWriter; if (writer == null) { throw new EngineClosedException(shardId, failedEngine); } @@ -495,19 +499,30 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin maybeFailEngine(t); throw new IndexFailedEngineException(shardId, index, t); } - checkVersionMapRefresh(); + checkVersionMapRefresh(writer); } - /** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). */ - private void checkVersionMapRefresh() { + /** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). + * */ + private void checkVersionMapRefresh(final IndexWriter indexWriter) { // TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable? - if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25*this.indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) { - // Now refresh to clear versionMap: - threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { - public void run() { - refresh(new Refresh("version_table_full")); - } - }); + if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25 * indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) { + if (!closed) { + try { + // Now refresh to clear versionMap: + threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() { + public void run() { + try { + refresh(new Refresh("version_table_full")); + } catch (EngineClosedException ex) { + // ignore + } + } + }); + } catch (EsRejectedExecutionException ex) { + // that is fine too.. we might be shutting down + } + } } }