[ENGINE] Prevent NPE if engine is closed while version map is checked
We check if the version map needs to be refreshed after we released the readlock which can cause the the engine being closed before we read the value from the volatile `indexWriter` field which can cause an NPE on the indexing thread. This commit also fixes a potential uncaught exception if the refresh failed due to the engine being already closed. Relates to #6443 Closes #6786
This commit is contained in:
parent
b97b670011
commit
57cd8f765f
|
@ -52,6 +52,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.index.analysis.AnalysisService;
|
import org.elasticsearch.index.analysis.AnalysisService;
|
||||||
import org.elasticsearch.index.codec.CodecService;
|
import org.elasticsearch.index.codec.CodecService;
|
||||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||||
|
@ -79,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -385,8 +387,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void create(Create create) throws EngineException {
|
public void create(Create create) throws EngineException {
|
||||||
|
final IndexWriter writer;
|
||||||
try (InternalLock _ = readLock.acquire()) {
|
try (InternalLock _ = readLock.acquire()) {
|
||||||
IndexWriter writer = this.indexWriter;
|
writer = this.indexWriter;
|
||||||
if (writer == null) {
|
if (writer == null) {
|
||||||
throw new EngineClosedException(shardId, failedEngine);
|
throw new EngineClosedException(shardId, failedEngine);
|
||||||
}
|
}
|
||||||
|
@ -400,7 +403,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
maybeFailEngine(t);
|
maybeFailEngine(t);
|
||||||
throw new CreateFailedEngineException(shardId, create, t);
|
throw new CreateFailedEngineException(shardId, create, t);
|
||||||
}
|
}
|
||||||
checkVersionMapRefresh();
|
checkVersionMapRefresh(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void maybeFailEngine(Throwable t) {
|
private void maybeFailEngine(Throwable t) {
|
||||||
|
@ -480,8 +483,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void index(Index index) throws EngineException {
|
public void index(Index index) throws EngineException {
|
||||||
|
final IndexWriter writer;
|
||||||
try (InternalLock _ = readLock.acquire()) {
|
try (InternalLock _ = readLock.acquire()) {
|
||||||
IndexWriter writer = this.indexWriter;
|
writer = this.indexWriter;
|
||||||
if (writer == null) {
|
if (writer == null) {
|
||||||
throw new EngineClosedException(shardId, failedEngine);
|
throw new EngineClosedException(shardId, failedEngine);
|
||||||
}
|
}
|
||||||
|
@ -495,19 +499,30 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
maybeFailEngine(t);
|
maybeFailEngine(t);
|
||||||
throw new IndexFailedEngineException(shardId, index, t);
|
throw new IndexFailedEngineException(shardId, index, t);
|
||||||
}
|
}
|
||||||
checkVersionMapRefresh();
|
checkVersionMapRefresh(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer). */
|
/** Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer).
|
||||||
private void checkVersionMapRefresh() {
|
* */
|
||||||
|
private void checkVersionMapRefresh(final IndexWriter indexWriter) {
|
||||||
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
|
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
|
||||||
if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25*this.indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) {
|
if (versionMap.ramBytesUsedForRefresh()/1024/1024. > 0.25 * indexWriter.getConfig().getRAMBufferSizeMB() && versionMapRefreshPending.getAndSet(true) == false) {
|
||||||
// Now refresh to clear versionMap:
|
if (!closed) {
|
||||||
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
|
try {
|
||||||
public void run() {
|
// Now refresh to clear versionMap:
|
||||||
refresh(new Refresh("version_table_full"));
|
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
|
||||||
}
|
public void run() {
|
||||||
});
|
try {
|
||||||
|
refresh(new Refresh("version_table_full"));
|
||||||
|
} catch (EngineClosedException ex) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (EsRejectedExecutionException ex) {
|
||||||
|
// that is fine too.. we might be shutting down
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue