diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java index 78caa57a205..9a28d4374cb 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -338,6 +338,11 @@ final class DocumentsWriter { if (flushingDWPT != null) { maybeMerge |= doFlush(flushingDWPT); + } else { + final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); + if (nextPendingFlush != null) { + maybeMerge |= doFlush(nextPendingFlush); + } } return maybeMerge; } diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index 9cf88061272..33484deec62 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -64,7 +64,7 @@ import org.apache.lucene.search.Query; final class DocumentsWriterDeleteQueue { private volatile Node tail; - + private static final AtomicReferenceFieldUpdater tailUpdater = AtomicReferenceFieldUpdater .newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail"); diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index 4b88bb8aba5..7b968a8b4b4 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -128,7 +128,7 @@ public final class DocumentsWriterFlushControl { } } } - final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread); + final DocumentsWriterPerThread flushingDWPT = tryCheckoutForFlush(perThread, false); healthiness.updateStalled(this); return flushingDWPT; } @@ -226,18 +226,6 @@ public final class DocumentsWriterFlushControl { return null; } - private DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) { - if (numPending > 0) { - final DocumentsWriterPerThread dwpt = perThread == null ? null - : tryCheckoutForFlush(perThread, false); - if (dwpt == null) { - return nextPendingFlush(); - } - return dwpt; - } - return null; - } - @Override public String toString() { return "DocumentsWriterFlushControl [activeBytes=" + activeBytes @@ -257,7 +245,7 @@ public final class DocumentsWriterFlushControl { while (allActiveThreads.hasNext() && numPending > 0) { ThreadState next = allActiveThreads.next(); if (next.flushPending) { - DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false); + final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false); if (dwpt != null) { return dwpt; } @@ -327,6 +315,7 @@ public final class DocumentsWriterFlushControl { if (!next.isActive()) { continue; } + assert next.perThread.deleteQueue == flushingQueue || next.perThread.deleteQueue == documentsWriter.deleteQueue; if (next.perThread.deleteQueue != flushingQueue) { // this one is already a new DWPT continue; @@ -346,6 +335,7 @@ public final class DocumentsWriterFlushControl { } } synchronized (this) { + assert assertBlockedFlushes(flushingQueue); flushQueue.addAll(blockedFlushes); blockedFlushes.clear(); flushQueue.addAll(toFlush); @@ -357,6 +347,7 @@ public final class DocumentsWriterFlushControl { assert flushQueue.isEmpty(); try { if (!blockedFlushes.isEmpty()) { + assert assertBlockedFlushes(documentsWriter.deleteQueue); flushQueue.addAll(blockedFlushes); blockedFlushes.clear(); } @@ -364,6 +355,14 @@ public final class DocumentsWriterFlushControl { fullFlush = false; } } + + boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) { + Queue flushes = this.blockedFlushes; + for (DocumentsWriterPerThread documentsWriterPerThread : flushes) { + assert documentsWriterPerThread.deleteQueue == flushingQueue; + } + return true; + } synchronized void abortFullFlushes() { try { diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index b94fb24f101..65cdc2dc97d 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -322,6 +322,11 @@ public class DocumentsWriterPerThread { numDocsInRAM = 0; } + /** + * Prepares this DWPT for flushing. This method will freeze and return the + * {@link DocumentsWriterDeleteQueue}s global buffer and apply all pending + * deletes to this DWPT. + */ FrozenBufferedDeletes prepareFlush() { assert numDocsInRAM > 0; final FrozenBufferedDeletes globalDeletes = deleteQueue.freezeGlobalBuffer(deleteSlice); @@ -330,6 +335,7 @@ public class DocumentsWriterPerThread { if (deleteSlice != null) { // apply all deletes before we flush and release the delete slice deleteSlice.apply(pendingDeletes, numDocsInRAM); + assert deleteSlice.isEmpty(); deleteSlice = null; } return globalDeletes; @@ -338,6 +344,7 @@ public class DocumentsWriterPerThread { /** Flush all pending docs to a new segment */ FlushedSegment flush() throws IOException { assert numDocsInRAM > 0; + assert deleteSlice == null : "all deletes must be applied in prepareFlush"; flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos, numDocsInRAM, writer.getConfig().getTermIndexInterval(), fieldInfos.buildSegmentCodecs(true), pendingDeletes); diff --git a/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java index 2d6861bf771..7560b00b17f 100644 --- a/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java +++ b/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java @@ -199,7 +199,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { for (int x = 0; x < threads.length; x++) { threads[x].join(); } - assertEquals(" all flushes must be due", 0, flushControl.flushBytes()); assertEquals(numDocumentsToIndex, writer.numDocs()); assertEquals(numDocumentsToIndex, writer.maxDoc()); @@ -334,6 +333,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { } } } + writer.commit(); } catch (Throwable ex) { throw new RuntimeException(ex); }