From 9f6d6d540bb626e57be8bb09b198bd16d9f45493 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 19 Sep 2014 00:32:53 +0200 Subject: [PATCH] [ENGINE] try increment store before searcher is acquired InternalEngine#refreshNeeded must increment the ref count on the store used before it's checking if the searcher is current since internally a searcher ref is acquired and if that happens concurrently to a engine close it might violate the assumption that all files are closed when the store is closed. This commit also converts some try / finally into try / with. --- .../index/engine/internal/InternalEngine.java | 82 ++++++++++--------- 1 file changed, 43 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 42bbc1dd3cb..7777641b2b9 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -361,7 +361,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin } // no version, get the version from the index, we know that we refresh on flush - Searcher searcher = acquireSearcher("get"); + final Searcher searcher = acquireSearcher("get"); final Versions.DocIdAndVersion docIdAndVersion; try { docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid()); @@ -737,15 +737,26 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin @Override public boolean refreshNeeded() { - try { - // we are either dirty due to a document added or due to a - // finished merge - either way we should refresh - return dirty || !searcherManager.isSearcherCurrent(); - } catch (IOException e) { - logger.error("failed to access searcher manager", e); - failEngine("failed to access searcher manager", e); - throw new EngineException(shardId, "failed to access searcher manager", e); + if (store.tryIncRef()) { + /* + we need to inc the store here since searcherManager.isSearcherCurrent() + acquires a searcher internally and that might keep a file open on the + store. this violates the assumption that all files are closed when + the store is closed so we need to make sure we increment it here + */ + try { + // we are either dirty due to a document added or due to a + // finished merge - either way we should refresh + return dirty || !searcherManager.isSearcherCurrent(); + } catch (IOException e) { + logger.error("failed to access searcher manager", e); + failEngine("failed to access searcher manager", e); + throw new EngineException(shardId, "failed to access searcher manager", e); + } finally { + store.decRef(); + } } + return false; } @Override @@ -1158,8 +1169,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin public SegmentsStats segmentsStats() { try (InternalLock _ = readLock.acquire()) { ensureOpen(); - Searcher searcher = acquireSearcher("segments_stats"); - try { + try (final Searcher searcher = acquireSearcher("segments_stats")) { SegmentsStats stats = new SegmentsStats(); for (AtomicReaderContext reader : searcher.reader().leaves()) { stats.add(1, getReaderRamBytesUsed(reader)); @@ -1168,8 +1178,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed()); stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB()*1024*1024)); return stats; - } finally { - searcher.close(); } } } @@ -1355,11 +1363,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin } private long loadCurrentVersionFromIndex(Term uid) throws IOException { - Searcher searcher = acquireSearcher("load_version"); - try { + try (final Searcher searcher = acquireSearcher("load_version")) { return Versions.loadVersion(searcher.reader(), uid); - } finally { - searcher.close(); } } @@ -1573,7 +1578,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin if (warmer != null) { // we need to pass a custom searcher that does not release anything on Engine.Search Release, // we will release explicitly - Searcher currentSearcher = null; IndexSearcher newSearcher = null; boolean closeNewSearcher = false; try { @@ -1581,30 +1585,31 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin // fresh index writer, just do on all of it newSearcher = searcher; } else { - currentSearcher = acquireSearcher("search_factory"); - // figure out the newSearcher, with only the new readers that are relevant for us - List readers = Lists.newArrayList(); - for (AtomicReaderContext newReaderContext : searcher.getIndexReader().leaves()) { - if (isMergedSegment(newReaderContext.reader())) { - // merged segments are already handled by IndexWriterConfig.setMergedSegmentWarmer - continue; - } - boolean found = false; - for (AtomicReaderContext currentReaderContext : currentSearcher.reader().leaves()) { - if (currentReaderContext.reader().getCoreCacheKey().equals(newReaderContext.reader().getCoreCacheKey())) { - found = true; - break; + try (final Searcher currentSearcher = acquireSearcher("search_factory")) { + // figure out the newSearcher, with only the new readers that are relevant for us + List readers = Lists.newArrayList(); + for (AtomicReaderContext newReaderContext : searcher.getIndexReader().leaves()) { + if (isMergedSegment(newReaderContext.reader())) { + // merged segments are already handled by IndexWriterConfig.setMergedSegmentWarmer + continue; + } + boolean found = false; + for (AtomicReaderContext currentReaderContext : currentSearcher.reader().leaves()) { + if (currentReaderContext.reader().getCoreCacheKey().equals(newReaderContext.reader().getCoreCacheKey())) { + found = true; + break; + } + } + if (!found) { + readers.add(newReaderContext.reader()); } } - if (!found) { - readers.add(newReaderContext.reader()); + if (!readers.isEmpty()) { + // we don't want to close the inner readers, just increase ref on them + newSearcher = new IndexSearcher(new MultiReader(readers.toArray(new IndexReader[readers.size()]), false)); + closeNewSearcher = true; } } - if (!readers.isEmpty()) { - // we don't want to close the inner readers, just increase ref on them - newSearcher = new IndexSearcher(new MultiReader(readers.toArray(new IndexReader[readers.size()]), false)); - closeNewSearcher = true; - } } if (newSearcher != null) { @@ -1618,7 +1623,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin } } finally { // no need to release the fullSearcher, nothing really is done... - Releasables.close(currentSearcher); if (newSearcher != null && closeNewSearcher) { IOUtils.closeWhileHandlingException(newSearcher.getIndexReader()); // ignore }