LUCENE-4080: Make SegmentReader.numDeletedDocs() always reliable.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1357195 13f79535-47bb-0310-9956-ffa450edef68
Adrien Grand 2012-07-04 09:39:47 +00:00
parent e96b143b6a
commit 0c05cb7c48
10 changed files with 81 additions and 70 deletions
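
For context (not part of the diff below): before this change, merging carried a separate (reader, liveDocs, delCount) triple because a pooled SegmentReader's numDeletedDocs() could lag behind the live docs pulled for the merge; afterwards, IndexWriter hands SegmentMerger a reader whose deletion count always matches its live docs, so MergeState can hold plain AtomicReaders. A minimal sketch of that invariant follows; the helper class name is made up, and it assumes the Lucene 4.x AtomicReader/Bits APIs:

import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.util.Bits;

final class MergeReaderInvariant {
  // Count deletions by scanning the live-docs bits and compare with what the
  // reader reports; after LUCENE-4080 the two always agree for merge readers.
  static void check(AtomicReader reader) {
    final Bits liveDocs = reader.getLiveDocs();
    int deleted = 0;
    if (liveDocs != null) {
      for (int docID = 0; docID < reader.maxDoc(); docID++) {
        if (!liveDocs.get(docID)) {
          deleted++;
        }
      }
    }
    assert deleted == reader.numDeletedDocs();
  }
}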

View File

@@ -29,6 +29,7 @@ import org.apache.lucene.document.PackedLongDocValuesField;
 import org.apache.lucene.document.ShortDocValuesField;
 import org.apache.lucene.document.SortedBytesDocValuesField;
 import org.apache.lucene.document.StraightBytesDocValuesField;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.DocValues.Source;
 import org.apache.lucene.index.DocValues.Type;
 import org.apache.lucene.index.DocValues;
@@ -106,12 +107,12 @@ public abstract class DocValuesConsumer {
     assert mergeState != null;
     boolean hasMerged = false;
     for(int readerIDX=0;readerIDX<mergeState.readers.size();readerIDX++) {
-      final org.apache.lucene.index.MergeState.IndexReaderAndLiveDocs reader = mergeState.readers.get(readerIDX);
+      final AtomicReader reader = mergeState.readers.get(readerIDX);
       if (docValues[readerIDX] != null) {
         hasMerged = true;
         merge(docValues[readerIDX], mergeState.docBase[readerIDX],
-              reader.reader.maxDoc(), reader.liveDocs);
-        mergeState.checkAbort.work(reader.reader.maxDoc());
+              reader.maxDoc(), reader.getLiveDocs());
+        mergeState.checkAbort.work(reader.maxDoc());
       }
     }
     // only finish if no exception is thrown!

View File

@@ -50,7 +50,7 @@ public abstract class PerDocConsumer implements Closeable {
       mergeState.fieldInfo = fieldInfo; // set the field we are merging
       if (canMerge(fieldInfo)) {
         for (int i = 0; i < docValues.length; i++) {
-          docValues[i] = getDocValuesForMerge(mergeState.readers.get(i).reader, fieldInfo);
+          docValues[i] = getDocValuesForMerge(mergeState.readers.get(i), fieldInfo);
         }
         Type docValuesType = getDocValuesType(fieldInfo);
         assert docValuesType != null;

View File

@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexableField;
@@ -74,9 +75,9 @@ public abstract class StoredFieldsWriter implements Closeable {
    * merging (bulk-byte copying, etc). */
   public int merge(MergeState mergeState) throws IOException {
     int docCount = 0;
-    for (MergeState.IndexReaderAndLiveDocs reader : mergeState.readers) {
-      final int maxDoc = reader.reader.maxDoc();
-      final Bits liveDocs = reader.liveDocs;
+    for (AtomicReader reader : mergeState.readers) {
+      final int maxDoc = reader.maxDoc();
+      final Bits liveDocs = reader.getLiveDocs();
       for (int i = 0; i < maxDoc; i++) {
         if (liveDocs != null && !liveDocs.get(i)) {
           // skip deleted docs
@@ -88,7 +89,7 @@ public abstract class StoredFieldsWriter implements Closeable {
         // on the fly?
         // NOTE: it's very important to first assign to doc then pass it to
         // fieldsWriter.addDocument; see LUCENE-1282
-        Document doc = reader.reader.document(i);
+        Document doc = reader.document(i);
         addDocument(doc, mergeState.fieldInfos);
         docCount++;
         mergeState.checkAbort.work(300);

View File

@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Comparator;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.DocsAndPositionsEnum;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
@@ -143,9 +144,9 @@ public abstract class TermVectorsWriter implements Closeable {
    * merging (bulk-byte copying, etc). */
   public int merge(MergeState mergeState) throws IOException {
     int docCount = 0;
-    for (MergeState.IndexReaderAndLiveDocs reader : mergeState.readers) {
-      final int maxDoc = reader.reader.maxDoc();
-      final Bits liveDocs = reader.liveDocs;
+    for (AtomicReader reader : mergeState.readers) {
+      final int maxDoc = reader.maxDoc();
+      final Bits liveDocs = reader.getLiveDocs();
       for (int docID = 0; docID < maxDoc; docID++) {
         if (liveDocs != null && !liveDocs.get(docID)) {
           // skip deleted docs
@@ -153,7 +154,7 @@ public abstract class TermVectorsWriter implements Closeable {
         }
         // NOTE: it's very important to first assign to vectors then pass it to
         // termVectorsWriter.addAllDocVectors; see LUCENE-1282
-        Fields vectors = reader.reader.getTermVectors(docID);
+        Fields vectors = reader.getTermVectors(docID);
         addAllDocVectors(vectors, mergeState.fieldInfos);
         docCount++;
         mergeState.checkAbort.work(300);

View File

@@ -22,6 +22,7 @@ import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.codecs.StoredFieldsWriter;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexFileNames;
@@ -224,7 +225,7 @@ public final class Lucene40StoredFieldsWriter extends StoredFieldsWriter {
     int rawDocLengths[] = new int[MAX_RAW_MERGE_DOCS];
     int idx = 0;
-    for (MergeState.IndexReaderAndLiveDocs reader : mergeState.readers) {
+    for (AtomicReader reader : mergeState.readers) {
       final SegmentReader matchingSegmentReader = mergeState.matchingSegmentReaders[idx++];
       Lucene40StoredFieldsReader matchingFieldsReader = null;
       if (matchingSegmentReader != null) {
@@ -235,7 +236,7 @@ public final class Lucene40StoredFieldsWriter extends StoredFieldsWriter {
         }
       }
-      if (reader.liveDocs != null) {
+      if (reader.getLiveDocs() != null) {
         docCount += copyFieldsWithDeletions(mergeState,
                                             reader, matchingFieldsReader, rawDocLengths);
       } else {
@@ -251,12 +252,12 @@ public final class Lucene40StoredFieldsWriter extends StoredFieldsWriter {
       when merging stored fields */
   private final static int MAX_RAW_MERGE_DOCS = 4192;
-  private int copyFieldsWithDeletions(MergeState mergeState, final MergeState.IndexReaderAndLiveDocs reader,
+  private int copyFieldsWithDeletions(MergeState mergeState, final AtomicReader reader,
                                       final Lucene40StoredFieldsReader matchingFieldsReader, int rawDocLengths[])
       throws IOException {
     int docCount = 0;
-    final int maxDoc = reader.reader.maxDoc();
-    final Bits liveDocs = reader.liveDocs;
+    final int maxDoc = reader.maxDoc();
+    final Bits liveDocs = reader.getLiveDocs();
     assert liveDocs != null;
     if (matchingFieldsReader != null) {
       // We can bulk-copy because the fieldInfos are "congruent"
@@ -296,7 +297,7 @@ public final class Lucene40StoredFieldsWriter extends StoredFieldsWriter {
         // on the fly?
         // NOTE: it's very important to first assign to doc then pass it to
         // fieldsWriter.addDocument; see LUCENE-1282
-        Document doc = reader.reader.document(j);
+        Document doc = reader.document(j);
         addDocument(doc, mergeState.fieldInfos);
         docCount++;
         mergeState.checkAbort.work(300);
@@ -305,10 +306,10 @@ public final class Lucene40StoredFieldsWriter extends StoredFieldsWriter {
     return docCount;
   }
-  private int copyFieldsNoDeletions(MergeState mergeState, final MergeState.IndexReaderAndLiveDocs reader,
+  private int copyFieldsNoDeletions(MergeState mergeState, final AtomicReader reader,
                                     final Lucene40StoredFieldsReader matchingFieldsReader, int rawDocLengths[])
       throws IOException {
-    final int maxDoc = reader.reader.maxDoc();
+    final int maxDoc = reader.maxDoc();
     int docCount = 0;
     if (matchingFieldsReader != null) {
       // We can bulk-copy because the fieldInfos are "congruent"
@@ -323,7 +324,7 @@ public final class Lucene40StoredFieldsWriter extends StoredFieldsWriter {
       for (; docCount < maxDoc; docCount++) {
         // NOTE: it's very important to first assign to doc then pass it to
        // fieldsWriter.addDocument; see LUCENE-1282
-        Document doc = reader.reader.document(docCount);
+        Document doc = reader.document(docCount);
         addDocument(doc, mergeState.fieldInfos);
         mergeState.checkAbort.work(300);
       }

View File

@@ -23,6 +23,7 @@ import java.util.Comparator;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.TermVectorsReader;
 import org.apache.lucene.codecs.TermVectorsWriter;
+import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.Fields;
@@ -254,7 +255,7 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
     int idx = 0;
     int numDocs = 0;
-    for (final MergeState.IndexReaderAndLiveDocs reader : mergeState.readers) {
+    for (final AtomicReader reader : mergeState.readers) {
       final SegmentReader matchingSegmentReader = mergeState.matchingSegmentReaders[idx++];
       Lucene40TermVectorsReader matchingVectorsReader = null;
       if (matchingSegmentReader != null) {
@@ -264,7 +265,7 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
           matchingVectorsReader = (Lucene40TermVectorsReader) vectorsReader;
         }
       }
-      if (reader.liveDocs != null) {
+      if (reader.getLiveDocs() != null) {
         numDocs += copyVectorsWithDeletions(mergeState, matchingVectorsReader, reader, rawDocLengths, rawDocLengths2);
       } else {
         numDocs += copyVectorsNoDeletions(mergeState, matchingVectorsReader, reader, rawDocLengths, rawDocLengths2);
@@ -280,12 +281,12 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
   private int copyVectorsWithDeletions(MergeState mergeState,
                                        final Lucene40TermVectorsReader matchingVectorsReader,
-                                       final MergeState.IndexReaderAndLiveDocs reader,
+                                       final AtomicReader reader,
                                        int rawDocLengths[],
                                        int rawDocLengths2[])
       throws IOException {
-    final int maxDoc = reader.reader.maxDoc();
-    final Bits liveDocs = reader.liveDocs;
+    final int maxDoc = reader.maxDoc();
+    final Bits liveDocs = reader.getLiveDocs();
     int totalNumDocs = 0;
     if (matchingVectorsReader != null) {
       // We can bulk-copy because the fieldInfos are "congruent"
@@ -322,7 +323,7 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
         // NOTE: it's very important to first assign to vectors then pass it to
         // termVectorsWriter.addAllDocVectors; see LUCENE-1282
-        Fields vectors = reader.reader.getTermVectors(docNum);
+        Fields vectors = reader.getTermVectors(docNum);
         addAllDocVectors(vectors, mergeState.fieldInfos);
         totalNumDocs++;
         mergeState.checkAbort.work(300);
@@ -333,11 +334,11 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
   private int copyVectorsNoDeletions(MergeState mergeState,
                                      final Lucene40TermVectorsReader matchingVectorsReader,
-                                     final MergeState.IndexReaderAndLiveDocs reader,
+                                     final AtomicReader reader,
                                      int rawDocLengths[],
                                      int rawDocLengths2[])
       throws IOException {
-    final int maxDoc = reader.reader.maxDoc();
+    final int maxDoc = reader.maxDoc();
     if (matchingVectorsReader != null) {
       // We can bulk-copy because the fieldInfos are "congruent"
       int docCount = 0;
@@ -352,7 +353,7 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
       for (int docNum = 0; docNum < maxDoc; docNum++) {
         // NOTE: it's very important to first assign to vectors then pass it to
         // termVectorsWriter.addAllDocVectors; see LUCENE-1282
-        Fields vectors = reader.reader.getTermVectors(docNum);
+        Fields vectors = reader.getTermVectors(docNum);
         addAllDocVectors(vectors, mergeState.fieldInfos);
         mergeState.checkAbort.work(300);
       }

View File

@@ -2881,7 +2881,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
       SegmentInfoPerCommit info = sourceSegments.get(i);
       minGen = Math.min(info.getBufferedDeletesGen(), minGen);
       final int docCount = info.info.getDocCount();
-      final Bits prevLiveDocs = merge.readerLiveDocs.get(i);
+      final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs();
       final Bits currentLiveDocs;
       final ReadersAndLiveDocs rld = readerPool.get(info, false);
       // We hold a ref so it should still be in the pool:
@@ -3429,7 +3429,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
     }
     merge.readers = new ArrayList<SegmentReader>();
-    merge.readerLiveDocs = new ArrayList<Bits>();
     // This is try/finally to make sure merger's readers are
     // closed:
@@ -3443,7 +3442,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
         // Hold onto the "live" reader; we will use this to
         // commit merged deletes
         final ReadersAndLiveDocs rld = readerPool.get(info, true);
-        final SegmentReader reader = rld.getMergeReader(context);
+        SegmentReader reader = rld.getMergeReader(context);
         assert reader != null;
         // Carefully pull the most recent live docs:
@@ -3469,11 +3468,33 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
             }
           }
         }
-        merge.readerLiveDocs.add(liveDocs);
+
+        // Deletes might have happened after we pulled the merge reader and
+        // before we got a read-only copy of the segment's actual live docs
+        // (taking pending deletes into account). In that case we need to
+        // make a new reader with updated live docs and del count.
+        if (reader.numDeletedDocs() != delCount) {
+          // fix the reader's live docs and del count
+          assert delCount > reader.numDeletedDocs(); // beware of zombies
+
+          SegmentReader newReader = new SegmentReader(info, reader.core, liveDocs, info.info.getDocCount() - delCount);
+          boolean released = false;
+          try {
+            rld.release(reader);
+            released = true;
+          } finally {
+            if (!released) {
+              newReader.decRef();
+            }
+          }
+
+          reader = newReader;
+        }
+
         merge.readers.add(reader);
         assert delCount <= info.info.getDocCount(): "delCount=" + delCount + " info.docCount=" + info.info.getDocCount() + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
         if (delCount < info.info.getDocCount()) {
-          merger.add(reader, liveDocs, delCount);
+          merger.add(reader);
         }
         segUpto++;
       }

View File

@@ -24,7 +24,6 @@ import java.util.Map;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.SetOnce.AlreadySetException;
 import org.apache.lucene.util.SetOnce;
@@ -74,7 +73,6 @@ public abstract class MergePolicy implements java.io.Closeable, Cloneable {
     int maxNumSegments = -1; // used by IndexWriter
     public long estimatedMergeBytes; // used by IndexWriter
     List<SegmentReader> readers; // used by IndexWriter
-    List<Bits> readerLiveDocs; // used by IndexWriter
     public final List<SegmentInfoPerCommit> segments;
     public final int totalDocCount;
     boolean aborted;

View File

@@ -31,18 +31,6 @@ import org.apache.lucene.util.packed.PackedInts;
  * @lucene.experimental */
 public class MergeState {
-  public static class IndexReaderAndLiveDocs {
-    public final AtomicReader reader;
-    public final Bits liveDocs;
-    public final int numDeletedDocs;
-    public IndexReaderAndLiveDocs(AtomicReader reader, Bits liveDocs, int numDeletedDocs) {
-      this.reader = reader;
-      this.liveDocs = liveDocs;
-      this.numDeletedDocs = numDeletedDocs;
-    }
-  }
   public static abstract class DocMap {
     private final Bits liveDocs;
@@ -50,17 +38,17 @@ public class MergeState {
       this.liveDocs = liveDocs;
     }
-    public static DocMap build(IndexReaderAndLiveDocs reader) {
-      final int maxDoc = reader.reader.maxDoc();
-      final int numDeletes = reader.numDeletedDocs;
+    public static DocMap build(AtomicReader reader) {
+      final int maxDoc = reader.maxDoc();
+      final int numDeletes = reader.numDeletedDocs();
       final int numDocs = maxDoc - numDeletes;
-      assert reader.liveDocs != null || numDeletes == 0;
+      assert reader.getLiveDocs() != null || numDeletes == 0;
       if (numDeletes == 0) {
         return new NoDelDocMap(maxDoc);
       } else if (numDeletes < numDocs) {
-        return buildDelCountDocmap(maxDoc, numDeletes, reader.liveDocs, PackedInts.COMPACT);
+        return buildDelCountDocmap(maxDoc, numDeletes, reader.getLiveDocs(), PackedInts.COMPACT);
       } else {
-        return buildDirectDocMap(maxDoc, numDocs, reader.liveDocs, PackedInts.COMPACT);
+        return buildDirectDocMap(maxDoc, numDocs, reader.getLiveDocs(), PackedInts.COMPACT);
       }
     }
@@ -197,7 +185,7 @@ public class MergeState {
   public SegmentInfo segmentInfo;
   public FieldInfos fieldInfos;
-  public List<IndexReaderAndLiveDocs> readers; // Readers & liveDocs being merged
+  public List<AtomicReader> readers; // Readers being merged
   public DocMap[] docMaps; // Maps docIDs around deletions
   public int[] docBase; // New docID base per reader
   public CheckAbort checkAbort;
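
A hypothetical caller sketch for the new build signature above (illustrative only, not part of this commit): the AtomicReader is passed directly, and the deletion count comes from reader.numDeletedDocs() rather than a separately tracked field.

// reader is assumed to be an AtomicReader taken from mergeState.readers.
final MergeState.DocMap docMap = MergeState.DocMap.build(reader);
// The map compacts away exactly the reader's deleted documents.
assert docMap.numDocs() == reader.maxDoc() - reader.numDeletedDocs();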

View File

@@ -60,7 +60,7 @@ final class SegmentMerger {
                 FieldInfos.FieldNumbers fieldNumbers, IOContext context) {
     mergeState.segmentInfo = segmentInfo;
     mergeState.infoStream = infoStream;
-    mergeState.readers = new ArrayList<MergeState.IndexReaderAndLiveDocs>();
+    mergeState.readers = new ArrayList<AtomicReader>();
     mergeState.checkAbort = checkAbort;
     mergeState.payloadProcessorProvider = payloadProcessorProvider;
     directory = dir;
@@ -77,12 +77,12 @@ final class SegmentMerger {
   final void add(IndexReader reader) {
     for (final AtomicReaderContext ctx : reader.getTopReaderContext().leaves()) {
       final AtomicReader r = ctx.reader();
-      mergeState.readers.add(new MergeState.IndexReaderAndLiveDocs(r, r.getLiveDocs(), r.numDeletedDocs()));
+      mergeState.readers.add(r);
     }
   }
-  final void add(SegmentReader reader, Bits liveDocs, int delCount) {
-    mergeState.readers.add(new MergeState.IndexReaderAndLiveDocs(reader, liveDocs, delCount));
+  final void add(SegmentReader reader) {
+    mergeState.readers.add(reader);
   }
   /**
@@ -138,14 +138,14 @@ final class SegmentMerger {
     // FieldInfos, then we can do a bulk copy of the
     // stored fields:
     for (int i = 0; i < numReaders; i++) {
-      MergeState.IndexReaderAndLiveDocs reader = mergeState.readers.get(i);
+      AtomicReader reader = mergeState.readers.get(i);
       // TODO: we may be able to broaden this to
       // non-SegmentReaders, since FieldInfos is now
       // required? But... this'd also require exposing
       // bulk-copy (TVs and stored fields) API in foreign
       // readers..
-      if (reader.reader instanceof SegmentReader) {
-        SegmentReader segmentReader = (SegmentReader) reader.reader;
+      if (reader instanceof SegmentReader) {
+        SegmentReader segmentReader = (SegmentReader) reader;
         boolean same = true;
         FieldInfos segmentFieldInfos = segmentReader.getFieldInfos();
         for (FieldInfo fi : segmentFieldInfos) {
@@ -188,8 +188,7 @@ final class SegmentMerger {
     Map<FieldInfo,TypePromoter> docValuesTypes = new HashMap<FieldInfo,TypePromoter>();
     Map<FieldInfo,TypePromoter> normValuesTypes = new HashMap<FieldInfo,TypePromoter>();
-    for (MergeState.IndexReaderAndLiveDocs readerAndLiveDocs : mergeState.readers) {
-      final AtomicReader reader = readerAndLiveDocs.reader;
+    for (AtomicReader reader : mergeState.readers) {
       FieldInfos readerFieldInfos = reader.getFieldInfos();
       for (FieldInfo fi : readerFieldInfos) {
         FieldInfo merged = fieldInfosBuilder.add(fi);
@@ -283,7 +282,7 @@ final class SegmentMerger {
     int i = 0;
     while(i < mergeState.readers.size()) {
-      final MergeState.IndexReaderAndLiveDocs reader = mergeState.readers.get(i);
+      final AtomicReader reader = mergeState.readers.get(i);
       mergeState.docBase[i] = docBase;
       final MergeState.DocMap docMap = MergeState.DocMap.build(reader);
@@ -291,7 +290,7 @@ final class SegmentMerger {
       docBase += docMap.numDocs();
       if (mergeState.payloadProcessorProvider != null) {
-        mergeState.readerPayloadProcessor[i] = mergeState.payloadProcessorProvider.getReaderProcessor(reader.reader);
+        mergeState.readerPayloadProcessor[i] = mergeState.payloadProcessorProvider.getReaderProcessor(reader);
       }
       i++;
@@ -308,9 +307,9 @@ final class SegmentMerger {
     int docBase = 0;
     for(int readerIndex=0;readerIndex<mergeState.readers.size();readerIndex++) {
-      final MergeState.IndexReaderAndLiveDocs r = mergeState.readers.get(readerIndex);
-      final Fields f = r.reader.fields();
-      final int maxDoc = r.reader.maxDoc();
+      final AtomicReader reader = mergeState.readers.get(readerIndex);
+      final Fields f = reader.fields();
+      final int maxDoc = reader.maxDoc();
       if (f != null) {
         slices.add(new ReaderSlice(docBase, maxDoc, readerIndex));
         fields.add(f);