mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 18:35:25 +00:00
[ENGINE] try increment store before searcher is acquired
InternalEngine#refreshNeeded must increment the ref count on the store used before it's checking if the searcher is current since internally a searcher ref is acquired and if that happens concurrently to a engine close it might violate the assumption that all files are closed when the store is closed. This commit also converts some try / finally into try / with.
This commit is contained in:
parent
508ff29e0d
commit
9f6d6d540b
@ -361,7 +361,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
}
|
}
|
||||||
|
|
||||||
// no version, get the version from the index, we know that we refresh on flush
|
// no version, get the version from the index, we know that we refresh on flush
|
||||||
Searcher searcher = acquireSearcher("get");
|
final Searcher searcher = acquireSearcher("get");
|
||||||
final Versions.DocIdAndVersion docIdAndVersion;
|
final Versions.DocIdAndVersion docIdAndVersion;
|
||||||
try {
|
try {
|
||||||
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
|
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
|
||||||
@ -737,15 +737,26 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean refreshNeeded() {
|
public boolean refreshNeeded() {
|
||||||
try {
|
if (store.tryIncRef()) {
|
||||||
// we are either dirty due to a document added or due to a
|
/*
|
||||||
// finished merge - either way we should refresh
|
we need to inc the store here since searcherManager.isSearcherCurrent()
|
||||||
return dirty || !searcherManager.isSearcherCurrent();
|
acquires a searcher internally and that might keep a file open on the
|
||||||
} catch (IOException e) {
|
store. this violates the assumption that all files are closed when
|
||||||
logger.error("failed to access searcher manager", e);
|
the store is closed so we need to make sure we increment it here
|
||||||
failEngine("failed to access searcher manager", e);
|
*/
|
||||||
throw new EngineException(shardId, "failed to access searcher manager", e);
|
try {
|
||||||
|
// we are either dirty due to a document added or due to a
|
||||||
|
// finished merge - either way we should refresh
|
||||||
|
return dirty || !searcherManager.isSearcherCurrent();
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("failed to access searcher manager", e);
|
||||||
|
failEngine("failed to access searcher manager", e);
|
||||||
|
throw new EngineException(shardId, "failed to access searcher manager", e);
|
||||||
|
} finally {
|
||||||
|
store.decRef();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1158,8 +1169,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
public SegmentsStats segmentsStats() {
|
public SegmentsStats segmentsStats() {
|
||||||
try (InternalLock _ = readLock.acquire()) {
|
try (InternalLock _ = readLock.acquire()) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
Searcher searcher = acquireSearcher("segments_stats");
|
try (final Searcher searcher = acquireSearcher("segments_stats")) {
|
||||||
try {
|
|
||||||
SegmentsStats stats = new SegmentsStats();
|
SegmentsStats stats = new SegmentsStats();
|
||||||
for (AtomicReaderContext reader : searcher.reader().leaves()) {
|
for (AtomicReaderContext reader : searcher.reader().leaves()) {
|
||||||
stats.add(1, getReaderRamBytesUsed(reader));
|
stats.add(1, getReaderRamBytesUsed(reader));
|
||||||
@ -1168,8 +1178,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed());
|
stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed());
|
||||||
stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB()*1024*1024));
|
stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB()*1024*1024));
|
||||||
return stats;
|
return stats;
|
||||||
} finally {
|
|
||||||
searcher.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1355,11 +1363,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
}
|
}
|
||||||
|
|
||||||
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
||||||
Searcher searcher = acquireSearcher("load_version");
|
try (final Searcher searcher = acquireSearcher("load_version")) {
|
||||||
try {
|
|
||||||
return Versions.loadVersion(searcher.reader(), uid);
|
return Versions.loadVersion(searcher.reader(), uid);
|
||||||
} finally {
|
|
||||||
searcher.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1573,7 +1578,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
if (warmer != null) {
|
if (warmer != null) {
|
||||||
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
|
// we need to pass a custom searcher that does not release anything on Engine.Search Release,
|
||||||
// we will release explicitly
|
// we will release explicitly
|
||||||
Searcher currentSearcher = null;
|
|
||||||
IndexSearcher newSearcher = null;
|
IndexSearcher newSearcher = null;
|
||||||
boolean closeNewSearcher = false;
|
boolean closeNewSearcher = false;
|
||||||
try {
|
try {
|
||||||
@ -1581,30 +1585,31 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
// fresh index writer, just do on all of it
|
// fresh index writer, just do on all of it
|
||||||
newSearcher = searcher;
|
newSearcher = searcher;
|
||||||
} else {
|
} else {
|
||||||
currentSearcher = acquireSearcher("search_factory");
|
try (final Searcher currentSearcher = acquireSearcher("search_factory")) {
|
||||||
// figure out the newSearcher, with only the new readers that are relevant for us
|
// figure out the newSearcher, with only the new readers that are relevant for us
|
||||||
List<IndexReader> readers = Lists.newArrayList();
|
List<IndexReader> readers = Lists.newArrayList();
|
||||||
for (AtomicReaderContext newReaderContext : searcher.getIndexReader().leaves()) {
|
for (AtomicReaderContext newReaderContext : searcher.getIndexReader().leaves()) {
|
||||||
if (isMergedSegment(newReaderContext.reader())) {
|
if (isMergedSegment(newReaderContext.reader())) {
|
||||||
// merged segments are already handled by IndexWriterConfig.setMergedSegmentWarmer
|
// merged segments are already handled by IndexWriterConfig.setMergedSegmentWarmer
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
for (AtomicReaderContext currentReaderContext : currentSearcher.reader().leaves()) {
|
for (AtomicReaderContext currentReaderContext : currentSearcher.reader().leaves()) {
|
||||||
if (currentReaderContext.reader().getCoreCacheKey().equals(newReaderContext.reader().getCoreCacheKey())) {
|
if (currentReaderContext.reader().getCoreCacheKey().equals(newReaderContext.reader().getCoreCacheKey())) {
|
||||||
found = true;
|
found = true;
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) {
|
||||||
|
readers.add(newReaderContext.reader());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!found) {
|
if (!readers.isEmpty()) {
|
||||||
readers.add(newReaderContext.reader());
|
// we don't want to close the inner readers, just increase ref on them
|
||||||
|
newSearcher = new IndexSearcher(new MultiReader(readers.toArray(new IndexReader[readers.size()]), false));
|
||||||
|
closeNewSearcher = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!readers.isEmpty()) {
|
|
||||||
// we don't want to close the inner readers, just increase ref on them
|
|
||||||
newSearcher = new IndexSearcher(new MultiReader(readers.toArray(new IndexReader[readers.size()]), false));
|
|
||||||
closeNewSearcher = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (newSearcher != null) {
|
if (newSearcher != null) {
|
||||||
@ -1618,7 +1623,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// no need to release the fullSearcher, nothing really is done...
|
// no need to release the fullSearcher, nothing really is done...
|
||||||
Releasables.close(currentSearcher);
|
|
||||||
if (newSearcher != null && closeNewSearcher) {
|
if (newSearcher != null && closeNewSearcher) {
|
||||||
IOUtils.closeWhileHandlingException(newSearcher.getIndexReader()); // ignore
|
IOUtils.closeWhileHandlingException(newSearcher.getIndexReader()); // ignore
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user