From 0d07d5b91f910f9b4e36a1c1cb27b74ac66c1d74 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 11 Aug 2011 12:25:56 +0300 Subject: [PATCH] Improve how versioning are read to reduce (deleted) open file handles, closes #1230. --- .../index/engine/robin/RobinEngine.java | 57 ++++++------------- 1 file changed, 16 insertions(+), 41 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index b7a0bf6a899..a2a5bb423ff 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -79,7 +79,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -146,8 +145,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private volatile int disableFlushCounter = 0; // indexing searcher is initialized - private final AtomicReference indexingSearcher = new AtomicReference(); - private final AtomicBoolean flushing = new AtomicBoolean(); private final ConcurrentMap versionMap; @@ -266,10 +263,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } translog.newTranslog(translogIdGenerator.get()); this.nrtResource = buildNrtResource(indexWriter); - if (indexingSearcher.get() != null) { - indexingSearcher.set(null); - indexingSearcher.get().release(); - } SegmentInfos infos = new SegmentInfos(); infos.read(store.directory()); lastCommittedSegmentInfos = infos; @@ -871,10 +864,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private void refreshVersioningTable(long time) { // we need to refresh in order to clear older version values refresh(new Refresh(true).force(true)); - Searcher searcher = indexingSearcher.get(); - if (searcher != null) { - indexingSearcher.set(null); - } for (Map.Entry entry : versionMap.entrySet()) { String id = entry.getKey(); synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? @@ -894,9 +883,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } } - if (searcher != null) { - searcher.release(); - } } @Override public void maybeMerge() throws EngineException { @@ -1157,10 +1143,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { this.versionMap.clear(); this.failedEngineListeners.clear(); try { - if (indexingSearcher.get() != null) { - indexingSearcher.get().release(); - indexingSearcher.set(null); - } if (nrtResource != null) { this.nrtResource.forceClose(); } @@ -1194,31 +1176,24 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private long loadCurrentVersionFromIndex(Term uid) { UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid.text()); - // no version, get the version from the index - Searcher searcher = indexingSearcher.get(); - if (searcher == null) { - Searcher tmpSearcher = searcher(); - if (!indexingSearcher.compareAndSet(null, tmpSearcher)) { - // someone beat us to it, release the one we got - tmpSearcher.release(); + Searcher searcher = searcher(); + try { + for (IndexReader reader : searcher.searcher().subReaders()) { + BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter); + // we know that its not there... + if (!filter.isPresent(utf8.result, 0, utf8.length)) { + continue; + } + long version = UidField.loadVersion(reader, uid); + // either -2 (its there, but no version associated), or an actual version + if (version != -1) { + return version; + } } - // it must have a value, since someone set it already, and this code gets called - // under a readLock, while the indexSearcher gets nullified on a writeLock - searcher = indexingSearcher.get(); + return -1; + } finally { + searcher.release(); } - for (IndexReader reader : searcher.searcher().subReaders()) { - BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter); - // we know that its not there... - if (!filter.isPresent(utf8.result, 0, utf8.length)) { - continue; - } - long version = UidField.loadVersion(reader, uid); - // either -2 (its there, but no version associated), or an actual version - if (version != -1) { - return version; - } - } - return -1; } private IndexWriter createWriter() throws IOException {