added missing synchronization to flushAllThread to check and set DWPT flushPending atomically

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1100103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-05-06 06:46:31 +00:00
parent 96878534a0
commit 826b1d32bd
1 changed files with 12 additions and 10 deletions

View File

@ -122,13 +122,13 @@ public final class DocumentsWriterFlushControl {
// is super important since we can not address more than 2048 MB per DWPT // is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread); setFlushPending(perThread);
if (fullFlush) { if (fullFlush) {
DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false); DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread);
assert toBlock != null; assert toBlock != null;
blockedFlushes.add(toBlock); blockedFlushes.add(toBlock);
} }
} }
} }
final DocumentsWriterPerThread flushingDWPT = tryCheckoutForFlush(perThread, false); final DocumentsWriterPerThread flushingDWPT = tryCheckoutForFlush(perThread);
healthiness.updateStalled(this); healthiness.updateStalled(this);
return flushingDWPT; return flushingDWPT;
} }
@ -189,18 +189,15 @@ public final class DocumentsWriterFlushControl {
} }
synchronized DocumentsWriterPerThread tryCheckoutForFlush( synchronized DocumentsWriterPerThread tryCheckoutForFlush(
ThreadState perThread, boolean setPending) { ThreadState perThread) {
if (fullFlush) { if (fullFlush) {
return null; return null;
} }
return internalTryCheckOutForFlush(perThread, setPending); return internalTryCheckOutForFlush(perThread);
} }
private DocumentsWriterPerThread internalTryCheckOutForFlush( private DocumentsWriterPerThread internalTryCheckOutForFlush(
ThreadState perThread, boolean setPending) { ThreadState perThread) {
if (setPending && !perThread.flushPending) {
setFlushPending(perThread);
}
if (perThread.flushPending) { if (perThread.flushPending) {
// We are pending so all memory is already moved to flushBytes // We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) { if (perThread.tryLock()) {
@ -245,7 +242,7 @@ public final class DocumentsWriterFlushControl {
while (allActiveThreads.hasNext() && numPending > 0) { while (allActiveThreads.hasNext() && numPending > 0) {
ThreadState next = allActiveThreads.next(); ThreadState next = allActiveThreads.next();
if (next.flushPending) { if (next.flushPending) {
final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false); final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
if (dwpt != null) { if (dwpt != null) {
return dwpt; return dwpt;
} }
@ -330,7 +327,12 @@ public final class DocumentsWriterFlushControl {
} }
if (next.perThread.getNumDocsInRAM() > 0 ) { if (next.perThread.getNumDocsInRAM() > 0 ) {
final DocumentsWriterPerThread dwpt = next.perThread; // just for assert final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next, true); synchronized (this) {
if (!next.flushPending) {
setFlushPending(next);
}
}
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT"; assert dwpt == flushingDWPT : "flushControl returned different DWPT";
toFlush.add(flushingDWPT); toFlush.add(flushingDWPT);