From 4f68df411ef45efba5eb10045768560c9918e739 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 10 Oct 2012 07:55:20 +0000
Subject: [PATCH] LUCENE-4462: Flush Deletes, SegmentInfos and build CFS
 concurrently in DWPT

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1396500 13f79535-47bb-0310-9956-ffa450edef68
---
 lucene/CHANGES.txt                               |   7 ++
 .../apache/lucene/index/DocumentsWriter.java     |  30 ++---
 .../index/DocumentsWriterPerThread.java          |  77 ++++++++++++-
 .../org/apache/lucene/index/IndexWriter.java     | 103 ++++--------
 4 files changed, 110 insertions(+), 107 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 8616104530c..49ac5f25fcf 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -16,6 +16,13 @@ Changes in backwards compatibility policy
   (Nikola Tanković, Uwe Schindler, Chris Male, Mike McCandless, Robert Muir)
 
+Optimizations
+
+* LUCENE-4462: DocumentsWriter now flushes deletes, writes segment infos, and
+  builds CFS files, where necessary, during segment flush rather than during
+  publishing. Publishing was single-threaded; this IO- and CPU-heavy work is
+  now done concurrently in DocumentsWriterPerThread. (Simon Willnauer)
+
 ======================= Lucene 4.1.0 =======================
 
 New Features
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 58dc668abef..e7828420081 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -33,7 +33,10 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FlushInfo;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.MutableBits;
 
 /**
  * This class accepts multiple added documents and directly
@@ -436,16 +439,12 @@ final class DocumentsWriter {
         // This means there is a backlog: the one
         // thread in innerPurge can't keep up with all
         // other threads flushing segments. In this case
-        // we forcefully stall the producers. But really
-        // this means we have a concurrency issue
-        // (TestBagOfPostings can provoke this):
-        // publishing a flush segment is too heavy today
-        // (it builds CFS, writes .si, etc.) ... we need
-        // to make those ops concurrent too:
+        // we forcefully stall the producers.
         ticketQueue.forcePurge(this);
       } else {
         ticketQueue.tryPurge(this);
       }
+
     } finally {
       flushControl.doAfterFlush(flushingDWPT);
       flushingDWPT.checkAndResetHasAborted();
@@ -471,6 +470,7 @@ final class DocumentsWriter {
 
     return maybeMerge;
   }
+
   void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
       throws IOException {
@@ -505,23 +505,17 @@ final class DocumentsWriter {
       throws IOException {
     assert newSegment != null;
     assert newSegment.segmentInfo != null;
+    final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
     //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
-    final SegmentInfoPerCommit segInfo = indexWriter.prepareFlushedSegment(newSegment);
-    final BufferedDeletes deletes = newSegment.segmentDeletes;
     if (infoStream.isEnabled("DW")) {
-      infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + deletes);
+      infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
     }
-    FrozenBufferedDeletes packet = null;
-    if (deletes != null && deletes.any()) {
-      // Segment private delete
-      packet = new FrozenBufferedDeletes(deletes, true);
-      if (infoStream.isEnabled("DW")) {
-        infoStream.message("DW", "flush: push buffered seg private deletes: " + packet);
-      }
+
+    if (segmentDeletes != null && infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
     }
-    // now publish!
-    indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
+    indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
   }
 
   // for asserts
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 772b9c056bd..705a42c9eea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.text.NumberFormat;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Locale;
 
@@ -118,7 +119,7 @@ class DocumentsWriterPerThread {
   static class FlushedSegment {
     final SegmentInfoPerCommit segmentInfo;
     final FieldInfos fieldInfos;
-    final BufferedDeletes segmentDeletes;
+    final FrozenBufferedDeletes segmentDeletes;
     final MutableBits liveDocs;
     final int delCount;
 
@@ -126,7 +127,7 @@ class DocumentsWriterPerThread {
         BufferedDeletes segmentDeletes, MutableBits liveDocs, int delCount) {
       this.segmentInfo = segmentInfo;
       this.fieldInfos = fieldInfos;
-      this.segmentDeletes = segmentDeletes;
+      this.segmentDeletes = segmentDeletes != null && segmentDeletes.any() ? new FrozenBufferedDeletes(segmentDeletes, true) : null;
       this.liveDocs = liveDocs;
       this.delCount = delCount;
     }
@@ -519,6 +520,7 @@ class DocumentsWriterPerThread {
       FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
                                              segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
+      sealFlushedSegment(fs);
       doAfterFlush();
       success = true;
 
@@ -526,14 +528,79 @@ class DocumentsWriterPerThread {
     } finally {
       if (!success) {
         if (segmentInfo != null) {
-          synchronized(parent.indexWriter) {
-            parent.indexWriter.deleter.refresh(segmentInfo.name);
-          }
+          writer.flushFailed(segmentInfo);
         }
         abort();
       }
     }
   }
+
+  /**
+   * Seals the {@link SegmentInfo} for the newly flushed segment and persists
+   * the deleted documents {@link MutableBits}.
+   */
+  void sealFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+    assert flushedSegment != null;
+
+    SegmentInfoPerCommit newSegment = flushedSegment.segmentInfo;
+
+    IndexWriter.setDiagnostics(newSegment.info, "flush");
+
+    IOContext context = new IOContext(new FlushInfo(newSegment.info.getDocCount(), newSegment.info.sizeInBytes()));
+
+    boolean success = false;
+    try {
+      if (writer.useCompoundFile(newSegment)) {
+
+        // Now build compound file
+        Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
+        newSegment.info.setUseCompoundFile(true);
+        writer.deleteNewFiles(oldFiles);
+      }
+
+      // Have codec write SegmentInfo. Must do this after
+      // creating CFS so that 1) .si isn't slurped into CFS,
+      // and 2) .si reflects useCompoundFile=true change
+      // above:
+      codec.segmentInfoFormat().getSegmentInfoWriter().write(directory, newSegment.info, flushedSegment.fieldInfos, context);
+
+      // TODO: ideally we would freeze newSegment here!!
+      // because any changes after writing the .si will be
+      // lost...
+
+      // Must write deleted docs after the CFS so we don't
+      // slurp the del file into CFS:
+      if (flushedSegment.liveDocs != null) {
+        final int delCount = flushedSegment.delCount;
+        assert delCount > 0;
+        if (infoStream.isEnabled("DWPT")) {
+          infoStream.message("DWPT", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
+        }
+
+        // TODO: in the NRT case it'd be better to hand
+        // this del vector over to the
+        // shortly-to-be-opened SegmentReader and let it
+        // carry the changes; there's no reason to use
+        // filesystem as intermediary here.
+
+        SegmentInfoPerCommit info = flushedSegment.segmentInfo;
+        Codec codec = info.info.getCodec();
+        codec.liveDocsFormat().writeLiveDocs(flushedSegment.liveDocs, directory, info, delCount, context);
+        newSegment.setDelCount(delCount);
+        newSegment.advanceDelGen();
+      }
+
+      success = true;
+    } finally {
+      if (!success) {
+        if (infoStream.isEnabled("DWPT")) {
+          infoStream.message("DWPT", "hit exception " +
+              "creating compound file for newly flushed segment " + newSegment.info.name);
+        }
+        writer.flushFailed(newSegment.info);
+      }
+    }
+  }
 
   /** Get current segment info we are writing. */
   SegmentInfo getSegmentInfo() {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 0ba15db616d..ab56cd585fa 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2120,85 +2120,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
     deleter.checkpoint(segmentInfos, false);
   }
 
-  /**
-   * Prepares the {@link SegmentInfo} for the new flushed segment and persists
-   * the deleted documents {@link MutableBits}. Use
-   * {@link #publishFlushedSegment(SegmentInfoPerCommit, FrozenBufferedDeletes, FrozenBufferedDeletes)} to
-   * publish the returned {@link SegmentInfo} together with its segment private
-   * delete packet.
-   *
-   * @see #publishFlushedSegment(SegmentInfoPerCommit, FrozenBufferedDeletes, FrozenBufferedDeletes)
-   */
-  SegmentInfoPerCommit prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
-    assert flushedSegment != null;
-
-    SegmentInfoPerCommit newSegment = flushedSegment.segmentInfo;
-
-    setDiagnostics(newSegment.info, "flush");
-
-    IOContext context = new IOContext(new FlushInfo(newSegment.info.getDocCount(), newSegment.info.sizeInBytes()));
-
-    boolean success = false;
-    try {
-      if (useCompoundFile(newSegment)) {
-
-        // Now build compound file
-        Collection<String> oldFiles = createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
-        newSegment.info.setUseCompoundFile(true);
-
-        synchronized(this) {
-          deleter.deleteNewFiles(oldFiles);
-        }
-      }
-
-      // Have codec write SegmentInfo. Must do this after
-      // creating CFS so that 1) .si isn't slurped into CFS,
-      // and 2) .si reflects useCompoundFile=true change
-      // above:
-      codec.segmentInfoFormat().getSegmentInfoWriter().write(directory, newSegment.info, flushedSegment.fieldInfos, context);
-
-      // TODO: ideally we would freeze newSegment here!!
-      // because any changes after writing the .si will be
-      // lost...
-
-      // Must write deleted docs after the CFS so we don't
-      // slurp the del file into CFS:
-      if (flushedSegment.liveDocs != null) {
-        final int delCount = flushedSegment.delCount;
-        assert delCount > 0;
-        if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
-        }
-
-        // TODO: in the NRT case it'd be better to hand
-        // this del vector over to the
-        // shortly-to-be-opened SegmentReader and let it
-        // carry the changes; there's no reason to use
-        // filesystem as intermediary here.
-
-        SegmentInfoPerCommit info = flushedSegment.segmentInfo;
-        Codec codec = info.info.getCodec();
-        codec.liveDocsFormat().writeLiveDocs(flushedSegment.liveDocs, directory, info, delCount, context);
-        newSegment.setDelCount(delCount);
-        newSegment.advanceDelGen();
-      }
-
-      success = true;
-    } finally {
-      if (!success) {
-        if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "hit exception " +
-              "reating compound file for newly flushed segment " + newSegment.info.name);
-        }
-
-        synchronized(this) {
-          deleter.refresh(newSegment.info.name);
-        }
-      }
-    }
-    return newSegment;
-  }
-
   synchronized void publishFrozenDeletes(FrozenBufferedDeletes packet) {
     assert packet != null && packet.any();
     synchronized (bufferedDeletesStream) {
@@ -2208,11 +2129,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
 
   /**
    * Atomically adds the segment private delete packet and publishes the flushed
-   * segments SegmentInfo to the index writer. NOTE: use
-   * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
-   * {@link SegmentInfo} for the flushed segment.
-   *
-   * @see #prepareFlushedSegment(DocumentsWriterPerThread.FlushedSegment)
+   * segment's SegmentInfo to the index writer.
    */
   synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment,
       FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
@@ -4253,4 +4170,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
 
     return files;
   }
+
+  /**
+   * Tries to delete the given files if they are unreferenced.
+   * @param files the files to delete
+   * @throws IOException if deleting the files hits an I/O error
+   * @see IndexFileDeleter#deleteNewFiles(Collection)
+   */
+  synchronized final void deleteNewFiles(Collection<String> files) throws IOException {
+    deleter.deleteNewFiles(files);
+  }
+
+  /**
+   * Cleans up residual files from a segment that could not be entirely
+   * flushed due to an error.
+   * @see IndexFileDeleter#refresh(String)
+   */
+  synchronized final void flushFailed(SegmentInfo info) throws IOException {
+    deleter.refresh(info.name);
+  }
 }
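
The shape of the change, outside Lucene's internals: segment "sealing" (building the
CFS, writing the .si file, persisting live docs) now runs on each flushing thread,
and only a small synchronized publish step remains on IndexWriter. Below is a
minimal, self-contained Java sketch of that flush/publish split; the names in it
(ConcurrentSealExample, SealedSegment, seal, publish) are illustrative stand-ins,
not Lucene classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentSealExample {

  // Stand-in for a flushed segment whose heavy IO (CFS build, .si write,
  // live-docs write) has already been done by the flushing thread.
  static final class SealedSegment {
    final String name;
    SealedSegment(String name) { this.name = name; }
  }

  // Stand-in for shared index metadata; publishing only appends to it,
  // so the synchronized section stays tiny.
  private final List<SealedSegment> segmentInfos = new ArrayList<>();

  // Heavy per-segment work: runs concurrently on each flushing thread,
  // analogous to DocumentsWriterPerThread.sealFlushedSegment above.
  private SealedSegment seal(String name) throws InterruptedException {
    Thread.sleep(50); // simulate CFS build + .si write + live-docs IO
    return new SealedSegment(name);
  }

  // Cheap synchronized publish: the only single-threaded step left,
  // analogous to IndexWriter.publishFlushedSegment after the patch.
  private synchronized void publish(SealedSegment segment) {
    segmentInfos.add(segment);
  }

  public static void main(String[] args) throws Exception {
    ConcurrentSealExample index = new ConcurrentSealExample();
    ExecutorService flushThreads = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 8; i++) {
      final String name = "_" + i;
      flushThreads.execute(() -> {
        try {
          // seal() runs concurrently; publish() is the short critical section
          index.publish(index.seal(name));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    flushThreads.shutdown();
    flushThreads.awaitTermination(1, TimeUnit.MINUTES);
    synchronized (index) {
      System.out.println("published " + index.segmentInfos.size() + " segments");
    }
  }
}

The design point is that the critical section shrinks to an in-memory append, so
flush throughput scales with the number of flushing threads instead of serializing
on CFS and .si IO, which is why the removed DocumentsWriter comment about publishing
being "too heavy today" no longer applies.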