diff --git a/CHANGES.txt b/CHANGES.txt index bac10f8c041..4ee86022fb2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -179,6 +179,11 @@ Optimizations implementations that use SegmentInfos to access an index and acquire a write lock for index modifications. (Michael Busch) +11. LUCENE-1007: Allow flushing in IndexWriter to be triggered by + either RAM usage or document count or both (whichever comes + first), by adding symbolic constant DISABLE_AUTO_FLUSH to disable + one of the flush triggers. (Ning Li via Mike McCandless) + Documentation Build diff --git a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index c001a0bee1c..eec6d7b1fbc 100644 --- a/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -21,7 +21,6 @@ import org.apache.lucene.store.Directory; import java.io.IOException; import java.util.List; -import java.util.LinkedList; import java.util.ArrayList; /** A {@link MergeScheduler} that runs each merge using a diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java index d3a191bcf12..87946e2021c 100644 --- a/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -126,7 +126,7 @@ final class DocumentsWriter { private int pauseThreads; // Non-zero when we need all threads to // pause (eg to flush) private boolean flushPending; // True when a thread has decided to flush - private boolean postingsIsFull; // True when it's time to write segment + private boolean bufferIsFull; // True when it's time to write segment private PrintStream infoStream; @@ -148,6 +148,11 @@ final class DocumentsWriter { // non-zero we will flush by RAM usage instead. private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS; + // Coarse estimates used to measure RAM usage of buffered deletes + private static int OBJECT_HEADER_BYTES = 12; + private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform + private static int BYTES_PER_CHAR = 2; + private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush DocumentsWriter(Directory directory, IndexWriter writer) throws IOException { @@ -165,18 +170,25 @@ final class DocumentsWriter { /** Set how much RAM we can use before flushing. */ void setRAMBufferSizeMB(double mb) { - ramBufferSize = (long) (mb*1024*1024); + if (mb == IndexWriter.DISABLE_AUTO_FLUSH) { + ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH; + } else { + ramBufferSize = (long) (mb*1024*1024); + } } double getRAMBufferSizeMB() { - return ramBufferSize/1024./1024.; + if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) { + return ramBufferSize; + } else { + return ramBufferSize/1024./1024.; + } } /** Set max buffered docs, which means we will flush by * doc count instead of by RAM usage. */ void setMaxBufferedDocs(int count) { maxBufferedDocs = count; - ramBufferSize = 0; } int getMaxBufferedDocs() { @@ -361,7 +373,7 @@ final class DocumentsWriter { threadBindings.clear(); numBytesUsed = 0; balanceRAM(); - postingsIsFull = false; + bufferIsFull = false; flushPending = false; segment = null; numDocsInRAM = 0; @@ -582,7 +594,7 @@ final class DocumentsWriter { } } - if (postingsIsFull && !flushPending) { + if (bufferIsFull && !flushPending) { flushPending = true; doFlushAfter = true; } @@ -961,7 +973,8 @@ final class DocumentsWriter { for(int i=0;i 0.95 * ramBufferSize) + if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH + && numBytesUsed > 0.95 * ramBufferSize) balanceRAM(); } @@ -2137,10 +2150,8 @@ final class DocumentsWriter { // We must at this point commit to flushing to ensure we // always get N docs when we flush by doc count, even if // > 1 thread is adding documents: - /* new merge policy - if (!flushPending && maxBufferedDocs > 0 && numDocsInRAM >= maxBufferedDocs) { - */ - if (!flushPending && ramBufferSize == 0 && numDocsInRAM >= maxBufferedDocs) { + if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH + && numDocsInRAM >= maxBufferedDocs) { flushPending = true; state.doFlushAfter = true; } else @@ -2163,8 +2174,12 @@ final class DocumentsWriter { } } - if (delTerm != null) + if (delTerm != null) { addDeleteTerm(delTerm, state.docID); + if (!state.doFlushAfter) { + state.doFlushAfter = timeToFlushDeletes(); + } + } return state; } @@ -2211,9 +2226,11 @@ final class DocumentsWriter { } // Reset buffered deletes. - synchronized void clearBufferedDeleteTerms() { + synchronized void clearBufferedDeleteTerms() throws IOException { bufferedDeleteTerms.clear(); numBufferedDeleteTerms = 0; + if (numBytesUsed > 0) + resetPostingsData(); } synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException { @@ -2236,12 +2253,13 @@ final class DocumentsWriter { } synchronized private boolean timeToFlushDeletes() { - return numBufferedDeleteTerms >= maxBufferedDeleteTerms && setFlushPending(); + return (bufferIsFull + || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH + && numBufferedDeleteTerms >= maxBufferedDeleteTerms)) + && setFlushPending(); } void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { - if (maxBufferedDeleteTerms < 1) - throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1"); this.maxBufferedDeleteTerms = maxBufferedDeleteTerms; } @@ -2285,6 +2303,13 @@ final class DocumentsWriter { Num num = (Num) bufferedDeleteTerms.get(term); if (num == null) { bufferedDeleteTerms.put(term, new Num(docCount)); + // This is coarse approximation of actual bytes used: + numBytesUsed += (term.field().length() + term.text().length()) * BYTES_PER_CHAR + + Integer.SIZE/8 + 5 * OBJECT_HEADER_BYTES + 5 * OBJECT_POINTER_BYTES; + if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH + && numBytesUsed > ramBufferSize) { + bufferIsFull = true; + } } else { num.setNum(docCount); } @@ -2827,7 +2852,7 @@ final class DocumentsWriter { * pools to match the current docs. */ private synchronized void balanceRAM() { - if (ramBufferSize == 0.0 || postingsIsFull) + if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH || bufferIsFull) return; // We free our allocations if we've allocated 5% over @@ -2864,9 +2889,9 @@ final class DocumentsWriter { while(numBytesAlloc > freeLevel) { if (0 == freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == postingsFreeCount) { // Nothing else to free -- must flush now. - postingsIsFull = true; + bufferIsFull = true; if (infoStream != null) - infoStream.println(" nothing to free; now set postingsIsFull"); + infoStream.println(" nothing to free; now set bufferIsFull"); break; } @@ -2909,7 +2934,7 @@ final class DocumentsWriter { " allocMB=" + nf.format(numBytesAlloc/1024./1024.) + " triggerMB=" + nf.format(flushTrigger/1024./1024.)); - postingsIsFull = true; + bufferIsFull = true; } } } diff --git a/src/java/org/apache/lucene/index/IndexModifier.java b/src/java/org/apache/lucene/index/IndexModifier.java index 5fd1447728f..fd87c9e48b1 100644 --- a/src/java/org/apache/lucene/index/IndexModifier.java +++ b/src/java/org/apache/lucene/index/IndexModifier.java @@ -208,7 +208,7 @@ public class IndexModifier { indexWriter.setMergeScheduler(new SerialMergeScheduler()); indexWriter.setInfoStream(infoStream); indexWriter.setUseCompoundFile(useCompoundFile); - if (maxBufferedDocs != 0) + if (maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH) indexWriter.setMaxBufferedDocs(maxBufferedDocs); indexWriter.setMaxFieldLength(maxFieldLength); indexWriter.setMergeFactor(mergeFactor); diff --git a/src/java/org/apache/lucene/index/IndexWriter.java b/src/java/org/apache/lucene/index/IndexWriter.java index d1122cb5bb4..5f69dfb8537 100644 --- a/src/java/org/apache/lucene/index/IndexWriter.java +++ b/src/java/org/apache/lucene/index/IndexWriter.java @@ -37,7 +37,6 @@ import java.util.Set; import java.util.HashSet; import java.util.LinkedList; import java.util.Iterator; -import java.util.ListIterator; import java.util.Map.Entry; /** @@ -205,10 +204,15 @@ public class IndexWriter { public final static int DEFAULT_MERGE_FACTOR = LogMergePolicy.DEFAULT_MERGE_FACTOR; /** - * Default value is 0 (because IndexWriter flushes by RAM - * usage by default). Change using {@link #setMaxBufferedDocs(int)}. + * Value to denote a flush trigger is disabled */ - public final static int DEFAULT_MAX_BUFFERED_DOCS = 0; + public final static int DISABLE_AUTO_FLUSH = -1; + + /** + * Disabled by default (because IndexWriter flushes by RAM usage + * by default). Change using {@link #setMaxBufferedDocs(int)}. + */ + public final static int DEFAULT_MAX_BUFFERED_DOCS = DISABLE_AUTO_FLUSH; /** * Default value is 16 MB (which means flush when buffered @@ -217,9 +221,10 @@ public class IndexWriter { public final static double DEFAULT_RAM_BUFFER_SIZE_MB = 16.0; /** - * Default value is 1000. Change using {@link #setMaxBufferedDeleteTerms(int)}. + * Disabled by default (because IndexWriter flushes by RAM usage + * by default). Change using {@link #setMaxBufferedDeleteTerms(int)}. */ - public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = 1000; + public final static int DEFAULT_MAX_BUFFERED_DELETE_TERMS = DISABLE_AUTO_FLUSH; /** * @deprecated @@ -817,20 +822,28 @@ public class IndexWriter { * indexing. * *

When this is set, the writer will flush every - * maxBufferedDocs added documents and never flush by RAM - * usage.

+ * maxBufferedDocs added documents. Pass in {@link + * #DISABLE_AUTO_FLUSH} to prevent triggering a flush due + * to number of buffered documents. Note that if flushing + * by RAM usage is also enabled, then the flush will be + * triggered by whichever comes first.

* - *

The default value is 0 (writer flushes by RAM - * usage).

+ *

Disabled by default (writer flushes by RAM usage).

* * @throws IllegalArgumentException if maxBufferedDocs is - * smaller than 2 + * enabled but smaller than 2, or it disables maxBufferedDocs + * when ramBufferSize is already disabled * @see #setRAMBufferSizeMB */ public void setMaxBufferedDocs(int maxBufferedDocs) { ensureOpen(); - if (maxBufferedDocs < 2) - throw new IllegalArgumentException("maxBufferedDocs must at least be 2"); + if (maxBufferedDocs != DISABLE_AUTO_FLUSH && maxBufferedDocs < 2) + throw new IllegalArgumentException( + "maxBufferedDocs must at least be 2 when enabled"); + if (maxBufferedDocs == DISABLE_AUTO_FLUSH + && getRAMBufferSizeMB() == DISABLE_AUTO_FLUSH) + throw new IllegalArgumentException( + "at least one of ramBufferSize and maxBufferedDocs must be enabled"); docWriter.setMaxBufferedDocs(maxBufferedDocs); pushMaxBufferedDocs(); } @@ -841,7 +854,7 @@ public class IndexWriter { * as its minMergeDocs, to keep backwards compatibility. */ private void pushMaxBufferedDocs() { - if (docWriter.getRAMBufferSizeMB() == 0.0) { + if (docWriter.getMaxBufferedDocs() != DISABLE_AUTO_FLUSH) { final MergePolicy mp = mergePolicy; if (mp instanceof LogDocMergePolicy) { LogDocMergePolicy lmp = (LogDocMergePolicy) mp; @@ -856,9 +869,8 @@ public class IndexWriter { } /** - * Returns 0 if this writer is flushing by RAM usage, else - * returns the number of buffered added documents that will - * trigger a flush. + * Returns the number of buffered added documents that will + * trigger a flush if enabled. * @see #setMaxBufferedDocs */ public int getMaxBufferedDocs() { @@ -873,20 +885,30 @@ public class IndexWriter { * count and use as large a RAM buffer as you can. * *

When this is set, the writer will flush whenever - * buffered documents use this much RAM.

+ * buffered documents use this much RAM. Pass in {@link + * #DISABLE_AUTO_FLUSH} to prevent triggering a flush due + * to RAM usage. Note that if flushing by document count + * is also enabled, then the flush will be triggered by + * whichever comes first.

* *

The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.

+ * + * @throws IllegalArgumentException if ramBufferSize is + * enabled but non-positive, or it disables ramBufferSize + * when maxBufferedDocs is already disabled */ public void setRAMBufferSizeMB(double mb) { - if (mb <= 0.0) - throw new IllegalArgumentException("ramBufferSize should be > 0.0 MB"); + if (mb != DISABLE_AUTO_FLUSH && mb <= 0.0) + throw new IllegalArgumentException( + "ramBufferSize should be > 0.0 MB when enabled"); + if (mb == DISABLE_AUTO_FLUSH && getMaxBufferedDocs() == DISABLE_AUTO_FLUSH) + throw new IllegalArgumentException( + "at least one of ramBufferSize and maxBufferedDocs must be enabled"); docWriter.setRAMBufferSizeMB(mb); } /** - * Returns 0.0 if this writer is flushing by document - * count, else returns the value set by {@link - * #setRAMBufferSizeMB}. + * Returns the value set by {@link #setRAMBufferSizeMB} if enabled. */ public double getRAMBufferSizeMB() { return docWriter.getRAMBufferSizeMB(); @@ -898,17 +920,24 @@ public class IndexWriter { * buffered in memory at the time, they are merged and a new segment is * created.

- *

The default value is {@link #DEFAULT_MAX_BUFFERED_DELETE_TERMS}. - * @throws IllegalArgumentException if maxBufferedDeleteTerms is smaller than 1

+ *

Disabled by default (writer flushes by RAM usage).

+ * + * @throws IllegalArgumentException if maxBufferedDeleteTerms + * is enabled but smaller than 1 + * @see #setRAMBufferSizeMB */ public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { ensureOpen(); + if (maxBufferedDeleteTerms != DISABLE_AUTO_FLUSH + && maxBufferedDeleteTerms < 1) + throw new IllegalArgumentException( + "maxBufferedDeleteTerms must at least be 1 when enabled"); docWriter.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms); } /** * Returns the number of buffered deleted terms that will - * trigger a flush. + * trigger a flush if enabled. * @see #setMaxBufferedDeleteTerms */ public int getMaxBufferedDeleteTerms() { @@ -1479,13 +1508,7 @@ public class IndexWriter { } } - /** Determines amount of RAM usage by the buffered docs at - * which point we trigger a flush to the index. - */ - private double ramBufferSize = DEFAULT_RAM_BUFFER_SIZE_MB*1024F*1024F; - /** If non-null, information about merges will be printed to this. - */ private PrintStream infoStream = null; private static PrintStream defaultInfoStream = null; diff --git a/src/java/org/apache/lucene/index/LogMergePolicy.java b/src/java/org/apache/lucene/index/LogMergePolicy.java index 9aef168238d..16862d1fefd 100644 --- a/src/java/org/apache/lucene/index/LogMergePolicy.java +++ b/src/java/org/apache/lucene/index/LogMergePolicy.java @@ -18,8 +18,6 @@ package org.apache.lucene.index; */ import java.io.IOException; -import java.util.List; -import java.util.ArrayList; import java.util.Set; import org.apache.lucene.store.Directory; diff --git a/src/java/org/apache/lucene/index/SerialMergeScheduler.java b/src/java/org/apache/lucene/index/SerialMergeScheduler.java index f39d70b12b8..fd3529fe465 100644 --- a/src/java/org/apache/lucene/index/SerialMergeScheduler.java +++ b/src/java/org/apache/lucene/index/SerialMergeScheduler.java @@ -18,7 +18,6 @@ package org.apache.lucene.index; */ import java.io.IOException; -import java.util.LinkedList; /** A {@link MergeScheduler} that simply does each merge * sequentially, using the current thread. */ diff --git a/src/java/org/apache/lucene/index/TermVectorsWriter.java b/src/java/org/apache/lucene/index/TermVectorsWriter.java index 046bac36485..8ccee037f18 100644 --- a/src/java/org/apache/lucene/index/TermVectorsWriter.java +++ b/src/java/org/apache/lucene/index/TermVectorsWriter.java @@ -22,7 +22,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.StringHelper; import java.io.IOException; -import java.util.Vector; final class TermVectorsWriter { diff --git a/src/test/org/apache/lucene/index/TestIndexModifier.java b/src/test/org/apache/lucene/index/TestIndexModifier.java index cc61c9e0dd0..d610f2b9b3c 100644 --- a/src/test/org/apache/lucene/index/TestIndexModifier.java +++ b/src/test/org/apache/lucene/index/TestIndexModifier.java @@ -75,7 +75,7 @@ public class TestIndexModifier extends TestCase { // Lucene defaults: assertNull(i.getInfoStream()); assertTrue(i.getUseCompoundFile()); - assertEquals(0, i.getMaxBufferedDocs()); + assertEquals(IndexWriter.DISABLE_AUTO_FLUSH, i.getMaxBufferedDocs()); assertEquals(10000, i.getMaxFieldLength()); assertEquals(10, i.getMergeFactor()); // test setting properties: diff --git a/src/test/org/apache/lucene/index/TestIndexWriter.java b/src/test/org/apache/lucene/index/TestIndexWriter.java index 7ee20b4f4fb..fbd0dbcbf2e 100644 --- a/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -1151,8 +1151,8 @@ public class TestIndexWriter extends TestCase RAMDirectory dir = new RAMDirectory(); IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); writer.setMaxBufferedDocs(10); + writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); - int lastNumFile = dir.list().length; long lastGen = -1; for(int j=1;j<52;j++) { Document doc = new Document(); @@ -1169,25 +1169,89 @@ public class TestIndexWriter extends TestCase assertTrue(gen > lastGen); lastGen = gen; writer.setRAMBufferSizeMB(0.000001); + writer.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH); } else if (j < 20) { assertTrue(gen > lastGen); lastGen = gen; } else if (20 == j) { writer.setRAMBufferSizeMB(16); + writer.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH); lastGen = gen; } else if (j < 30) { assertEquals(gen, lastGen); } else if (30 == j) { writer.setRAMBufferSizeMB(0.000001); + writer.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH); } else if (j < 40) { assertTrue(gen> lastGen); lastGen = gen; } else if (40 == j) { writer.setMaxBufferedDocs(10); + writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); lastGen = gen; } else if (j < 50) { assertEquals(gen, lastGen); writer.setMaxBufferedDocs(10); + writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); + } else if (50 == j) { + assertTrue(gen > lastGen); + } + } + writer.close(); + dir.close(); + } + + public void testChangingRAMBuffer2() throws IOException { + RAMDirectory dir = new RAMDirectory(); + IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer(), true); + writer.setMaxBufferedDocs(10); + writer.setMaxBufferedDeleteTerms(10); + writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); + + for(int j=1;j<52;j++) { + Document doc = new Document(); + doc.add(new Field("field", "aaa" + j, Field.Store.YES, Field.Index.TOKENIZED)); + writer.addDocument(doc); + } + + long lastGen = -1; + for(int j=1;j<52;j++) { + writer.deleteDocuments(new Term("field", "aaa" + j)); + _TestUtil.syncConcurrentMerges(writer); + long gen = SegmentInfos.generationFromSegmentsFileName(SegmentInfos.getCurrentSegmentFileName(dir.list())); + if (j == 1) + lastGen = gen; + else if (j < 10) { + // No new files should be created + assertEquals(gen, lastGen); + } else if (10 == j) { + assertTrue(gen > lastGen); + lastGen = gen; + writer.setRAMBufferSizeMB(0.000001); + writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + } else if (j < 20) { + assertTrue(gen > lastGen); + lastGen = gen; + } else if (20 == j) { + writer.setRAMBufferSizeMB(16); + writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + lastGen = gen; + } else if (j < 30) { + assertEquals(gen, lastGen); + } else if (30 == j) { + writer.setRAMBufferSizeMB(0.000001); + writer.setMaxBufferedDeleteTerms(IndexWriter.DISABLE_AUTO_FLUSH); + } else if (j < 40) { + assertTrue(gen> lastGen); + lastGen = gen; + } else if (40 == j) { + writer.setMaxBufferedDeleteTerms(10); + writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); + lastGen = gen; + } else if (j < 50) { + assertEquals(gen, lastGen); + writer.setMaxBufferedDeleteTerms(10); + writer.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH); } else if (50 == j) { assertTrue(gen > lastGen); }