LUCENE-6166: deletions (alone) can now trigger new merges

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1650475 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2015-01-09 10:01:14 +00:00
parent e947b4ad66
commit c242a300e0
3 changed files with 143 additions and 17 deletions

View File

@@ -138,6 +138,8 @@ New Features
   QueryScorer and WeightedSpanTermExtractor now have setUsePayloads(bool).
   (David Smiley)
+* LUCENE-6166: Deletions (alone) can now trigger new merges. (Mike McCandless)
 Optimizations
 * LUCENE-5960: Use a more efficient bitset, not a Set<Integer>, to
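A minimal stand-alone sketch of the behavior this entry describes (not part of the commit; it mirrors the tests added to TestIndexWriterDelete below, substituting RAMDirectory and WhitespaceAnalyzer for the LuceneTestCase helpers newDirectory(), MockAnalyzer and newStringField): a pass that only deletes documents can now end with the index merged down.

// Hypothetical sketch, not from this commit; assumes the usual Lucene core imports
// (Directory, RAMDirectory, IndexWriter, IndexWriterConfig, WhitespaceAnalyzer,
// LogDocMergePolicy, SerialMergeScheduler, Document, StringField, Field, Term, DirectoryReader).
Directory dir = new RAMDirectory();
IndexWriterConfig iwc = new IndexWriterConfig(new WhitespaceAnalyzer());
iwc.setMaxBufferedDocs(2);                          // flush tiny segments frequently
LogDocMergePolicy mp = new LogDocMergePolicy();
mp.setMinMergeDocs(1);                              // allow even 1-doc segments to merge
iwc.setMergePolicy(mp);
iwc.setMergeScheduler(new SerialMergeScheduler());
IndexWriter w = new IndexWriter(dir, iwc);
for (int i = 0; i < 38; i++) {
  Document doc = new Document();
  doc.add(new StringField("id", "" + i, Field.Store.NO));
  w.addDocument(doc);
}
w.commit();
for (int i = 0; i < 18; i++) {
  w.deleteDocuments(new Term("id", "" + i));        // deletes only, no added documents
}
w.close();                                          // applying the deletes can now trigger a merge
DirectoryReader r = DirectoryReader.open(dir);
// after this change the index can end up fully merged (a single leaf) even though the
// final operations were deletes alone; before LUCENE-6166 no merge was triggered here
r.close();
dir.close();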

View File

@@ -402,7 +402,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     poolReaders = true;
     DirectoryReader r = null;
     doBeforeFlush();
-    boolean anySegmentFlushed = false;
+    boolean anyChanges = false;
     /*
      * for releasing a NRT reader we must ensure that
      * DW doesn't add any segments or deletes until we are
@@ -415,8 +415,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     synchronized (fullFlushLock) {
       boolean success = false;
       try {
-        anySegmentFlushed = docWriter.flushAllThreads();
-        if (!anySegmentFlushed) {
+        anyChanges = docWriter.flushAllThreads();
+        if (!anyChanges) {
           // prevent double increment since docWriter#doFlush increments the flushcount
           // if we flushed anything.
           flushCount.incrementAndGet();
@@ -426,7 +426,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         // reader; in theory we could instead do similar retry logic,
         // just like we do when loading segments_N
         synchronized(this) {
-          maybeApplyDeletes(applyAllDeletes);
+          anyChanges |= maybeApplyDeletes(applyAllDeletes);
           r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes);
           if (infoStream.isEnabled("IW")) {
             infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
@@ -450,7 +450,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         }
       }
     }
-    if (anySegmentFlushed) {
+    if (anyChanges) {
       maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
     }
     if (infoStream.isEnabled("IW")) {
@@ -2991,6 +2991,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
   }
+  /** Returns true if a segment was flushed or deletes were applied. */
   private boolean doFlush(boolean applyAllDeletes) throws IOException {
     if (tragedy != null) {
       throw new IllegalStateException("this writer hit an unrecoverable error; cannot flush", tragedy);
@@ -3005,12 +3006,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       infoStream.message("IW", " start flush: applyAllDeletes=" + applyAllDeletes);
       infoStream.message("IW", " index before flush " + segString());
     }
-    final boolean anySegmentFlushed;
+    boolean anyChanges = false;
     synchronized (fullFlushLock) {
       boolean flushSuccess = false;
       try {
-        anySegmentFlushed = docWriter.flushAllThreads();
+        anyChanges = docWriter.flushAllThreads();
+        if (!anyChanges) {
+          // flushCount is incremented in flushAllThreads
+          flushCount.incrementAndGet();
+        }
         flushSuccess = true;
       } finally {
         docWriter.finishFullFlush(flushSuccess);
@@ -3018,14 +3023,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
        }
      }
      synchronized(this) {
-       maybeApplyDeletes(applyAllDeletes);
+       anyChanges |= maybeApplyDeletes(applyAllDeletes);
        doAfterFlush();
-       if (!anySegmentFlushed) {
-         // flushCount is incremented in flushAllThreads
-         flushCount.incrementAndGet();
-       }
        success = true;
-       return anySegmentFlushed;
+       return anyChanges;
      }
    } catch (AbortingException | OutOfMemoryError tragedy) {
      tragicEvent(tragedy, "doFlush");
@@ -3040,18 +3041,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }
   }
-  final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
+  final synchronized boolean maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
     if (applyAllDeletes) {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "apply all deletes during flush");
       }
-      applyAllDeletesAndUpdates();
+      return applyAllDeletesAndUpdates();
     } else if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "don't apply deletes now delTermCount=" + bufferedUpdatesStream.numTerms() + " bytesUsed=" + bufferedUpdatesStream.ramBytesUsed());
     }
+    return false;
   }
-  final synchronized void applyAllDeletesAndUpdates() throws IOException {
+  final synchronized boolean applyAllDeletesAndUpdates() throws IOException {
     flushDeletesCount.incrementAndGet();
     final BufferedUpdatesStream.ApplyDeletesResult result;
     if (infoStream.isEnabled("IW")) {
@@ -3079,6 +3082,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       checkpoint();
     }
     bufferedUpdatesStream.prune(segmentInfos);
+    return result.anyDeletes;
   }
   // for testing only
@@ -4575,7 +4579,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     try {
       purge(forcePurge);
     } finally {
-      applyAllDeletesAndUpdates();
+      if (applyAllDeletesAndUpdates()) {
+        maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+      }
       flushCount.incrementAndGet();
     }
   }
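Taken together, the IndexWriter changes above thread a single anyChanges flag through the flush paths: flushing segments and applying deletes/updates both set it, and that flag alone decides whether a merge is considered. A rough paraphrase of the new flow in getReader/doFlush (simplified; not the literal method bodies):

// Condensed paraphrase of the changed control flow shown in the diff above (simplified).
boolean anyChanges = docWriter.flushAllThreads();      // true if any segment was flushed
anyChanges |= maybeApplyDeletes(applyAllDeletes);      // now also true if deletes/updates were applied
if (anyChanges) {
  // before this change only a flushed segment could reach this point, so pending
  // deletes with no new segments never triggered a merge
  maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}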

View File

@@ -1242,4 +1242,122 @@ public class TestIndexWriterDelete extends LuceneTestCase {
     r.close();
     d.close();
   }
+
+  public void testOnlyDeletesTriggersMergeOnClose() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+    iwc.setMaxBufferedDocs(2);
+    LogDocMergePolicy mp = new LogDocMergePolicy();
+    mp.setMinMergeDocs(1);
+    iwc.setMergePolicy(mp);
+    iwc.setMergeScheduler(new SerialMergeScheduler());
+    IndexWriter w = new IndexWriter(dir, iwc);
+    for(int i=0;i<38;i++) {
+      Document doc = new Document();
+      doc.add(newStringField("id", ""+i, Field.Store.NO));
+      w.addDocument(doc);
+    }
+    w.commit();
+    for(int i=0;i<18;i++) {
+      w.deleteDocuments(new Term("id", ""+i));
+    }
+    w.close();
+    DirectoryReader r = DirectoryReader.open(dir);
+    assertEquals(1, r.leaves().size());
+    r.close();
+    dir.close();
+  }
+
+  public void testOnlyDeletesTriggersMergeOnGetReader() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+    iwc.setMaxBufferedDocs(2);
+    LogDocMergePolicy mp = new LogDocMergePolicy();
+    mp.setMinMergeDocs(1);
+    iwc.setMergePolicy(mp);
+    iwc.setMergeScheduler(new SerialMergeScheduler());
+    IndexWriter w = new IndexWriter(dir, iwc);
+    for(int i=0;i<38;i++) {
+      Document doc = new Document();
+      doc.add(newStringField("id", ""+i, Field.Store.NO));
+      w.addDocument(doc);
+    }
+    w.commit();
+    for(int i=0;i<18;i++) {
+      w.deleteDocuments(new Term("id", ""+i));
+    }
+    // First one triggers, but does not reflect, the merge:
+    DirectoryReader.open(w, true).close();
+    IndexReader r = DirectoryReader.open(w, true);
+    assertEquals(1, r.leaves().size());
+    r.close();
+    w.close();
+    dir.close();
+  }
+
+  public void testOnlyDeletesTriggersMergeOnFlush() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+    iwc.setMaxBufferedDocs(2);
+    LogDocMergePolicy mp = new LogDocMergePolicy();
+    mp.setMinMergeDocs(1);
+    iwc.setMergePolicy(mp);
+    iwc.setMergeScheduler(new SerialMergeScheduler());
+    iwc.setMaxBufferedDeleteTerms(18);
+    IndexWriter w = new IndexWriter(dir, iwc);
+    for(int i=0;i<38;i++) {
+      Document doc = new Document();
+      doc.add(newStringField("id", ""+i, Field.Store.NO));
+      w.addDocument(doc);
+    }
+    w.commit();
+    for(int i=0;i<18;i++) {
+      w.deleteDocuments(new Term("id", ""+i));
+    }
+    w.commit();
+    DirectoryReader r = DirectoryReader.open(dir);
+    assertEquals(1, r.leaves().size());
+    r.close();
+    w.close();
+    dir.close();
+  }
+
+  public void testOnlyDeletesDeleteAllDocs() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
+    iwc.setMaxBufferedDocs(2);
+    LogDocMergePolicy mp = new LogDocMergePolicy();
+    mp.setMinMergeDocs(1);
+    iwc.setMergePolicy(mp);
+    iwc.setMergeScheduler(new SerialMergeScheduler());
+    iwc.setMaxBufferedDeleteTerms(18);
+    IndexWriter w = new IndexWriter(dir, iwc);
+    for(int i=0;i<38;i++) {
+      Document doc = new Document();
+      doc.add(newStringField("id", ""+i, Field.Store.NO));
+      w.addDocument(doc);
+    }
+    w.commit();
+    for(int i=0;i<38;i++) {
+      w.deleteDocuments(new Term("id", ""+i));
+    }
+    DirectoryReader r = DirectoryReader.open(w, true);
+    assertEquals(0, r.leaves().size());
+    assertEquals(0, r.maxDoc());
+    r.close();
+    w.close();
+    dir.close();
+  }
 }
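In testOnlyDeletesTriggersMergeOnGetReader above, the "First one triggers, but does not reflect, the merge" comment is the subtle point: the first near-real-time open applies the buffered deletes and kicks off the merge, but that reader is a point-in-time snapshot taken before the merge is committed, so only a reader opened afterwards shows the single merged segment. A hypothetical equivalent using reader reopening (not in the commit; openIfChanged returns null if nothing changed):

// Hypothetical sketch, not part of this commit; assumes the writer w configured as in the test above.
DirectoryReader r1 = DirectoryReader.open(w, true);      // applies deletes and triggers the merge
DirectoryReader r2 = DirectoryReader.openIfChanged(r1);  // reopen after the merge has been committed
// with the SerialMergeScheduler used above, r2 (if non-null) is expected to see one merged leaf
// (close r1, r2 and w when done)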