LUCENE-5644: favor an already initialized ThreadState

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1593651 13f79535-47bb-0310-9956-ffa450edef68
Author: Michael McCandless 2014-05-09 23:35:06 +00:00
parent e12039a377
commit 05b2623c38
3 changed files with 110 additions and 11 deletions

lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java

@@ -263,6 +263,27 @@ final class DocumentsWriterPerThreadPool implements Cloneable {
           // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
           // limited number of thread states:
           threadState = freeList[freeCount-1];
+
+          if (threadState.dwpt == null) {
+            // This thread-state is not initialized, e.g. it
+            // was just flushed. See if we can instead find
+            // another free thread state that already has docs
+            // indexed. This way if incoming thread concurrency
+            // has decreased, we don't leave docs
+            // indefinitely buffered, tying up RAM. This
+            // will instead get those thread states flushed,
+            // freeing up RAM for larger segment flushes:
+            for(int i=0;i<freeCount;i++) {
+              if (freeList[i].dwpt != null) {
+                // Use this one instead, and swap it with
+                // the un-initialized one:
+                ThreadState ts = freeList[i];
+                freeList[i] = threadState;
+                threadState = ts;
+                break;
+              }
+            }
+          }
           freeCount--;
           break;
         } else if (numThreadStatesActive < threadStates.length) {
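
The hunk above is the whole behavioral change. Below is a minimal standalone sketch of the selection policy, using a stubbed-in State class rather than Lucene's real ThreadState (the names and the stub are illustrative only, not the actual Lucene API):

// Stub standing in for Lucene's ThreadState: dwpt is non-null only while
// the state holds a DocumentsWriterPerThread with buffered documents.
final class State {
  Object dwpt;
}

final class FreeListPolicy {
  /** Pop LIFO, but prefer a state that still buffers docs: if the state on
   *  top of the stack was just flushed (dwpt == null), swap it with an
   *  initialized one so its buffered RAM gets flushed instead of sitting
   *  idle indefinitely. */
  static State take(State[] freeList, int freeCount) {
    State state = freeList[freeCount - 1]; // LIFO bounds the number of active states
    if (state.dwpt == null) {
      // Scan the rest of the free list for a state that still has docs:
      for (int i = 0; i < freeCount - 1; i++) {
        if (freeList[i].dwpt != null) {
          // Swap: hand out the initialized state, leave the empty one
          // in the free list for later reuse.
          State swapped = freeList[i];
          freeList[i] = state;
          state = swapped;
          break;
        }
      }
    }
    return state; // the caller then decrements freeCount and locks the state
  }
}

Note that the swap keeps the free list dense, so the LIFO discipline and the cap on active thread states are unchanged; only the choice of which free state is handed out differs.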

lucene/core/src/java/org/apache/lucene/index/SegmentReader.java

@@ -17,9 +17,20 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.codecs.DocValuesProducer;
+import org.apache.lucene.codecs.FieldInfosFormat;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.codecs.TermVectorsReader;
 import org.apache.lucene.index.FieldInfo.DocValuesType;
@@ -31,16 +42,6 @@ import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.CloseableThreadLocal;
 import org.apache.lucene.util.IOUtils;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
 /**
  * IndexReader implementation over a single segment.
  * <p>
@@ -218,7 +219,9 @@ public final class SegmentReader extends AtomicReader {
     try {
       final String segmentSuffix = info.getFieldInfosGen() == -1 ? "" : Long.toString(info.getFieldInfosGen(), Character.MAX_RADIX);
-      return info.info.getCodec().fieldInfosFormat().getFieldInfosReader().read(dir, info.info.name, segmentSuffix, IOContext.READONCE);
+      Codec codec = info.info.getCodec();
+      FieldInfosFormat fisFormat = codec.fieldInfosFormat();
+      return fisFormat.getFieldInfosReader().read(dir, info.info.name, segmentSuffix, IOContext.READONCE);
     } finally {
       if (closeDir) {
         dir.close();

lucene/core/src/test/org/apache/lucene/index/TestIndexWriterThreadsToSegments.java

@@ -19,16 +19,21 @@ package org.apache.lucene.index;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoFormat;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
@@ -266,4 +271,74 @@ public class TestIndexWriterThreadsToSegments extends LuceneTestCase {
     }
     dir.close();
   }
+
+  public void testDocsStuckInRAMForever() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    iwc.setRAMBufferSizeMB(.2);
+    Codec codec = Codec.forName("Lucene46");
+    iwc.setCodec(codec);
+    iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+    final IndexWriter w = new IndexWriter(dir, iwc);
+    final CountDownLatch startingGun = new CountDownLatch(1);
+    Thread[] threads = new Thread[2];
+    for(int i=0;i<threads.length;i++) {
+      final int threadID = i;
+      threads[i] = new Thread() {
+          @Override
+          public void run() {
+            try {
+              startingGun.await();
+              for(int j=0;j<1000;j++) {
+                Document doc = new Document();
+                doc.add(newStringField("field", "threadID" + threadID, Field.Store.NO));
+                w.addDocument(doc);
+              }
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      threads[i].start();
+    }
+
+    startingGun.countDown();
+    for(Thread t : threads) {
+      t.join();
+    }
+
+    Set<String> segSeen = new HashSet<>();
+    int thread0Count = 0;
+    int thread1Count = 0;
+
+    // At this point the writer should have 2 thread states w/ docs; now we index with only 1 thread until we see all 1000 thread0 & thread1
+    // docs flushed. If the writer incorrectly holds onto previously indexed docs forever then this will run forever:
+    while (thread0Count < 1000 || thread1Count < 1000) {
+      Document doc = new Document();
+      doc.add(newStringField("field", "threadIDmain", Field.Store.NO));
+      w.addDocument(doc);
+
+      for(String fileName : dir.listAll()) {
+        if (fileName.endsWith(".si")) {
+          String segName = IndexFileNames.parseSegmentName(fileName);
+          if (segSeen.contains(segName) == false) {
+            segSeen.add(segName);
+            SegmentInfo si = new Lucene46SegmentInfoFormat().getSegmentInfoReader().read(dir, segName, IOContext.DEFAULT);
+            si.setCodec(codec);
+            SegmentCommitInfo sci = new SegmentCommitInfo(si, 0, -1, -1);
+            SegmentReader sr = new SegmentReader(sci, IOContext.DEFAULT);
+            try {
+              thread0Count += sr.docFreq(new Term("field", "threadID0"));
+              thread1Count += sr.docFreq(new Term("field", "threadID1"));
+            } finally {
+              sr.close();
+            }
+          }
+        }
+      }
+    }
+
+    w.close();
+    dir.close();
+  }
 }
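
The polling loop in the new test doubles as a small recipe for watching flushed segments appear on disk. Distilled into a hypothetical helper (the method name and signature are mine, not part of the commit; the API calls are the same 4.x ones the test itself uses):

// Count docs matching term in segments that newly appeared in dir since
// the last call; seen carries already-visited segment names across calls.
static int countNewSegmentDocs(Directory dir, Codec codec, Set<String> seen, Term term) throws IOException {
  int count = 0;
  for (String fileName : dir.listAll()) {
    if (fileName.endsWith(".si") == false) {
      continue;
    }
    String segName = IndexFileNames.parseSegmentName(fileName);
    if (seen.add(segName)) { // true only the first time we meet this segment
      SegmentInfo si = new Lucene46SegmentInfoFormat().getSegmentInfoReader().read(dir, segName, IOContext.DEFAULT);
      si.setCodec(codec);
      SegmentCommitInfo sci = new SegmentCommitInfo(si, 0, -1, -1);
      SegmentReader sr = new SegmentReader(sci, IOContext.DEFAULT);
      try {
        count += sr.docFreq(term);
      } finally {
        sr.close();
      }
    }
  }
  return count;
}

Called once per added document, as the test effectively does, the tallies converge only if every previously buffered doc is eventually flushed, which is exactly the property this commit restores.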