LUCENE-6659: remove IWC's max thread states limit

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1690299 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2015-07-10 18:09:58 +00:00
parent 63da283f04
commit a6301dd900
39 changed files with 95 additions and 312 deletions

View File

@ -134,6 +134,9 @@ New Features
that can be used to validate that an index has an appropriate structure to
run join queries. (Adrien Grand)
* LUCENE-6659: Remove IndexWriter's unnecessary hard limit on max concurrency
(Robert Muir, Mike McCandless)
API Changes
* LUCENE-6508: Simplify Lock api, there is now just
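For illustration only, not part of this commit: with the hard limit gone, application code that previously tuned the cap simply drops the setMaxThreadStates call. A minimal sketch under assumed details (the index path, StandardAnalyzer, and 64 MB RAM buffer are all hypothetical):

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class OpenWriterSketch {
  public static void main(String[] args) throws Exception {
    Directory dir = FSDirectory.open(Paths.get("/tmp/index"));   // hypothetical path
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    // iwc.setMaxThreadStates(8);  // removed by LUCENE-6659; indexing concurrency is no longer capped here
    iwc.setRAMBufferSizeMB(64.0);  // flushing is still governed by the RAM buffer and per-thread hard limit
    try (IndexWriter writer = new IndexWriter(dir, iwc)) {
      // add documents from as many threads as the application likes
    }
  }
}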

View File

@ -166,7 +166,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return deleteQueue;
}
private final boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (flushControl.getAndResetApplyAllDeletes()) {
if (deleteQueue != null && !flushControl.isFullFlush()) {
ticketQueue.addDeletes(deleteQueue);
@ -177,7 +177,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return false;
}
final int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
if (forced) {
return ticketQueue.forcePurge(writer);
} else {
@ -209,7 +209,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "abort");
}
final int limit = perThreadPool.getActiveThreadState();
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
@ -262,7 +262,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
/** Returns how many documents were aborted. */
private final int abortThreadState(final ThreadState perThread) {
private int abortThreadState(final ThreadState perThread) {
assert perThread.isHeldByCurrentThread();
if (perThread.isActive()) { // we might be closed
if (perThread.isInitialized()) {
@ -285,7 +285,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAll");
@ -315,13 +315,11 @@ final class DocumentsWriter implements Closeable, Accountable {
* ticket queue has any tickets.
*/
boolean anyChanges = numDocsInRAM.get() != 0 || anyDeletions() || ticketQueue.hasTickets() || pendingChangesInCurrentFullFlush;
if (infoStream.isEnabled("DW")) {
if (anyChanges) {
infoStream.message("DW", "anyChanges? numDocsInRam=" + numDocsInRAM.get()
+ " deletes=" + anyDeletions() + " hasTickets:"
+ ticketQueue.hasTickets() + " pendingChangesInFullFlush: "
+ pendingChangesInCurrentFullFlush);
}
if (infoStream.isEnabled("DW") && anyChanges) {
infoStream.message("DW", "anyChanges? numDocsInRam=" + numDocsInRAM.get()
+ " deletes=" + anyDeletions() + " hasTickets:"
+ ticketQueue.hasTickets() + " pendingChangesInFullFlush: "
+ pendingChangesInCurrentFullFlush);
}
return anyChanges;
}
@ -361,10 +359,8 @@ final class DocumentsWriter implements Closeable, Accountable {
hasEvents |= doFlush(flushingDWPT);
}
if (infoStream.isEnabled("DW")) {
if (flushControl.anyStalledThreads()) {
infoStream.message("DW", "WARNING DocumentsWriter has stalled threads; waiting");
}
if (infoStream.isEnabled("DW") && flushControl.anyStalledThreads()) {
infoStream.message("DW", "WARNING DocumentsWriter has stalled threads; waiting");
}
flushControl.waitIfStalled(); // block if stalled
@ -391,7 +387,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return hasEvents;
}
private final void ensureInitialized(ThreadState state) throws IOException {
private void ensureInitialized(ThreadState state) throws IOException {
if (state.isActive() && state.dwpt == null) {
final FieldInfos.Builder infos = new FieldInfos.Builder(
writer.globalFieldNumberMap);
@ -482,7 +478,6 @@ final class DocumentsWriter implements Closeable, Accountable {
hasEvents = true;
boolean success = false;
SegmentFlushTicket ticket = null;
Throwable exc = null;
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@ -538,7 +533,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadState()) {
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
// This means there is a backlog: the one
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
@ -576,7 +571,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return hasEvents;
}
final void subtractFlushedNumDocs(int numFlushed) {
void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
oldValue = numDocsInRAM.get();
@ -598,7 +593,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
final boolean flushAllThreads()
boolean flushAllThreads()
throws IOException, AbortingException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
@ -640,7 +635,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return anythingFlushed;
}
final void finishFullFlush(IndexWriter indexWriter, boolean success) {
void finishFullFlush(IndexWriter indexWriter, boolean success) {
assert indexWriter.holdsFullFlushLock();
try {
if (infoStream.isEnabled("DW")) {

View File

@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
@ -116,7 +115,7 @@ final class DocumentsWriterFlushControl implements Accountable {
// (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully flushed
// all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta
// (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta);
final long expected = (2 * ramBufferBytes) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta);
// the expected ram consumption is an upper bound at this point and not really the expected consumption
if (peakDelta < (ramBufferBytes >> 1)) {
/*
@ -245,9 +244,9 @@ final class DocumentsWriterFlushControl implements Accountable {
* that we don't stall/block if an ongoing or pending flush can
* not free up enough memory to release the stall lock.
*/
final boolean stall = ((activeBytes + flushBytes) > limit) &&
(activeBytes < limit) &&
!closed;
final boolean stall = (activeBytes + flushBytes) > limit &&
activeBytes < limit &&
!closed;
stallControl.updateStalled(stall);
return stall;
}
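For intuition only, not part of this commit: a standalone restatement of the stall predicate above. The method name, the way the limit is passed in, and the sample numbers are the editor's assumptions; only the three-clause condition mirrors the code.

public class StallSketch {
  // Stall only while the total buffered bytes exceed the limit but the active
  // writers alone do not, i.e. pending or ongoing flushes can still free enough memory.
  static boolean wouldStall(long activeBytes, long flushBytes, long limit, boolean closed) {
    return (activeBytes + flushBytes) > limit
        && activeBytes < limit
        && !closed;
  }

  public static void main(String[] args) {
    System.out.println(wouldStall(90, 30, 100, false));   // true: wait for flushes to catch up
    System.out.println(wouldStall(120, 30, 100, false));  // false: flushing alone cannot bring active bytes under the limit, so stalling would not help
  }
}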
@ -364,7 +363,7 @@ final class DocumentsWriterFlushControl implements Accountable {
numPending = this.numPending;
}
if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
final int limit = perThreadPool.getActiveThreadState();
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit && numPending > 0; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
if (next.flushPending) {
@ -390,7 +389,7 @@ final class DocumentsWriterFlushControl implements Accountable {
* Returns an iterator that provides access to all currently active {@link ThreadState}s
*/
public Iterator<ThreadState> allActiveThreadStates() {
return getPerThreadsIterator(perThreadPool.getActiveThreadState());
return getPerThreadsIterator(perThreadPool.getActiveThreadStateCount());
}
private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
@ -451,7 +450,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
int numActiveDWPT() {
return this.perThreadPool.getActiveThreadState();
return this.perThreadPool.getActiveThreadStateCount();
}
ThreadState obtainAndLock() {
@ -488,7 +487,7 @@ final class DocumentsWriterFlushControl implements Accountable {
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
}
final int limit = perThreadPool.getActiveThreadState();
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
@ -531,7 +530,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
final int limit = perThreadPool.getActiveThreadState();
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();

View File

@ -16,10 +16,11 @@ package org.apache.lucene.index;
* limitations under the License.
*/
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.ThreadInterruptedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
/**
* {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
@ -128,42 +129,17 @@ final class DocumentsWriterPerThreadPool {
}
}
private final ThreadState[] threadStates;
private final List<ThreadState> threadStates = new ArrayList<>();
private volatile int numThreadStatesActive;
private final ThreadState[] freeList;
private int freeCount;
private final List<ThreadState> freeList = new ArrayList<>();
/**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
DocumentsWriterPerThreadPool(int maxNumThreadStates) {
if (maxNumThreadStates < 1) {
throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
}
threadStates = new ThreadState[maxNumThreadStates];
numThreadStatesActive = 0;
for (int i = 0; i < threadStates.length; i++) {
threadStates[i] = new ThreadState(null);
}
freeList = new ThreadState[maxNumThreadStates];
}
/**
* Returns the max number of {@link ThreadState} instances available in this
* {@link DocumentsWriterPerThreadPool}
*/
int getMaxThreadStates() {
return threadStates.length;
}
/**
* Returns the active number of {@link ThreadState} instances.
*/
int getActiveThreadState() {
synchronized int getActiveThreadStateCount() {
return numThreadStatesActive;
}
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
@ -175,9 +151,14 @@ final class DocumentsWriterPerThreadPool {
* @return a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>
*/
private ThreadState newThreadState() {
assert numThreadStatesActive < threadStates.length;
final ThreadState threadState = threadStates[numThreadStatesActive];
private synchronized ThreadState newThreadState() {
assert numThreadStatesActive <= threadStates.size();
if (numThreadStatesActive == threadStates.size()) {
threadStates.add(new ThreadState(null));
}
ThreadState threadState = threadStates.get(numThreadStatesActive);
threadState.lock(); // lock so nobody else will get this ThreadState
boolean unlock = true;
try {
@ -199,14 +180,16 @@ final class DocumentsWriterPerThreadPool {
}
}
}
// Used by assert
private synchronized boolean assertUnreleasedThreadStatesInactive() {
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
for (int i = numThreadStatesActive; i < threadStates.size(); i++) {
ThreadState threadState = threadStates.get(i);
assert threadState.tryLock() : "unreleased threadstate should not be locked";
try {
assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive";
assert !threadState.isInitialized() : "expected unreleased thread state to be inactive";
} finally {
threadStates[i].unlock();
threadState.unlock();
}
}
return true;
@ -216,8 +199,8 @@ final class DocumentsWriterPerThreadPool {
* Deactivate all unreleased threadstates
*/
synchronized void deactivateUnreleasedStates() {
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
final ThreadState threadState = threadStates[i];
for (int i = numThreadStatesActive; i < threadStates.size(); i++) {
final ThreadState threadState = threadStates.get(i);
threadState.lock();
try {
threadState.deactivate();
@ -249,44 +232,33 @@ final class DocumentsWriterPerThreadPool {
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState threadState = null;
synchronized (this) {
while (true) {
if (freeCount > 0) {
// Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
// limited number of thread states:
threadState = freeList[freeCount-1];
if (freeList.isEmpty()) {
// ThreadState is already locked before return by this method:
return newThreadState();
} else {
// Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
// limited number of thread states:
threadState = freeList.remove(freeList.size()-1);
if (threadState.dwpt == null) {
// This thread-state is not initialized, e.g. it
// was just flushed. See if we can instead find
// another free thread state that already has docs
// indexed. This way if incoming thread concurrency
// has decreased, we don't leave docs
// indefinitely buffered, tying up RAM. This
// will instead get those thread states flushed,
// freeing up RAM for larger segment flushes:
for(int i=0;i<freeCount;i++) {
if (freeList[i].dwpt != null) {
// Use this one instead, and swap it with
// the un-initialized one:
ThreadState ts = freeList[i];
freeList[i] = threadState;
threadState = ts;
break;
}
if (threadState.dwpt == null) {
// This thread-state is not initialized, e.g. it
// was just flushed. See if we can instead find
// another free thread state that already has docs
// indexed. This way if incoming thread concurrency
// has decreased, we don't leave docs
// indefinitely buffered, tying up RAM. This
// will instead get those thread states flushed,
// freeing up RAM for larger segment flushes:
for(int i=0;i<freeList.size();i++) {
ThreadState ts = freeList.get(i);
if (ts.dwpt != null) {
// Use this one instead, and swap it with
// the un-initialized one:
freeList.set(i, threadState);
threadState = ts;
break;
}
}
freeCount--;
break;
} else if (numThreadStatesActive < threadStates.length) {
// ThreadState is already locked before return by this method:
return newThreadState();
} else {
// Wait until a thread state frees up:
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
}
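For illustration only, not part of this commit: a hypothetical standalone sketch of the pooling idea the comments above describe, in which releases push onto a free list and acquisitions pop the most recently released entry (LIFO), creating a new entry only when none is free. A temporary burst of indexing threads therefore does not permanently inflate the set of states in use. LifoPoolSketch is the editor's name, not a Lucene class.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

public class LifoPoolSketch<T> {
  private final Deque<T> free = new ArrayDeque<>();

  // Pop the most recently released entry (LIFO); create a new one only when none is free.
  public synchronized T acquire(Supplier<T> factory) {
    return free.isEmpty() ? factory.get() : free.pop();
  }

  // Return an entry to the pool; the next acquire reuses it first.
  public synchronized void release(T entry) {
    free.push(entry);
  }
}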
@ -300,8 +272,7 @@ final class DocumentsWriterPerThreadPool {
void release(ThreadState state) {
state.unlock();
synchronized (this) {
assert freeCount < freeList.length;
freeList[freeCount++] = state;
freeList.add(state);
// In case any thread is waiting, wake one of them up since we just released a thread state; notify() should be sufficient but we do
// notifyAll defensively:
notifyAll();
@ -317,8 +288,12 @@ final class DocumentsWriterPerThreadPool {
* @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
* given ord.
*/
ThreadState getThreadState(int ord) {
return threadStates[ord];
synchronized ThreadState getThreadState(int ord) {
return threadStates.get(ord);
}
synchronized int getMaxThreadStates() {
return threadStates.size();
}
/**
@ -330,35 +305,13 @@ final class DocumentsWriterPerThreadPool {
ThreadState minThreadState = null;
final int limit = numThreadStatesActive;
for (int i = 0; i < limit; i++) {
final ThreadState state = threadStates[i];
final ThreadState state = threadStates.get(i);
if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
minThreadState = state;
}
}
return minThreadState;
}
/**
* Returns the number of currently deactivated {@link ThreadState} instances.
* A deactivated {@link ThreadState} should not be used for indexing anymore.
*
* @return the number of currently deactivated {@link ThreadState} instances.
*/
int numDeactivatedThreadStates() {
int count = 0;
for (int i = 0; i < threadStates.length; i++) {
final ThreadState threadState = threadStates[i];
threadState.lock();
try {
if (!threadState.isActive) {
count++;
}
} finally {
threadState.unlock();
}
}
return count;
}
/**
* Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can

View File

@ -792,7 +792,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// IndexFormatTooOldException.
boolean initialIndexExists = true;
boolean fromReader = false;
String[] files = directory.listAll();
@ -2088,8 +2087,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
IOUtils.close(writeLock); // release write lock
writeLock = null;
assert docWriter.perThreadPool.numDeactivatedThreadStates() == docWriter.perThreadPool.getMaxThreadStates() : "" + docWriter.perThreadPool.numDeactivatedThreadStates() + " " + docWriter.perThreadPool.getMaxThreadStates();
}
success = true;

View File

@ -92,12 +92,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
/** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
/** The maximum number of simultaneous threads that may be
* indexing documents at once in IndexWriter; if more
* than this many threads arrive they will wait for
* others to finish. Default value is 8. */
public final static int DEFAULT_MAX_THREAD_STATES = 8;
/** Default value for compound file system for newly written segments
* (set to <code>true</code>). For batch indexing with very large
* ram buffers use <code>false</code> */
@ -287,23 +281,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
return indexerThreadPool;
}
/**
* Sets the max number of simultaneous threads that may be indexing documents
* at once in IndexWriter. Values &lt; 1 are invalid and if passed
* <code>maxThreadStates</code> will be set to
* {@link #DEFAULT_MAX_THREAD_STATES}.
*
* <p>Only takes effect when IndexWriter is first created. */
public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
this.indexerThreadPool = new DocumentsWriterPerThreadPool(maxThreadStates);
return this;
}
@Override
public int getMaxThreadStates() {
return indexerThreadPool.getMaxThreadStates();
}
/** By default, IndexWriter does not pool the
* SegmentReaders it must open for deletions and
* merging, unless a near-real-time reader has been

View File

@ -116,7 +116,7 @@ public class LiveIndexWriterConfig {
mergePolicy = new TieredMergePolicy();
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
indexerThreadPool = new DocumentsWriterPerThreadPool();
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
@ -371,14 +371,6 @@ public class LiveIndexWriterConfig {
return indexerThreadPool;
}
/**
* Returns the max number of simultaneous threads that may be indexing
* documents at once in IndexWriter.
*/
public int getMaxThreadStates() {
return indexerThreadPool.getMaxThreadStates();
}
/**
* Returns {@code true} if {@link IndexWriter} should pool readers even if
* {@link DirectoryReader#open(IndexWriter, boolean)} has not been called.

View File

@ -70,9 +70,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
.setFlushPolicy(flushPolicy);
final int numDWPT = 1 + atLeast(2);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numDWPT);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
iwc.setRAMBufferSizeMB(maxRamMB);
iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
@ -129,9 +127,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
.setFlushPolicy(flushPolicy);
final int numDWPT = 1 + atLeast(2);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numDWPT);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
iwc.setMaxBufferedDocs(2 + atLeast(10));
iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
@ -181,9 +177,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
iwc.setFlushPolicy(flushPolicy);
final int numDWPT = 1 + random().nextInt(8);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numDWPT);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
IndexWriter writer = new IndexWriter(dir, iwc);
@ -249,8 +243,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
iwc.setFlushPolicy(flushPolicy);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numThreads[i]== 1 ? 1 : 2);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
// with such a small ram buffer we should be stalled quite quickly
iwc.setRAMBufferSizeMB(0.25);

View File

@ -131,8 +131,7 @@ public class TestIndexWriterThreadsToSegments extends LuceneTestCase {
assertNotNull(r2);
r.close();
r = r2;
int maxThreadStates = w.getConfig().getMaxThreadStates();
int maxExpectedSegments = oldSegmentCount + Math.min(maxThreadStates, maxThreadCountPerIter.get());
int maxExpectedSegments = oldSegmentCount + maxThreadCountPerIter.get();
if (VERBOSE) {
System.out.println("TEST: iter done; now verify oldSegCount=" + oldSegmentCount + " newSegCount=" + r2.leaves().size() + " maxExpected=" + maxExpectedSegments);
}
@ -165,16 +164,9 @@ public class TestIndexWriterThreadsToSegments extends LuceneTestCase {
Directory dir = newFSDirectory(createTempDir());
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
int maxThreadStates = TestUtil.nextInt(random(), 1, 12);
if (VERBOSE) {
System.out.println("TEST: maxThreadStates=" + maxThreadStates);
}
// Never trigger flushes (so we only flush on getReader):
iwc.setMaxBufferedDocs(100000000);
iwc.setRAMBufferSizeMB(-1);
iwc.setMaxThreadStates(maxThreadStates);
// Never trigger merges (so we can simplistically count flushed segments):
iwc.setMergePolicy(NoMergePolicy.INSTANCE);

View File

@ -32,7 +32,6 @@ import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
@ -65,9 +64,8 @@ public class TestStressIndexing2 extends LuceneTestCase {
Directory dir1 = newDirectory();
Directory dir2 = newDirectory();
// mergeFactor=2; maxBufferedDocs=2; Map docs = indexRandom(1, 3, 2, dir1);
int maxThreadStates = 1+random().nextInt(10);
boolean doReaderPooling = random().nextBoolean();
Map<String,Document> docs = indexRandom(5, 3, 100, dir1, maxThreadStates, doReaderPooling);
Map<String,Document> docs = indexRandom(5, 3, 100, dir1, doReaderPooling);
indexSerial(random(), docs, dir2);
// verifying verify
@ -90,7 +88,6 @@ public class TestStressIndexing2 extends LuceneTestCase {
sameFieldOrder=random().nextBoolean();
mergeFactor=random().nextInt(3)+2;
maxBufferedDocs=random().nextInt(3)+2;
int maxThreadStates = 1+random().nextInt(10);
boolean doReaderPooling = random().nextBoolean();
seed++;
@ -100,9 +97,9 @@ public class TestStressIndexing2 extends LuceneTestCase {
Directory dir1 = newDirectory();
Directory dir2 = newDirectory();
if (VERBOSE) {
System.out.println(" nThreads=" + nThreads + " iter=" + iter + " range=" + range + " doPooling=" + doReaderPooling + " maxThreadStates=" + maxThreadStates + " sameFieldOrder=" + sameFieldOrder + " mergeFactor=" + mergeFactor + " maxBufferedDocs=" + maxBufferedDocs);
System.out.println(" nThreads=" + nThreads + " iter=" + iter + " range=" + range + " doPooling=" + doReaderPooling + " sameFieldOrder=" + sameFieldOrder + " mergeFactor=" + mergeFactor + " maxBufferedDocs=" + maxBufferedDocs);
}
Map<String,Document> docs = indexRandom(nThreads, iter, range, dir1, maxThreadStates, doReaderPooling);
Map<String,Document> docs = indexRandom(nThreads, iter, range, dir1, doReaderPooling);
if (VERBOSE) {
System.out.println("TEST: index serial");
}
@ -187,14 +184,13 @@ public class TestStressIndexing2 extends LuceneTestCase {
return dw;
}
public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates,
public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir,
boolean doReaderPooling) throws IOException, InterruptedException {
Map<String,Document> docs = new HashMap<>();
IndexWriter w = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
.setOpenMode(OpenMode.CREATE)
.setRAMBufferSizeMB(0.1)
.setMaxBufferedDocs(maxBufferedDocs)
.setIndexerThreadPool(new DocumentsWriterPerThreadPool(maxThreadStates))
.setReaderPooling(doReaderPooling)
.setMergePolicy(newLogMergePolicy()), random());
LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();

View File

@ -949,12 +949,6 @@ public abstract class LuceneTestCase extends Assert {
c.setMaxBufferedDocs(TestUtil.nextInt(r, 16, 1000));
}
}
if (r.nextBoolean()) {
int maxNumThreadStates = rarely(r) ? TestUtil.nextInt(r, 5, 20) // crazy value
: TestUtil.nextInt(r, 1, 4); // reasonable value
c.setMaxThreadStates(maxNumThreadStates);
}
c.setMergePolicy(newMergePolicy(r));

View File

@ -31,7 +31,6 @@ A solrconfig.xml snippet containing indexConfig settings for randomized testing.
<useCompoundFile>${useCompoundFile:false}</useCompoundFile>
<maxBufferedDocs>${solr.tests.maxBufferedDocs}</maxBufferedDocs>
<maxIndexingThreads>${solr.tests.maxIndexingThreads}</maxIndexingThreads>
<ramBufferSizeMB>${solr.tests.ramBufferSizeMB}</ramBufferSizeMB>
<mergeScheduler class="${solr.tests.mergeScheduler}" />

View File

@ -143,12 +143,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<maxIndexingThreads>${solr.maxIndexingThreads:8}</maxIndexingThreads>
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -159,12 +159,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<maxIndexingThreads>${solr.maxIndexingThreads:8}</maxIndexingThreads>
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -161,12 +161,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<maxIndexingThreads>${solr.maxIndexingThreads:8}</maxIndexingThreads>
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -143,12 +143,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<maxIndexingThreads>${solr.maxIndexingThreads:8}</maxIndexingThreads>
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -162,12 +162,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<maxIndexingThreads>${solr.maxIndexingThreads:8}</maxIndexingThreads>
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -57,7 +57,6 @@ public class SolrIndexConfig implements MapSerializable {
public final int maxBufferedDocs;
public final int maxMergeDocs;
public final int maxIndexingThreads;
public final int mergeFactor;
public final double ramBufferSizeMB;
@ -86,7 +85,6 @@ public class SolrIndexConfig implements MapSerializable {
effectiveUseCompoundFileSetting = false;
maxBufferedDocs = -1;
maxMergeDocs = -1;
maxIndexingThreads = IndexWriterConfig.DEFAULT_MAX_THREAD_STATES;
mergeFactor = -1;
ramBufferSizeMB = 100;
writeLockTimeout = -1;
@ -136,7 +134,6 @@ public class SolrIndexConfig implements MapSerializable {
effectiveUseCompoundFileSetting = solrConfig.getBool(prefix+"/useCompoundFile", def.getUseCompoundFile());
maxBufferedDocs=solrConfig.getInt(prefix+"/maxBufferedDocs",def.maxBufferedDocs);
maxMergeDocs=solrConfig.getInt(prefix+"/maxMergeDocs",def.maxMergeDocs);
maxIndexingThreads=solrConfig.getInt(prefix+"/maxIndexingThreads",def.maxIndexingThreads);
mergeFactor=solrConfig.getInt(prefix+"/mergeFactor",def.mergeFactor);
ramBufferSizeMB = solrConfig.getDouble(prefix+"/ramBufferSizeMB", def.ramBufferSizeMB);
@ -172,7 +169,6 @@ public class SolrIndexConfig implements MapSerializable {
Map<String, Object> m = ZkNodeProps.makeMap("useCompoundFile", effectiveUseCompoundFileSetting,
"maxBufferedDocs", maxBufferedDocs,
"maxMergeDocs", maxMergeDocs,
"maxIndexingThreads", maxIndexingThreads,
"mergeFactor", mergeFactor,
"ramBufferSizeMB", ramBufferSizeMB,
"writeLockTimeout", writeLockTimeout,
@ -221,10 +217,6 @@ public class SolrIndexConfig implements MapSerializable {
// there may modify the effective useCompoundFile
iwc.setUseCompoundFile(getUseCompoundFile());
if (maxIndexingThreads != -1) {
iwc.setMaxThreadStates(maxIndexingThreads);
}
if (mergedSegmentWarmerInfo != null) {
// TODO: add infostream -> normal logging system (there is an issue somewhere)
IndexReaderWarmer warmer = schema.getResourceLoader().newInstance(mergedSegmentWarmerInfo.className,

View File

@ -33,7 +33,6 @@
<useCompoundFile>${useCompoundFile:false}</useCompoundFile>
<maxBufferedDocs>${solr.tests.maxBufferedDocs}</maxBufferedDocs>
<maxIndexingThreads>${solr.tests.maxIndexingThreads}</maxIndexingThreads>
<ramBufferSizeMB>${solr.tests.ramBufferSizeMB}</ramBufferSizeMB>
<mergeScheduler class="${solr.tests.mergeScheduler}" />

View File

@ -30,7 +30,6 @@
<useCompoundFile>${useCompoundFile:false}</useCompoundFile>
<maxBufferedDocs>${solr.tests.maxBufferedDocs}</maxBufferedDocs>
<maxIndexingThreads>${solr.tests.maxIndexingThreads}</maxIndexingThreads>
<ramBufferSizeMB>${solr.tests.ramBufferSizeMB}</ramBufferSizeMB>
<mergeScheduler class="${solr.tests.mergeScheduler}" />

View File

@ -23,7 +23,6 @@
<indexConfig>
<useCompoundFile>${useCompoundFile:false}</useCompoundFile>
<maxIndexingThreads>123</maxIndexingThreads>
<infoStream>true</infoStream>
<mergePolicy class="org.apache.solr.util.RandomMergePolicy" />
</indexConfig>

View File

@ -25,7 +25,6 @@
<!-- set some values to -1 to force the use of internal lucene defaults -->
<maxBufferedDocs>-1</maxBufferedDocs>
<ramBufferSizeMB>-1</ramBufferSizeMB>
<maxIndexingThreads>-1</maxIndexingThreads>
<mergeFactor>11</mergeFactor>
<maxMergeDocs>456</maxMergeDocs>

View File

@ -36,7 +36,6 @@
<useCompoundFile>${useCompoundFile}</useCompoundFile>
<maxBufferedDocs>${solr.tests.maxBufferedDocs}</maxBufferedDocs>
<maxIndexingThreads>${solr.tests.maxIndexingThreads}</maxIndexingThreads>
<ramBufferSizeMB>${solr.tests.ramBufferSizeMB}</ramBufferSizeMB>
<mergeScheduler class="${solr.tests.mergeScheduler}" />
<writeLockTimeout>1000</writeLockTimeout>

View File

@ -31,7 +31,6 @@ A solrconfig.xml snippet containing indexConfig settings for randomized testing.
<useCompoundFile>${useCompoundFile:false}</useCompoundFile>
<maxBufferedDocs>${solr.tests.maxBufferedDocs}</maxBufferedDocs>
<maxIndexingThreads>${solr.tests.maxIndexingThreads}</maxIndexingThreads>
<ramBufferSizeMB>${solr.tests.ramBufferSizeMB}</ramBufferSizeMB>
<mergeScheduler class="${solr.tests.mergeScheduler}" />

View File

@ -122,7 +122,6 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put(CoreDescriptor.CORE_CONFIG, "solrconfig-tlog.xml");
collectionProperties.put("solr.tests.maxBufferedDocs", "100000");
collectionProperties.put("solr.tests.maxIndexingThreads", "-1");
collectionProperties.put("solr.tests.ramBufferSizeMB", "100");
// use non-test classes so RandomizedRunner isn't necessary
collectionProperties.put("solr.tests.mergePolicy", "org.apache.lucene.index.TieredMergePolicy");

View File

@ -176,7 +176,6 @@ public class TestSolrCloudWithKerberosAlt extends LuceneTestCase {
Properties properties = new Properties();
properties.put(CoreDescriptor.CORE_CONFIG, "solrconfig-tlog.xml");
properties.put("solr.tests.maxBufferedDocs", "100000");
properties.put("solr.tests.maxIndexingThreads", "-1");
properties.put("solr.tests.ramBufferSizeMB", "100");
// use non-test classes so RandomizedRunner isn't necessary
properties.put("solr.tests.mergePolicy", "org.apache.lucene.index.TieredMergePolicy");

View File

@ -120,7 +120,6 @@ public class TestConfig extends SolrTestCaseJ4 {
++numDefaultsTested; assertEquals("default maxBufferedDocs", -1, sic.maxBufferedDocs);
++numDefaultsTested; assertEquals("default maxMergeDocs", -1, sic.maxMergeDocs);
++numDefaultsTested; assertEquals("default maxIndexingThreads", IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, sic.maxIndexingThreads);
++numDefaultsTested; assertEquals("default mergeFactor", -1, sic.mergeFactor);
++numDefaultsTested; assertEquals("default ramBufferSizeMB", 100.0D, sic.ramBufferSizeMB, 0.0D);

View File

@ -143,9 +143,6 @@ public class TestMergePolicyConfig extends SolrTestCaseJ4 {
assertEquals(-1, solrConfig.indexConfig.maxBufferedDocs);
assertEquals(IndexWriterConfig.DISABLE_AUTO_FLUSH,
iwc.getMaxBufferedDocs());
assertEquals(-1, solrConfig.indexConfig.maxIndexingThreads);
assertEquals(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES,
iwc.getMaxThreadStates());
assertEquals(-1, solrConfig.indexConfig.ramBufferSizeMB, 0.0D);
assertEquals(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB,
iwc.getRAMBufferSizeMB(), 0.0D);

View File

@ -57,7 +57,6 @@ public class TestSolrIndexConfig extends SolrTestCaseJ4 {
private void checkIndexWriterConfig(LiveIndexWriterConfig iwc) {
assertEquals(123, iwc.getMaxThreadStates());
assertTrue(iwc.getInfoStream() instanceof LoggingInfoStream);
assertTrue(iwc.getMergePolicy().getClass().toString(),
iwc.getMergePolicy() instanceof RandomMergePolicy);

View File

@ -124,7 +124,6 @@ public class SolrIndexConfigTest extends SolrTestCaseJ4 {
++mSizeExpected; assertTrue(m.get("maxBufferedDocs") instanceof Integer);
++mSizeExpected; assertTrue(m.get("maxMergeDocs") instanceof Integer);
++mSizeExpected; assertTrue(m.get("maxIndexingThreads") instanceof Integer);
++mSizeExpected; assertTrue(m.get("mergeFactor") instanceof Integer);
++mSizeExpected; assertTrue(m.get("ramBufferSizeMB") instanceof Double);

View File

@ -185,12 +185,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -188,12 +188,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -185,12 +185,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -185,12 +185,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -186,12 +186,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -167,12 +167,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -166,12 +166,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -167,12 +167,6 @@
<!-- Maximum time to wait for a write lock (ms) for an IndexWriter. Default: 1000 -->
<!-- <writeLockTimeout>1000</writeLockTimeout> -->
<!-- The maximum number of simultaneous threads that may be
indexing documents at once in IndexWriter; if more than this
many threads arrive they will wait for others to finish.
Default in Solr/Lucene is 8. -->
<!-- <maxIndexingThreads>8</maxIndexingThreads> -->
<!-- Expert: Enabling compound file will use less files for the index,
using fewer file descriptors on the expense of performance decrease.
Default in Lucene is "true". Default in Solr is "false" (since 3.6) -->

View File

@ -370,13 +370,6 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
mergeSchedulerClass = "org.apache.lucene.index.ConcurrentMergeScheduler";
}
System.setProperty("solr.tests.mergeScheduler", mergeSchedulerClass);
// don't ask iwc.getMaxThreadStates(), sometimes newIWC uses
// RandomDocumentsWriterPerThreadPool and all hell breaks loose
int maxIndexingThreads = rarely(random())
? TestUtil.nextInt(random(), 5, 20) // crazy value
: TestUtil.nextInt(random(), 1, 4); // reasonable value
System.setProperty("solr.tests.maxIndexingThreads", String.valueOf(maxIndexingThreads));
}
public static Throwable getWrappedException(Throwable e) {