Improve how versioning are read to reduce (deleted) open file handles, closes #1230.

This commit is contained in:
Shay Banon 2011-08-11 12:25:56 +03:00
parent fc6e0dd037
commit 0d07d5b91f
1 changed files with 16 additions and 41 deletions

View File

@ -79,7 +79,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -146,8 +145,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private volatile int disableFlushCounter = 0;
// indexing searcher is initialized
private final AtomicReference<Searcher> indexingSearcher = new AtomicReference<Searcher>();
private final AtomicBoolean flushing = new AtomicBoolean();
private final ConcurrentMap<String, VersionValue> versionMap;
@ -266,10 +263,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
translog.newTranslog(translogIdGenerator.get());
this.nrtResource = buildNrtResource(indexWriter);
if (indexingSearcher.get() != null) {
indexingSearcher.set(null);
indexingSearcher.get().release();
}
SegmentInfos infos = new SegmentInfos();
infos.read(store.directory());
lastCommittedSegmentInfos = infos;
@ -871,10 +864,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void refreshVersioningTable(long time) {
// we need to refresh in order to clear older version values
refresh(new Refresh(true).force(true));
Searcher searcher = indexingSearcher.get();
if (searcher != null) {
indexingSearcher.set(null);
}
for (Map.Entry<String, VersionValue> entry : versionMap.entrySet()) {
String id = entry.getKey();
synchronized (dirtyLock(id)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
@ -894,9 +883,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
}
if (searcher != null) {
searcher.release();
}
}
@Override public void maybeMerge() throws EngineException {
@ -1157,10 +1143,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
this.versionMap.clear();
this.failedEngineListeners.clear();
try {
if (indexingSearcher.get() != null) {
indexingSearcher.get().release();
indexingSearcher.set(null);
}
if (nrtResource != null) {
this.nrtResource.forceClose();
}
@ -1194,18 +1176,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private long loadCurrentVersionFromIndex(Term uid) {
UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid.text());
// no version, get the version from the index
Searcher searcher = indexingSearcher.get();
if (searcher == null) {
Searcher tmpSearcher = searcher();
if (!indexingSearcher.compareAndSet(null, tmpSearcher)) {
// someone beat us to it, release the one we got
tmpSearcher.release();
}
// it must have a value, since someone set it already, and this code gets called
// under a readLock, while the indexSearcher gets nullified on a writeLock
searcher = indexingSearcher.get();
}
Searcher searcher = searcher();
try {
for (IndexReader reader : searcher.searcher().subReaders()) {
BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter);
// we know that its not there...
@ -1219,6 +1191,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
return -1;
} finally {
searcher.release();
}
}
private IndexWriter createWriter() throws IOException {