work around starvation issue where multiple indexing threads flush segments faster than the single thread can publish them

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1394704 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2012-10-05 18:19:48 +00:00
parent c3e00c353e
commit 0009385b5a
4 changed files with 38 additions and 12 deletions

View File

@ -432,7 +432,20 @@ final class DocumentsWriter {
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
ticketQueue.tryPurge(this);
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadState()) {
// This means there is a backlog: the one
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers. But really
// this means we have a concurrency issue
// (TestBagOfPostings can provoke this):
// publishing a flush segment is too heavy today
// (it builds CFS, writes .si, etc.) ... we need
// to make those ops concurrent too:
ticketQueue.forcePurge(this);
} else {
ticketQueue.tryPurge(this);
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
@ -496,7 +509,7 @@ final class DocumentsWriter {
final SegmentInfoPerCommit segInfo = indexWriter.prepareFlushedSegment(newSegment);
final BufferedDeletes deletes = newSegment.segmentDeletes;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);
infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + deletes);
}
FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) {

View File

@ -41,8 +41,7 @@ class DocumentsWriterFlushQueue {
// a window for #anyChanges to fail
boolean success = false;
try {
queue
.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
success = true;
} finally {
if (!success) {
@ -111,7 +110,7 @@ class DocumentsWriterFlushQueue {
if (canPublish) {
try {
/*
* if we bock on publish -> lock IW -> lock BufferedDeletes we don't block
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
* concurrent segment flushes just because they want to append to the queue.
* the downside is that we need to force a purge on fullFlush since ther could
* be a ticket still in the queue.
@ -119,7 +118,7 @@ class DocumentsWriterFlushQueue {
head.publish(writer);
} finally {
synchronized (this) {
// finally remove the publised ticket from the queue
// finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
ticketCount.decrementAndGet();
assert poll == head;
@ -152,6 +151,10 @@ class DocumentsWriterFlushQueue {
}
}
public int getTicketCount() {
return ticketCount.get();
}
synchronized void clear() {
queue.clear();
ticketCount.set(0);
@ -186,7 +189,7 @@ class DocumentsWriterFlushQueue {
return true;
}
}
static final class SegmentFlushTicket extends FlushTicket {
private FlushedSegment segment;
private boolean failed = false;

View File

@ -43,8 +43,10 @@ public class TestBagOfPostings extends LuceneTestCase {
List<String> postingsList = new ArrayList<String>();
int numTerms = atLeast(300);
final int maxTermsPerDoc = _TestUtil.nextInt(random(), 10, 20);
//System.out.println("maxTermsPerDoc=" + maxTermsPerDoc);
//System.out.println("numTerms=" + numTerms);
if (VERBOSE) {
System.out.println("maxTermsPerDoc=" + maxTermsPerDoc);
System.out.println("numTerms=" + numTerms);
}
for (int i = 0; i < numTerms; i++) {
String term = Integer.toString(i);
for (int j = 0; j < i; j++) {
@ -59,11 +61,14 @@ public class TestBagOfPostings extends LuceneTestCase {
final RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
int threadCount = _TestUtil.nextInt(random(), 1, 5);
//System.out.println("threadCount=" + threadCount);
if (VERBOSE) {
System.out.println("config: " + iw.w.getConfig());
System.out.println("threadCount=" + threadCount);
}
Thread[] threads = new Thread[threadCount];
final CountDownLatch startingGun = new CountDownLatch(1);
for(int threadID=0;threadID<threadCount;threadID++) {
threads[threadID] = new Thread() {
@Override

View File

@ -508,8 +508,14 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
final IndexInput ii;
int randomInt = randomState.nextInt(500);
if (randomInt == 0) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name);
}
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput);
} else if (randomInt == 1) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name);
}
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput);
} else {
ii = new MockIndexInputWrapper(this, name, delegateInput);
@ -660,7 +666,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
endFiles = endSet.toArray(new String[0]);
if (!Arrays.equals(startFiles, endFiles)) {
StringBuilder sb = new StringBuilder();
List<String> removed = new ArrayList<String>();
for(String fileName : startFiles) {
if (!endSet.contains(fileName)) {