LUCENE-3340: fix cases where buffered deletes fail to be flushed at the right time, resulting in memory leak

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1151081 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2011-07-26 13:07:42 +00:00
parent d45e49d2ee
commit 90986568e0
5 changed files with 163 additions and 11 deletions
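For context, the scenario this commit fixes: an application that updates documents using delete terms that match nothing pushes a buffered-delete packet at every flush, and before this change the RAM held by those pushed packets was never counted against the flush budget, so it grew without bound. A minimal sketch of such a workload, assuming the trunk (4.0-dev) API of the time; the directory, analyzer, loop bound, and field values are illustrative only:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.util.Version;

    Directory dir = new RAMDirectory();
    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_CURRENT,
        new StandardAnalyzer(Version.LUCENE_CURRENT));
    conf.setRAMBufferSizeMB(1.0);  // small RAM buffer so flushes happen often
    IndexWriter w = new IndexWriter(dir, conf);
    for (int i = 0; i < 1000000; i++) {
      Document doc = new Document();
      doc.add(new Field("id", "" + i, Field.Store.NO, Field.Index.NOT_ANALYZED));
      // The delete term matches no document; the buffered delete is pushed
      // into BufferedDeletesStream at each flush and, before this fix, its
      // bytes were never reclaimed or counted, so RAM use grew without bound:
      w.updateDocument(new Term("id", "x" + i), doc);
    }
    w.close();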

BufferedDeletesStream.java

@@ -100,7 +100,7 @@ class BufferedDeletesStream {
numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed);
if (infoStream != null) {
message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size());
message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size() + " totBytesUsed=" + bytesUsed.get());
}
assert checkDeleteStats();
return packet.delGen();

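The only change in this hunk is appending the running totBytesUsed figure to the "push deletes" message. To observe it, enable the writer's info stream, which at the time took a plain PrintStream (as the tests below also do). A sketch; the values in the commented log line are made up:

    IndexWriter w = new IndexWriter(dir, conf);
    w.setInfoStream(System.out);
    // Subsequent flushes now log lines such as:
    //   push deletes <packet> delGen=12 packetCount=3 totBytesUsed=524288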
DocumentsWriter.java

@@ -141,7 +141,7 @@ final class DocumentsWriter {
flushPolicy = configuredPolicy;
}
flushPolicy.init(this);
- flushControl = new DocumentsWriterFlushControl(this, config );
+ flushControl = new DocumentsWriterFlushControl(this, config);
}
synchronized void deleteQueries(final Query... queries) throws IOException {
@@ -309,6 +309,9 @@ final class DocumentsWriter {
}
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
+ if (flushControl.doApplyAllDeletes()) {
+ applyAllDeletes(deleteQueue);
+ }
if (flushingDWPT != null) {
maybeMerge |= doFlush(flushingDWPT);
} else {
@@ -443,10 +446,22 @@ final class DocumentsWriter {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
indexWriter.flushCount.incrementAndGet();
+ indexWriter.doAfterFlush();
}
flushingDWPT = flushControl.nextPendingFlush();
}
+ // If deletes alone are consuming > 1/2 our RAM
+ // buffer, force them all to apply now. This is to
+ // prevent too-frequent flushing of a long tail of
+ // tiny segments:
+ final double ramBufferSizeMB = indexWriter.getConfig().getRAMBufferSizeMB();
+ if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+ flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
+ applyAllDeletes(deleteQueue);
+ }
return maybeMerge;
}
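As a worked example of the guard just added (variable names follow the patch): with IndexWriterConfig's default 16 MB RAM buffer, pushed deletes are force-applied once they exceed half the buffer, i.e. 8 MB:

    double ramBufferSizeMB = 16.0;  // IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB
    long forceApplyAt = (long) (1024*1024*ramBufferSizeMB/2);  // 8,388,608 bytes
    // flushControl.getDeleteBytesUsed() > forceApplyAt  ==>  applyAllDeletes(deleteQueue)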
@@ -601,5 +616,4 @@ final class DocumentsWriter {
}
return true;
}
}

DocumentsWriterFlushControl.java

@@ -339,7 +339,11 @@ public final class DocumentsWriterFlushControl {
* Returns the number of delete terms in the global pool
*/
public int getNumGlobalTermDeletes() {
- return documentsWriter.deleteQueue.numGlobalTermDeletes();
+ return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
}
+ public long getDeleteBytesUsed() {
+ return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
+ }
synchronized int numFlushingDWPT() {

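The new getDeleteBytesUsed matters because, after this patch, delete RAM lives in two pools: bytes still sitting in the in-flight delete queue, plus bytes already pushed into BufferedDeletesStream at flush time but not yet applied. An illustrative restatement of the accounting (not code from the patch):

    // Deletes buffered but not yet pushed at a flush:
    long inFlight = documentsWriter.deleteQueue.bytesUsed();
    // Deletes pushed into the stream at flush time, awaiting applyDeletes:
    long pushed = documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
    // The flush policy must see the sum, or pushed packets become
    // invisible to RAM accounting and leak:
    long total = inFlight + pushed;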
FlushByRamOrCountsPolicy.java

@@ -60,15 +60,11 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
}
}
final DocumentsWriter writer = this.writer.get();
- // If deletes alone are consuming > 1/2 our RAM
- // buffer, force them all to apply now. This is to
- // prevent too-frequent flushing of a long tail of
- // tiny segments:
if ((flushOnRAM() &&
- writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2))) {
+ control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) {
control.setApplyAllDeletes();
if (writer.infoStream != null) {
writer.message("force apply deletes bytesUsed=" + writer.deleteQueue.bytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
writer.message("force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
}
}
}
@@ -82,8 +78,12 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
control.setFlushPending(state);
} else if (flushOnRAM()) {// flush by RAM
final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
- final long totalRam = control.activeBytes();
+ final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
if (totalRam >= limit) {
+ final DocumentsWriter writer = this.writer.get();
+ if (writer.infoStream != null) {
+ writer.message("flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
+ }
markLargestWriterPending(control, state, totalRam);
}
}

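With this change, the flush-by-RAM trigger compares document RAM and delete RAM together against the configured budget. A worked example with made-up numbers: under a 16 MB budget, 12 MB of active documents alone would not flush, but 12 MB of documents plus 5 MB of buffered deletes now does:

    long limit = (long) (16.0 * 1024 * 1024);   // ramBufferSizeMB = 16
    long activeBytes = 12L * 1024 * 1024;       // in-flight documents (example value)
    long deleteBytes = 5L * 1024 * 1024;        // buffered deletes (example value)
    long totalRam = activeBytes + deleteBytes;  // 17 MB
    boolean flush = totalRam >= limit;          // true; before this patch, 12 MB < 16 MB meant no flush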
TestIndexWriterDelete.java

@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockAnalyzer;
@@ -944,4 +946,136 @@ public class TestIndexWriterDelete extends LuceneTestCase {
w.close();
dir.close();
}
+ // LUCENE-3340: make sure deletes that we don't apply
+ // during flush (i.e. are just pushed into the stream) are
+ // in fact later flushed due to their RAM usage:
+ public void testFlushPushedDeletesByRAM() throws Exception {
+ Directory dir = newDirectory();
+ // Cannot use RandomIndexWriter because we don't want to
+ // ever call commit() for this test:
+ IndexWriter w = new IndexWriter(dir,
+ newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random))
+ .setRAMBufferSizeMB(1.0f).setMaxBufferedDocs(1000).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false));
+ w.setInfoStream(VERBOSE ? System.out : null);
+ int count = 0;
+ while(true) {
+ Document doc = new Document();
+ doc.add(new Field("id", count+"", Field.Store.NO, Field.Index.NOT_ANALYZED));
+ final Term delTerm;
+ if (count == 1010) {
+ // This is the only delete that applies
+ delTerm = new Term("id", ""+0);
+ } else {
+ // These get buffered, taking up RAM, but delete
+ // nothing when applied:
+ delTerm = new Term("id", "x" + count);
+ }
+ w.updateDocument(delTerm, doc);
+ // Eventually segment 0 should get a .del file:
+ if (dir.fileExists("_0_1.del")) {
+ if (VERBOSE) {
+ System.out.println("TEST: deletes created @ count=" + count);
+ }
+ break;
+ }
+ count++;
+ // Today we applyDeletes @ count=7199; even if we make
+ // sizable improvements to RAM efficiency of buffered
+ // del terms we're unlikely to go over 100K:
+ if (count > 100000) {
+ fail("deletes were not applied");
+ }
+ }
+ w.close();
+ dir.close();
+ }
+ // LUCENE-3340: make sure deletes that we don't apply
+ // during flush (i.e. are just pushed into the stream) are
+ // in fact later flushed due to their RAM usage:
+ public void testFlushPushedDeletesByCount() throws Exception {
+ Directory dir = newDirectory();
+ // Cannot use RandomIndexWriter because we don't want to
+ // ever call commit() for this test:
+ final int flushAtDelCount = atLeast(1020);
+ IndexWriter w = new IndexWriter(dir,
+ newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+ setMaxBufferedDeleteTerms(flushAtDelCount).setMaxBufferedDocs(1000).setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false));
+ w.setInfoStream(VERBOSE ? System.out : null);
+ int count = 0;
+ while(true) {
+ Document doc = new Document();
+ doc.add(new Field("id", count+"", Field.Store.NO, Field.Index.NOT_ANALYZED));
+ final Term delTerm;
+ if (count == 1010) {
+ // This is the only delete that applies
+ delTerm = new Term("id", ""+0);
+ } else {
+ // These get buffered, taking up RAM, but delete
+ // nothing when applied:
+ delTerm = new Term("id", "x" + count);
+ }
+ w.updateDocument(delTerm, doc);
+ // Eventually segment 0 should get a .del file:
+ if (dir.fileExists("_0_1.del")) {
+ break;
+ }
+ count++;
+ if (count > flushAtDelCount) {
+ fail("deletes were not applied at count=" + flushAtDelCount);
+ }
+ }
+ w.close();
+ dir.close();
+ }
+ // Make sure buffered (pushed) deletes don't use up so
+ // much RAM that it forces a long tail of tiny segments:
+ public void testApplyDeletesOnFlush() throws Exception {
+ Directory dir = newDirectory();
+ // Cannot use RandomIndexWriter because we don't want to
+ // ever call commit() for this test:
+ final AtomicInteger docsInSegment = new AtomicInteger();
+ final AtomicBoolean closing = new AtomicBoolean();
+ final AtomicBoolean sawAfterFlush = new AtomicBoolean();
+ IndexWriter w = new IndexWriter(dir,
+ newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)).
+ setRAMBufferSizeMB(0.5).setMaxBufferedDocs(-1).setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES).setReaderPooling(false)) {
+ @Override
+ public void doAfterFlush() {
+ assertTrue("only " + docsInSegment.get() + " in segment", closing.get() || docsInSegment.get() >= 10);
+ docsInSegment.set(0);
+ sawAfterFlush.set(true);
+ }
+ };
+ w.setInfoStream(VERBOSE ? System.out : null);
+ int id = 0;
+ while(true) {
+ StringBuilder sb = new StringBuilder();
+ for(int termIDX=0;termIDX<100;termIDX++) {
+ sb.append(' ').append(_TestUtil.randomRealisticUnicodeString(random));
+ }
+ if (id == 500) {
+ w.deleteDocuments(new Term("id", "0"));
+ }
+ Document doc = new Document();
+ doc.add(newField("id", ""+id, Field.Index.NOT_ANALYZED));
+ doc.add(newField("body", sb.toString(), Field.Index.ANALYZED));
+ w.updateDocument(new Term("id", ""+id), doc);
+ docsInSegment.incrementAndGet();
+ if (dir.fileExists("_0_1.del")) {
+ if (VERBOSE) {
+ System.out.println("TEST: deletes created @ id=" + id);
+ }
+ break;
+ }
+ id++;
+ }
+ closing.set(true);
+ assertTrue(sawAfterFlush.get());
+ w.close();
+ dir.close();
+ }
}