diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 909c9a4c89d..839203885f8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -114,7 +115,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, private volatile int disableFlushCounter = 0; - private volatile Searcher postFlushSearcher; + // indexing searcher is initialized + private final AtomicReference indexingSearcher = new AtomicReference(); private final AtomicBoolean flushing = new AtomicBoolean(); @@ -190,10 +192,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, try { translog.newTranslog(newTransactionLogId()); this.nrtResource = buildNrtResource(indexWriter); - if (postFlushSearcher != null) { - postFlushSearcher.release(); + if (indexingSearcher.get() != null) { + indexingSearcher.get().release(); + indexingSearcher.set(null); } - postFlushSearcher = searcher(); } catch (IOException e) { try { indexWriter.rollback(); @@ -697,12 +699,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, dirty = true; // force a refresh // we need to do a refresh here so we sync versioning support refresh(new Refresh(true)); - if (postFlushSearcher != null) { - postFlushSearcher.release(); + if (indexingSearcher.get() != null) { + indexingSearcher.get().release(); + indexingSearcher.set(null); } - // only need to load for this flush version searcher, since we keep a map for all - // the changes since the previous flush in memory - postFlushSearcher = searcher(); } finally { rwl.writeLock().unlock(); flushing.set(false); @@ -854,9 +854,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, rwl.writeLock().lock(); this.versionMap.clear(); try { - if (postFlushSearcher != null) { - postFlushSearcher.release(); - postFlushSearcher = null; + if (indexingSearcher.get() != null) { + indexingSearcher.get().release(); + indexingSearcher.set(null); } if (nrtResource != null) { this.nrtResource.forceClose(); @@ -884,7 +884,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, private long loadCurrentVersionFromIndex(Term uid) { UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid.text()); // no version, get the version from the index - Searcher searcher = postFlushSearcher; + Searcher searcher = indexingSearcher.get(); + if (searcher == null) { + Searcher tmpSearcher = searcher(); + if (!indexingSearcher.compareAndSet(null, tmpSearcher)) { + // someone beat us to it, release the one we got + tmpSearcher.release(); + } + // it must have a value, since someone set it already, and this code gets called + // under a readLock, while the indexSearcher gets nullified on a writeLock + searcher = indexingSearcher.get(); + } for (IndexReader reader : searcher.searcher().subReaders()) { BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter); // we know that its not there...