From 05b2623c3823037bbd223593a81b0bfa3299a40d Mon Sep 17 00:00:00 2001
From: Michael McCandless
Date: Fri, 9 May 2014 23:35:06 +0000
Subject: [PATCH] LUCENE-5644: favor an already initialized ThreadState

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1593651 13f79535-47bb-0310-9956-ffa450edef68
---
 .../index/DocumentsWriterPerThreadPool.java   | 21 ++++++
 .../apache/lucene/index/SegmentReader.java    | 25 ++++---
 .../index/TestIndexWriterThreadsToSegments.java | 75 +++++++++++++++++++
 3 files changed, 110 insertions(+), 11 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index b113841b273..bd8cfa63a09 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -263,6 +263,27 @@ final class DocumentsWriterPerThreadPool implements Cloneable {
       // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
       // limited number of thread states:
       threadState = freeList[freeCount-1];
+
+      if (threadState.dwpt == null) {
+        // This thread-state is not initialized, e.g. it
+        // was just flushed. See if we can instead find
+        // another free thread state that already has docs
+        // indexed. This way if incoming thread concurrency
+        // has decreased, we don't leave docs
+        // indefinitely buffered, tying up RAM. This
+        // will instead get those thread states flushed,
+        // freeing up RAM for larger segment flushes:
+        for(int i=0;i<freeCount-1;i++) {
+          if (freeList[i].dwpt != null) {
+            // Use this one instead, and swap it with
+            // the un-initialized one:
+            ThreadState ts = freeList[i];
+            freeList[i] = threadState;
+            threadState = ts;
+            break;
+          }
+        }
+      }
       freeCount--;
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java b/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
@@ -218,7 +219,9 @@
     try {
       final String segmentSuffix = info.getFieldInfosGen() == -1 ?
"" : Long.toString(info.getFieldInfosGen(), Character.MAX_RADIX); - return info.info.getCodec().fieldInfosFormat().getFieldInfosReader().read(dir, info.info.name, segmentSuffix, IOContext.READONCE); + Codec codec = info.info.getCodec(); + FieldInfosFormat fisFormat = codec.fieldInfosFormat(); + return fisFormat.getFieldInfosReader().read(dir, info.info.name, segmentSuffix, IOContext.READONCE); } finally { if (closeDir) { dir.close(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterThreadsToSegments.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterThreadsToSegments.java index ee98df81c11..72102e39304 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterThreadsToSegments.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterThreadsToSegments.java @@ -19,16 +19,21 @@ package org.apache.lucene.index; import java.io.Closeable; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoFormat; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.TextField; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; @@ -266,4 +271,74 @@ public class TestIndexWriterThreadsToSegments extends LuceneTestCase { } dir.close(); } + + public void testDocsStuckInRAMForever() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + iwc.setRAMBufferSizeMB(.2); + Codec codec = Codec.forName("Lucene46"); + iwc.setCodec(codec); + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + final IndexWriter w = new IndexWriter(dir, iwc); + final CountDownLatch startingGun = new CountDownLatch(1); + Thread[] threads = new Thread[2]; + for(int i=0;i segSeen = new HashSet<>(); + int thread0Count = 0; + int thread1Count = 0; + + // At this point the writer should have 2 thread states w/ docs; now we index with only 1 thread until we see all 1000 thread0 & thread1 + // docs flushed. If the writer incorrectly holds onto previously indexed docs forever then this will run forever: + while (thread0Count < 1000 || thread1Count < 1000) { + Document doc = new Document(); + doc.add(newStringField("field", "threadIDmain", Field.Store.NO)); + w.addDocument(doc); + + for(String fileName : dir.listAll()) { + if (fileName.endsWith(".si")) { + String segName = IndexFileNames.parseSegmentName(fileName); + if (segSeen.contains(segName) == false) { + segSeen.add(segName); + SegmentInfo si = new Lucene46SegmentInfoFormat().getSegmentInfoReader().read(dir, segName, IOContext.DEFAULT); + si.setCodec(codec); + SegmentCommitInfo sci = new SegmentCommitInfo(si, 0, -1, -1); + SegmentReader sr = new SegmentReader(sci, IOContext.DEFAULT); + try { + thread0Count += sr.docFreq(new Term("field", "threadID0")); + thread1Count += sr.docFreq(new Term("field", "threadID1")); + } finally { + sr.close(); + } + } + } + } + } + + w.close(); + dir.close(); + } }