LUCENE-4462: Flush Deletes, SegmentInfos and build CFS concurrently in DWPT

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1396500 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2012-10-10 07:55:20 +00:00
parent 9d2f4e562a
commit 4f68df411e
4 changed files with 110 additions and 107 deletions

lucene/CHANGES.txt View File

@@ -16,6 +16,13 @@ Changes in backwards compatibility policy
(Nikola Tanković, Uwe Schindler, Chris Male, Mike McCandless,
Robert Muir)
Optimizations
+ * LUCENE-4462: DocumentsWriter now flushes deletes and segment infos, and
+   builds CFS files if necessary, during segment flush rather than during
+   publishing. Publishing used to be single-threaded; now all IO- and CPU-heavy
+   work is done concurrently in DocumentsWriterPerThread. (Simon Willnauer)
======================= Lucene 4.1.0 =======================
New Features

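The LUCENE-4462 entry above is easiest to picture as a split between concurrent sealing and serialized publishing. The following is a minimal, self-contained sketch of that shape in plain Java; it is not Lucene code, and Segment, flushAndSeal and publish are hypothetical stand-ins for DocumentsWriterPerThread.flush()/sealFlushedSegment() and IndexWriter.publishFlushedSegment():

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConcurrentFlushSketch {
  static final class Segment {
    final String name;
    Segment(String name) { this.name = name; }
  }

  private final Object publishLock = new Object();

  // Stands in for DWPT.flush() + sealFlushedSegment(): build the CFS, write
  // the .si, write deleted docs. Heavy IO/CPU, runs concurrently per thread.
  Segment flushAndSeal(String name) {
    return new Segment(name);
  }

  // Stands in for IndexWriter.publishFlushedSegment(): append to SegmentInfos
  // and apply delete packets in order. Cheap, and the only serialized step.
  void publish(Segment seg) {
    synchronized (publishLock) {
      System.out.println("published " + seg.name);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ConcurrentFlushSketch writer = new ConcurrentFlushSketch();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
      final String name = "_" + i;
      // Sealing happens concurrently; publishing is a brief critical section.
      pool.execute(() -> writer.publish(writer.flushAndSeal(name)));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}

Before this change, the work standing in for flushAndSeal() effectively ran inside the synchronized publish step, one segment at a time.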
lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java View File

@@ -33,7 +33,10 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.MutableBits;
/**
* This class accepts multiple added documents and directly
@@ -436,16 +439,12 @@ final class DocumentsWriter {
// This means there is a backlog: the one
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
- // we forcefully stall the producers. But really
- // this means we have a concurrency issue
- // (TestBagOfPostings can provoke this):
- // publishing a flush segment is too heavy today
- // (it builds CFS, writes .si, etc.) ... we need
- // to make those ops concurrent too:
+ // we forcefully stall the producers.
ticketQueue.forcePurge(this);
} else {
ticketQueue.tryPurge(this);
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
@@ -471,6 +470,7 @@ final class DocumentsWriter {
return maybeMerge;
}
void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
@@ -505,23 +505,17 @@
throws IOException {
assert newSegment != null;
assert newSegment.segmentInfo != null;
+ final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
//System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
- final SegmentInfoPerCommit segInfo = indexWriter.prepareFlushedSegment(newSegment);
- final BufferedDeletes deletes = newSegment.segmentDeletes;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + deletes);
infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
}
- FrozenBufferedDeletes packet = null;
- if (deletes != null && deletes.any()) {
- // Segment private delete
- packet = new FrozenBufferedDeletes(deletes, true);
- if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", "flush: push buffered seg private deletes: " + packet);
- }
+ if (segmentDeletes != null && infoStream.isEnabled("DW")) {
+ infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
}
// now publish!
- indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
+ indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
}
// for asserts

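The forcePurge/tryPurge calls above drive the ticket queue that keeps segment publishing in order even though DWPT flushes finish out of order. Below is an illustrative sketch of that pattern, not the actual DocumentsWriterFlushQueue; the blocking lock() in forcePurge is what stalls a backlogged producer:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

class TicketQueueSketch {
  static final class Ticket {
    final String segment;
    volatile boolean done; // set once the segment has finished flushing
    Ticket(String segment) { this.segment = segment; }
  }

  private final Deque<Ticket> queue = new ArrayDeque<>();
  private final ReentrantLock purgeLock = new ReentrantLock();

  // One ticket per flush, enqueued in the order flushes were started.
  synchronized Ticket addTicket(String segment) {
    Ticket ticket = new Ticket(segment);
    queue.addLast(ticket);
    return ticket;
  }

  // Blocking purge: stalls the caller behind whoever is already purging.
  void forcePurge() {
    purgeLock.lock();
    try { innerPurge(); } finally { purgeLock.unlock(); }
  }

  // Best-effort purge: skip if another thread already holds the lock.
  void tryPurge() {
    if (purgeLock.tryLock()) {
      try { innerPurge(); } finally { purgeLock.unlock(); }
    }
  }

  private void innerPurge() {
    while (true) {
      Ticket head;
      synchronized (this) {
        head = queue.peekFirst();
        if (head == null || !head.done) {
          return; // head still flushing: later tickets must wait for it
        }
        queue.pollFirst();
      }
      System.out.println("publish " + head.segment); // strict FIFO publish
    }
  }
}

Only the head ticket may publish, so a single slow flush backs up the queue; forcing producers into the blocking purge turns that backlog into backpressure on the indexing threads.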
lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java View File

@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
@@ -118,7 +119,7 @@ class DocumentsWriterPerThread {
static class FlushedSegment {
final SegmentInfoPerCommit segmentInfo;
final FieldInfos fieldInfos;
- final BufferedDeletes segmentDeletes;
+ final FrozenBufferedDeletes segmentDeletes;
final MutableBits liveDocs;
final int delCount;
@@ -126,7 +127,7 @@ class DocumentsWriterPerThread {
BufferedDeletes segmentDeletes, MutableBits liveDocs, int delCount) {
this.segmentInfo = segmentInfo;
this.fieldInfos = fieldInfos;
- this.segmentDeletes = segmentDeletes;
+ this.segmentDeletes = segmentDeletes != null && segmentDeletes.any() ? new FrozenBufferedDeletes(segmentDeletes, true) : null;
this.liveDocs = liveDocs;
this.delCount = delCount;
}
@@ -519,6 +520,7 @@ class DocumentsWriterPerThread {
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
+ sealFlushedSegment(fs);
doAfterFlush();
success = true;
@@ -526,14 +528,79 @@
} finally {
if (!success) {
if (segmentInfo != null) {
- synchronized(parent.indexWriter) {
- parent.indexWriter.deleter.refresh(segmentInfo.name);
- }
+ writer.flushFailed(segmentInfo);
}
abort();
}
}
}
+ /**
+ * Seals the {@link SegmentInfo} for the new flushed segment and persists
+ * the deleted documents {@link MutableBits}.
+ */
+ void sealFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+ assert flushedSegment != null;
+ SegmentInfoPerCommit newSegment = flushedSegment.segmentInfo;
+ IndexWriter.setDiagnostics(newSegment.info, "flush");
+ IOContext context = new IOContext(new FlushInfo(newSegment.info.getDocCount(), newSegment.info.sizeInBytes()));
+ boolean success = false;
+ try {
+ if (writer.useCompoundFile(newSegment)) {
+ // Now build compound file
+ Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
+ newSegment.info.setUseCompoundFile(true);
+ writer.deleteNewFiles(oldFiles);
+ }
+ // Have codec write SegmentInfo. Must do this after
+ // creating CFS so that 1) .si isn't slurped into CFS,
+ // and 2) .si reflects useCompoundFile=true change
+ // above:
+ codec.segmentInfoFormat().getSegmentInfoWriter().write(directory, newSegment.info, flushedSegment.fieldInfos, context);
+ // TODO: ideally we would freeze newSegment here!!
+ // because any changes after writing the .si will be
+ // lost...
+ // Must write deleted docs after the CFS so we don't
+ // slurp the del file into CFS:
+ if (flushedSegment.liveDocs != null) {
+ final int delCount = flushedSegment.delCount;
+ assert delCount > 0;
+ if (infoStream.isEnabled("DWPT")) {
+ infoStream.message("DWPT", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
+ }
+ // TODO: in the NRT case it'd be better to hand
+ // this del vector over to the
+ // shortly-to-be-opened SegmentReader and let it
+ // carry the changes; there's no reason to use
+ // filesystem as intermediary here.
+ SegmentInfoPerCommit info = flushedSegment.segmentInfo;
+ Codec codec = info.info.getCodec();
+ codec.liveDocsFormat().writeLiveDocs(flushedSegment.liveDocs, directory, info, delCount, context);
+ newSegment.setDelCount(delCount);
+ newSegment.advanceDelGen();
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ if (infoStream.isEnabled("DWPT")) {
+ infoStream.message("DWPT", "hit exception " +
+ "creating compound file for newly flushed segment " + newSegment.info.name);
+ }
+ writer.flushFailed(newSegment.info);
+ }
+ }
+ }
/** Get current segment info we are writing. */
SegmentInfo getSegmentInfo() {

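The comments in sealFlushedSegment spell out a deliberate ordering: build the compound file first, write the .si after it (so the .si is not slurped into the CFS and records the useCompoundFile=true change), and write the deleted docs last so the del file also stays outside the CFS. Here is a compact sketch of just that ordering; every type and helper below is a hypothetical stand-in for the writer and codec calls shown in the diff:

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

class SealOrderSketch {
  static final class Segment {
    boolean useCompoundFile;
    boolean hasDeletions;
  }

  void seal(Segment seg) throws IOException {
    if (shouldUseCompoundFile(seg)) {
      // Step 1: build the CFS first, so everything written later stays outside it.
      Collection<String> originals = buildCompoundFile(seg);
      seg.useCompoundFile = true;
      deleteNewFiles(originals); // the originals now live inside the CFS
    }
    // Step 2: write the .si after the CFS, so it is not slurped into the
    // compound file and records useCompoundFile=true.
    writeSegmentInfo(seg);
    // Step 3: write deleted docs last, so the del file also stays out of the CFS.
    if (seg.hasDeletions) {
      writeLiveDocs(seg);
    }
  }

  // Hypothetical stand-ins for writer.useCompoundFile, IndexWriter.createCompoundFile,
  // writer.deleteNewFiles, and the codec's segment-info and live-docs writers.
  boolean shouldUseCompoundFile(Segment seg) { return true; }
  Collection<String> buildCompoundFile(Segment seg) throws IOException { return Collections.emptyList(); }
  void deleteNewFiles(Collection<String> files) throws IOException {}
  void writeSegmentInfo(Segment seg) throws IOException {}
  void writeLiveDocs(Segment seg) throws IOException {}
}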
lucene/core/src/java/org/apache/lucene/index/IndexWriter.java View File

@@ -2120,85 +2120,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
deleter.checkpoint(segmentInfos, false);
}
- /**
- * Prepares the {@link SegmentInfo} for the new flushed segment and persists
- * the deleted documents {@link MutableBits}. Use
- * {@link #publishFlushedSegment(SegmentInfoPerCommit, FrozenBufferedDeletes, FrozenBufferedDeletes)} to
- * publish the returned {@link SegmentInfo} together with its segment private
- * delete packet.
- *
- * @see #publishFlushedSegment(SegmentInfoPerCommit, FrozenBufferedDeletes, FrozenBufferedDeletes)
- */
- SegmentInfoPerCommit prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
- assert flushedSegment != null;
- SegmentInfoPerCommit newSegment = flushedSegment.segmentInfo;
- setDiagnostics(newSegment.info, "flush");
- IOContext context = new IOContext(new FlushInfo(newSegment.info.getDocCount(), newSegment.info.sizeInBytes()));
- boolean success = false;
- try {
- if (useCompoundFile(newSegment)) {
- // Now build compound file
- Collection<String> oldFiles = createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
- newSegment.info.setUseCompoundFile(true);
- synchronized(this) {
- deleter.deleteNewFiles(oldFiles);
- }
- }
- // Have codec write SegmentInfo. Must do this after
- // creating CFS so that 1) .si isn't slurped into CFS,
- // and 2) .si reflects useCompoundFile=true change
- // above:
- codec.segmentInfoFormat().getSegmentInfoWriter().write(directory, newSegment.info, flushedSegment.fieldInfos, context);
- // TODO: ideally we would freeze newSegment here!!
- // because any changes after writing the .si will be
- // lost...
- // Must write deleted docs after the CFS so we don't
- // slurp the del file into CFS:
- if (flushedSegment.liveDocs != null) {
- final int delCount = flushedSegment.delCount;
- assert delCount > 0;
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
- }
- // TODO: in the NRT case it'd be better to hand
- // this del vector over to the
- // shortly-to-be-opened SegmentReader and let it
- // carry the changes; there's no reason to use
- // filesystem as intermediary here.
- SegmentInfoPerCommit info = flushedSegment.segmentInfo;
- Codec codec = info.info.getCodec();
- codec.liveDocsFormat().writeLiveDocs(flushedSegment.liveDocs, directory, info, delCount, context);
- newSegment.setDelCount(delCount);
- newSegment.advanceDelGen();
- }
- success = true;
- } finally {
- if (!success) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "hit exception " +
- "creating compound file for newly flushed segment " + newSegment.info.name);
- }
- synchronized(this) {
- deleter.refresh(newSegment.info.name);
- }
- }
- }
- return newSegment;
- }
synchronized void publishFrozenDeletes(FrozenBufferedDeletes packet) {
assert packet != null && packet.any();
synchronized (bufferedDeletesStream) {
@@ -2208,11 +2129,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
/**
* Atomically adds the segment private delete packet and publishes the flushed
- * segments SegmentInfo to the index writer. NOTE: use
- * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
- * {@link SegmentInfo} for the flushed segment.
- *
- * @see #prepareFlushedSegment(DocumentsWriterPerThread.FlushedSegment)
+ * segment's SegmentInfo to the index writer.
*/
synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment,
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
@@ -4253,4 +4170,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
return files;
}
+ /**
+ * Tries to delete the given files if unreferenced
+ * @param files the files to delete
+ * @throws IOException if an {@link IOException} occurs
+ * @see IndexFileDeleter#deleteNewFiles(Collection)
+ */
+ synchronized final void deleteNewFiles(Collection<String> files) throws IOException {
+ deleter.deleteNewFiles(files);
+ }
+ /**
+ * Cleans up residuals from a segment that could not be entirely flushed due to an error
+ * @see IndexFileDeleter#refresh(String)
+ */
+ synchronized final void flushFailed(SegmentInfo info) throws IOException {
+ deleter.refresh(info.name);
+ }
}
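These two helpers replace the ad-hoc synchronized(parent.indexWriter) blocks that DocumentsWriterPerThread used before (see the removed lines in its flush() diff above), keeping both the lock and the IndexFileDeleter encapsulated in IndexWriter. A minimal sketch of the pattern, with hypothetical stand-in types:

import java.util.Collection;

class EncapsulatedDeleterSketch {
  // Hypothetical stand-in for IndexFileDeleter.
  static final class Deleter {
    void deleteNewFiles(Collection<String> files) { /* delete if unreferenced */ }
    void refresh(String segmentName) { /* drop residual files of a failed flush */ }
  }

  private final Deleter deleter = new Deleter();

  // Callers (e.g. a flushing DWPT) no longer write
  //   synchronized (writer) { writer.deleter.refresh(name); }
  // themselves; the lock now lives next to the state it guards.
  synchronized void deleteNewFiles(Collection<String> files) {
    deleter.deleteNewFiles(files);
  }

  synchronized void flushFailed(String segmentName) {
    deleter.refresh(segmentName);
  }
}

Keeping the synchronized region this small is what lets the heavy sealing work run outside the writer's lock.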