LUCENE-2293: make previously hardwired 5 max thread states settable, through IWC

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@923238 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-03-15 14:12:46 +00:00
parent 06b51a380b
commit fa1d73afeb
6 changed files with 41 additions and 8 deletions

View File

@ -172,6 +172,11 @@ New features
for captureState()/restoreState(), if the state itsself
needs to be inspected/modified. (Uwe Schindler)
* LUCENE-2293: Expose control over max number of threads that
IndexWriter will allow to run concurrently while indexing
documents (previously this was hardwired to 5), using
IndexWriterConfig.setMaxThreadStates. (Mike McCandless)
Optimizations
* LUCENE-2075: Terms dict cache is now shared across threads instead

View File

@ -125,7 +125,6 @@ final class DocumentsWriter {
// Max # ThreadState instances; if there are more threads
// than this they share ThreadStates
private final static int MAX_THREAD_STATE = 5;
private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
@ -141,6 +140,10 @@ final class DocumentsWriter {
int maxFieldLength = IndexWriterConfig.UNLIMITED_FIELD_LENGTH;
Similarity similarity;
// max # simultaneous threads; if there are more than
// this, they wait for others to finish first
private final int maxThreadStates;
List<String> newFiles;
static class DocState {
@ -301,10 +304,11 @@ final class DocumentsWriter {
private boolean closed;
DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain) throws IOException {
DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates) throws IOException {
this.directory = directory;
this.writer = writer;
this.similarity = writer.getConfig().getSimilarity();
this.maxThreadStates = maxThreadStates;
flushedDocCount = writer.maxDoc();
consumer = indexingChain.getChain(this);
@ -721,7 +725,7 @@ final class DocumentsWriter {
if (minThreadState == null || ts.numThreads < minThreadState.numThreads)
minThreadState = ts;
}
if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= MAX_THREAD_STATE)) {
if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
state = minThreadState;
state.numThreads++;
} else {

View File

@ -1153,7 +1153,7 @@ public class IndexWriter implements Closeable {
setRollbackSegmentInfos(segmentInfos);
docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain());
docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates());
docWriter.setInfoStream(infoStream);
docWriter.setMaxFieldLength(maxFieldLength);

View File

@ -78,6 +78,12 @@ public final class IndexWriterConfig implements Cloneable {
*/
public static long WRITE_LOCK_TIMEOUT = 1000;
/** The maximum number of simultaneous threads that may be
* indexing documents at once in IndexWriter; if more
* than this many threads arrive they will wait for
* others to finish. */
public final static int DEFAULT_MAX_THREAD_STATES = 8;
/**
* Sets the default (for any instance) maximum time to wait for a write lock
* (in milliseconds).
@ -110,6 +116,7 @@ public final class IndexWriterConfig implements Cloneable {
private int maxBufferedDocs;
private IndexingChain indexingChain;
private IndexReaderWarmer mergedSegmentWarmer;
private int maxThreadStates;
// required for clone
private Version matchVersion;
@ -137,6 +144,7 @@ public final class IndexWriterConfig implements Cloneable {
maxBufferedDocs = DEFAULT_MAX_BUFFERED_DOCS;
indexingChain = DocumentsWriter.defaultIndexingChain;
mergedSegmentWarmer = null;
maxThreadStates = DEFAULT_MAX_THREAD_STATES;
}
@Override
@ -483,6 +491,18 @@ public final class IndexWriterConfig implements Cloneable {
return mergedSegmentWarmer;
}
/** Sets the max number of simultaneous threads that may
* be indexing documents at once in IndexWriter. */
public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
this.maxThreadStates = maxThreadStates;
return this;
}
/** Returns the max number of simultaneous threads that
* may be indexing documents at once in IndexWriter. */
public int getMaxThreadStates() {
return maxThreadStates;
}
/** Expert: sets the {@link DocConsumer} chain to be used to process documents. */
IndexWriterConfig setIndexingChain(IndexingChain indexingChain) {
@ -513,6 +533,7 @@ public final class IndexWriterConfig implements Cloneable {
sb.append("ramBufferSizeMB=").append(ramBufferSizeMB).append("\n");
sb.append("maxBufferedDocs=").append(maxBufferedDocs).append("\n");
sb.append("mergedSegmentWarmer=").append(mergedSegmentWarmer).append("\n");
sb.append("maxThreadStates=").append(maxThreadStates).append("\n");
return sb.toString();
}
}

View File

@ -98,6 +98,7 @@ public class TestIndexWriterConfig extends LuceneTestCaseJ4 {
getters.add("getMaxBufferedDocs");
getters.add("getIndexingChain");
getters.add("getMergedSegmentWarmer");
getters.add("getMaxThreadStates");
for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {
assertTrue("method " + m.getName() + " is not tested for defaults", getters.contains(m.getName()));

View File

@ -84,7 +84,8 @@ public class TestStressIndexing2 extends LuceneTestCase {
// dir1 = FSDirectory.open("foofoofoo");
Directory dir2 = new MockRAMDirectory();
// mergeFactor=2; maxBufferedDocs=2; Map docs = indexRandom(1, 3, 2, dir1);
Map<String,Document> docs = indexRandom(10, 10, 100, dir1);
int maxThreadStates = 1+r.nextInt(10);
Map<String,Document> docs = indexRandom(10, 10, 100, dir1, maxThreadStates);
indexSerial(docs, dir2);
// verifying verify
@ -101,6 +102,7 @@ public class TestStressIndexing2 extends LuceneTestCase {
sameFieldOrder=r.nextBoolean();
mergeFactor=r.nextInt(3)+2;
maxBufferedDocs=r.nextInt(3)+2;
int maxThreadStates = 1+r.nextInt(10);
seed++;
int nThreads=r.nextInt(5)+1;
@ -108,7 +110,7 @@ public class TestStressIndexing2 extends LuceneTestCase {
int range=r.nextInt(20)+1;
Directory dir1 = new MockRAMDirectory();
Directory dir2 = new MockRAMDirectory();
Map<String,Document> docs = indexRandom(nThreads, iter, range, dir1);
Map<String,Document> docs = indexRandom(nThreads, iter, range, dir1, maxThreadStates);
indexSerial(docs, dir2);
verifyEquals(dir1, dir2, "id");
}
@ -182,12 +184,12 @@ public class TestStressIndexing2 extends LuceneTestCase {
return dw;
}
public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir) throws IOException, InterruptedException {
public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates) throws IOException, InterruptedException {
Map<String,Document> docs = new HashMap<String,Document>();
for(int iter=0;iter<3;iter++) {
IndexWriter w = new MockIndexWriter(dir, new IndexWriterConfig(
TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT)).setOpenMode(OpenMode.CREATE)
.setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs));
.setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setMaxThreadStates(maxThreadStates));
LogMergePolicy lmp = (LogMergePolicy) w.getMergePolicy();
lmp.setUseCompoundFile(false);
lmp.setUseCompoundDocStore(false);