diff --git a/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java b/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
index 772f0ca02ed..15dbbe20ee1 100644
--- a/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
+++ b/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
@@ -100,7 +100,7 @@ class BufferedDeletesStream {
     numTerms.addAndGet(packet.numTermDeletes);
     bytesUsed.addAndGet(packet.bytesUsed);
     if (infoStream != null) {
-      message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size());
+      message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size() + " totBytesUsed=" + bytesUsed.get());
     }
     assert checkDeleteStats();
     return packet.delGen();
diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
index d35aef2b0eb..6cfae56701c 100644
--- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -141,7 +141,7 @@ final class DocumentsWriter {
       flushPolicy = configuredPolicy;
     }
     flushPolicy.init(this);
-    flushControl = new DocumentsWriterFlushControl(this, config );
+    flushControl = new DocumentsWriterFlushControl(this, config);
   }
 
   synchronized void deleteQueries(final Query... queries) throws IOException {
@@ -309,6 +309,9 @@ final class DocumentsWriter {
   }
 
   private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
+    }
     if (flushingDWPT != null) {
       maybeMerge |= doFlush(flushingDWPT);
     } else {
@@ -443,10 +446,22 @@ final class DocumentsWriter {
         flushControl.doAfterFlush(flushingDWPT);
         flushingDWPT.checkAndResetHasAborted();
         indexWriter.flushCount.incrementAndGet();
+        indexWriter.doAfterFlush();
       }
 
       flushingDWPT = flushControl.nextPendingFlush();
     }
+
+    // If deletes alone are consuming > 1/2 our RAM
+    // buffer, force them all to apply now.  This is to
+    // prevent too-frequent flushing of a long tail of
+    // tiny segments:
+    final double ramBufferSizeMB = indexWriter.getConfig().getRAMBufferSizeMB();
+    if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+        flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
+      applyAllDeletes(deleteQueue);
+    }
+
     return maybeMerge;
   }
 
@@ -601,5 +616,4 @@ final class DocumentsWriter {
     }
     return true;
   }
-
 }
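The DocumentsWriter changes above are the heart of LUCENE-3340: deletes pushed into the BufferedDeletesStream at flush time keep counting against the RAM budget, and once delete RAM alone exceeds half the buffer, all deletes are force-applied instead of triggering another flush of a tiny segment. A hedged sketch of a workload that should exercise this path, reusing the test-framework helpers (newDirectory, newIndexWriterConfig, MockAnalyzer) that appear in the tests further down; the loop bound is illustrative:

    // Delete-only workload: buffered delete terms accumulate RAM while no
    // segment ever grows, so only the new delete-RAM triggers can fire.
    Directory dir = newDirectory();
    IndexWriter w = new IndexWriter(dir,
        newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
          .setRAMBufferSizeMB(1.0));
    w.setInfoStream(System.out);  // watch for "force apply deletes bytesUsed=..."
    for(int i=0;i<200000;i++) {
      // Terms that match no documents: they consume RAM but delete nothing:
      w.deleteDocuments(new Term("id", "x" + i));
    }
    w.close();
    dir.close();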
diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index d90502ed4fd..4bb8491ea8c 100644
--- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -339,7 +339,11 @@ public final class DocumentsWriterFlushControl {
    * Returns the number of delete terms in the global pool
    */
   public int getNumGlobalTermDeletes() {
-    return documentsWriter.deleteQueue.numGlobalTermDeletes();
+    return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
+  }
+
+  public long getDeleteBytesUsed() {
+    return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
   }
 
   synchronized int numFlushingDWPT() {
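Both accessors now sum two pools: deletes still queued in the delete queue, and deletes already pushed to the BufferedDeletesStream but not yet applied. An illustration of the accounting, using the package-internal fields exactly as named in this patch (not meant to compile outside org.apache.lucene.index):

    int queuedTerms  = documentsWriter.deleteQueue.numGlobalTermDeletes();
    int pushedTerms  = documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
    long queuedBytes = documentsWriter.deleteQueue.bytesUsed();
    long pushedBytes = documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
    // With this patch:
    //   control.getNumGlobalTermDeletes() == queuedTerms + pushedTerms
    //   control.getDeleteBytesUsed()      == queuedBytes + pushedBytes

This is what lets flush-by-count (maxBufferedDeleteTerms) and flush-by-RAM see delete terms that survived a segment flush.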
diff --git a/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
index 81e3676246b..335a6bdf87b 100644
--- a/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
+++ b/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
@@ -60,15 +60,11 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
       }
     }
     final DocumentsWriter writer = this.writer.get();
-    // If deletes alone are consuming > 1/2 our RAM
-    // buffer, force them all to apply now.  This is to
-    // prevent too-frequent flushing of a long tail of
-    // tiny segments:
     if ((flushOnRAM() &&
-        writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2))) {
+        control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) {
       control.setApplyAllDeletes();
       if (writer.infoStream != null) {
-        writer.message("force apply deletes bytesUsed=" + writer.deleteQueue.bytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
+        writer.message("force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
       }
     }
   }
@@ -82,8 +78,12 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
       control.setFlushPending(state);
     } else if (flushOnRAM()) {// flush by RAM
       final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
-      final long totalRam = control.activeBytes();
+      final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
       if (totalRam >= limit) {
+        final DocumentsWriter writer = this.writer.get();
+        if (writer.infoStream != null) {
+          writer.message("flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
+        }
         markLargestWriterPending(control, state, totalRam);
       }
     }
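Two RAM thresholds are now in play, both derived from ramBufferSizeMB. Worked numbers for the 1.0 MB buffer that testFlushPushedDeletesByRAM below uses; the arithmetic mirrors the patch, the variable names are only for illustration:

    double ramBufferSizeMB = 1.0;
    // Segment flush: a DWPT is marked pending once active indexing RAM plus
    // delete RAM reaches the full budget:
    long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d);       // 1048576 bytes
    // FlushByRamOrCountsPolicy: force-apply deletes when delete RAM alone
    // exceeds the full budget:
    long policyTrigger = (long) (1024*1024*ramBufferSizeMB);       // 1048576 bytes
    // DocumentsWriter, after each flush: force-apply at half the budget:
    long postFlushTrigger = (long) (1024*1024*ramBufferSizeMB/2);  // 524288 bytes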
diff --git a/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index 087abc608db..bf66897fc1a 100644
--- a/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.MockAnalyzer;
@@ -944,4 +946,136 @@ public class TestIndexWriterDelete extends LuceneTestCase {
     w.close();
     dir.close();
   }
+
+  // LUCENE-3340: make sure deletes that we don't apply
+  // during flush (i.e. are just pushed into the stream) are
+  // in fact later applied due to their RAM usage:
+  public void testFlushPushedDeletesByRAM() throws Exception {
+    Directory dir = newDirectory();
+    // Cannot use RandomIndexWriter because we don't want to
+    // ever call commit() for this test:
+    IndexWriter w = new IndexWriter(dir,
+                                    newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+                                    .setRAMBufferSizeMB(1.0f).setMaxBufferedDocs(1000).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false));
+    w.setInfoStream(VERBOSE ? System.out : null);
+    int count = 0;
+    while(true) {
+      Document doc = new Document();
+      doc.add(new Field("id", count+"", Field.Store.NO, Field.Index.NOT_ANALYZED));
+      final Term delTerm;
+      if (count == 1010) {
+        // This is the only delete that applies
+        delTerm = new Term("id", ""+0);
+      } else {
+        // These get buffered, taking up RAM, but delete
+        // nothing when applied:
+        delTerm = new Term("id", "x" + count);
+      }
+      w.updateDocument(delTerm, doc);
+      // Eventually segment 0 should get a .del file:
+      if (dir.fileExists("_0_1.del")) {
+        if (VERBOSE) {
+          System.out.println("TEST: deletes created @ count=" + count);
+        }
+        break;
+      }
+      count++;
+
+      // Today we applyDeletes @ count=7199; even if we make
+      // sizable improvements to RAM efficiency of buffered
+      // delete terms we're unlikely to go over 100K:
+      if (count > 100000) {
+        fail("deletes were not applied");
+      }
+    }
+    w.close();
+    dir.close();
+  }
+
+  // LUCENE-3340: make sure deletes that we don't apply
+  // during flush (i.e. are just pushed into the stream) are
+  // in fact later applied, via maxBufferedDeleteTerms:
+  public void testFlushPushedDeletesByCount() throws Exception {
+    Directory dir = newDirectory();
+    // Cannot use RandomIndexWriter because we don't want to
+    // ever call commit() for this test:
+    final int flushAtDelCount = atLeast(1020);
+    IndexWriter w = new IndexWriter(dir,
+                                    newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+                                    setMaxBufferedDeleteTerms(flushAtDelCount).setMaxBufferedDocs(1000).setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false));
+    w.setInfoStream(VERBOSE ? System.out : null);
+    int count = 0;
+    while(true) {
+      Document doc = new Document();
+      doc.add(new Field("id", count+"", Field.Store.NO, Field.Index.NOT_ANALYZED));
+      final Term delTerm;
+      if (count == 1010) {
+        // This is the only delete that applies
+        delTerm = new Term("id", ""+0);
+      } else {
+        // These get buffered, taking up RAM, but delete
+        // nothing when applied:
+        delTerm = new Term("id", "x" + count);
+      }
+      w.updateDocument(delTerm, doc);
+      // Eventually segment 0 should get a .del file:
+      if (dir.fileExists("_0_1.del")) {
+        break;
+      }
+      count++;
+      if (count > flushAtDelCount) {
+        fail("deletes were not applied at count=" + flushAtDelCount);
+      }
+    }
+    w.close();
+    dir.close();
+  }
+
+  // Make sure buffered (pushed) deletes don't use up so
+  // much RAM that they force a long tail of tiny segments:
+  public void testApplyDeletesOnFlush() throws Exception {
+    Directory dir = newDirectory();
+    // Cannot use RandomIndexWriter because we don't want to
+    // ever call commit() for this test:
+    final AtomicInteger docsInSegment = new AtomicInteger();
+    final AtomicBoolean closing = new AtomicBoolean();
+    final AtomicBoolean sawAfterFlush = new AtomicBoolean();
+    IndexWriter w = new IndexWriter(dir,
+                                    newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+                                    setRAMBufferSizeMB(0.5).setMaxBufferedDocs(-1).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false)) {
+        @Override
+        public void doAfterFlush() {
+          assertTrue("only " + docsInSegment.get() + " in segment", closing.get() || docsInSegment.get() >= 10);
+          docsInSegment.set(0);
+          sawAfterFlush.set(true);
+        }
+      };
+    w.setInfoStream(VERBOSE ? System.out : null);
+    int id = 0;
+    while(true) {
+      StringBuilder sb = new StringBuilder();
+      for(int termIDX=0;termIDX<100;termIDX++) {
+        sb.append(' ').append(_TestUtil.randomRealisticUnicodeString(random));
+      }
+      if (id == 500) {
+        w.deleteDocuments(new Term("id", "0"));
+      }
+      Document doc = new Document();
+      doc.add(newField("id", ""+id, Field.Index.NOT_ANALYZED));
+      doc.add(newField("body", sb.toString(), Field.Index.ANALYZED));
+      w.updateDocument(new Term("id", ""+id), doc);
+      docsInSegment.incrementAndGet();
+      if (dir.fileExists("_0_1.del")) {
+        if (VERBOSE) {
+          System.out.println("TEST: deletes created @ id=" + id);
+        }
+        break;
+      }
+      id++;
+    }
+    closing.set(true);
+    assertTrue(sawAfterFlush.get());
+    w.close();
+    dir.close();
+  }
 }
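testApplyDeletesOnFlush above leans on the new indexWriter.doAfterFlush() call added to DocumentsWriter: the protected IndexWriter hook now fires after every DWPT flush, which is how the test asserts a minimum segment size. A minimal probe in the same style, assuming an existing dir and conf (both hypothetical names here):

    IndexWriter w = new IndexWriter(dir, conf) {
      @Override
      public void doAfterFlush() {
        // With this patch, called once per flushed segment:
        System.out.println("segment flushed");
      }
    };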