LUCENE-2897: apply delete-by-term on flushed segment while we flush (still buffer delete-by-terms for past segments)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1065855 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2011-01-31 23:35:02 +00:00
parent 25f16877dd
commit 62b692e9a3
16 changed files with 466 additions and 147 deletions

View File

@@ -330,6 +330,9 @@ Optimizations
   seek the term dictionary in TermQuery / TermWeight.
   (Simon Willnauer, Mike McCandless, Robert Muir)
 
+* LUCENE-2897: Apply deleted terms while flushing a segment.  We still
+  buffer deleted terms to later apply to past segments.  (Mike McCandless)
+
 Bug fixes
 
 * LUCENE-2633: PackedInts Packed32 and Packed64 did not support internal
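The change is easiest to picture as two parallel bookkeeping paths: term deletes buffered before a flush are resolved directly against the segment being flushed, while the same terms are also frozen into a packet that is later applied to segments that already exist. Below is a rough, self-contained sketch of that idea in plain Java with hypothetical names; it is not Lucene's internal API.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names) of the two delete paths taken at flush time.
class PendingDeletesSketch {
  // term -> docIDUpto: the delete only affects docs added before it was issued
  private final Map<String,Integer> pendingTerms = new HashMap<String,Integer>();
  // frozen term packets, still to be applied to segments that already exist
  private final List<String[]> frozenPackets = new ArrayList<String[]>();

  void bufferDeleteTerm(String term, int docIDUpto) {
    pendingTerms.put(term, docIDUpto);
  }

  // Called while the in-RAM segment is flushed; postings maps each term to the
  // docIDs that contain it in the new segment.
  BitSet flush(Map<String,List<Integer>> postings, int numDocs) {
    BitSet deletedDocs = new BitSet(numDocs);
    // 1) resolve delete-by-term directly against the segment being flushed
    for (Map.Entry<String,Integer> ent : pendingTerms.entrySet()) {
      List<Integer> docs = postings.get(ent.getKey());
      if (docs == null) {
        continue;
      }
      for (int docID : docs) {
        if (docID < ent.getValue()) {   // respect the docIDUpto limit
          deletedDocs.set(docID);
        }
      }
    }
    // 2) freeze the same terms so they can still be applied to past segments
    frozenPackets.add(pendingTerms.keySet().toArray(new String[pendingTerms.size()]));
    pendingTerms.clear();
    return deletedDocs;
  }
}

The docIDUpto limit is what keeps a buffered delete from touching documents that were added after the corresponding delete call.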

View File

@@ -18,21 +18,23 @@ package org.apache.lucene.index;
  */
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
 
-/** Holds buffered deletes, by docID, term or query for a
- * single segment.  This is used to hold buffered pending
- * deletes against the to-be-flushed segment as well as
- * per-segment deletes for each segment in the index. */
+/* Holds buffered deletes, by docID, term or query for a
+ * single segment.  This is used to hold buffered pending
+ * deletes against the to-be-flushed segment.  Once the
+ * deletes are pushed (on flush in DocumentsWriter), these
+ * deletes are converted to a FrozenDeletes instance. */
 
 // NOTE: we are sync'd by BufferedDeletes, ie, all access to
 // instances of this class is via sync'd methods on

@@ -63,13 +65,8 @@ class BufferedDeletes {
      undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
   final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24;
 
-  // TODO: many of the deletes stored here will map to
-  // Integer.MAX_VALUE; we could be more efficient for this
-  // case ie use a SortedSet not a SortedMap.  But: Java's
-  // SortedSet impls are simply backed by a Map so we won't
-  // save anything unless we do something custom...
-
   final AtomicInteger numTermDeletes = new AtomicInteger();
-  final SortedMap<Term,Integer> terms = new TreeMap<Term,Integer>();
+  final Map<Term,Integer> terms;
   final Map<Query,Integer> queries = new HashMap<Query,Integer>();
   final List<Integer> docIDs = new ArrayList<Integer>();

@@ -81,6 +78,14 @@ class BufferedDeletes {
 
   long gen;
 
+  public BufferedDeletes(boolean sortTerms) {
+    if (sortTerms) {
+      terms = new TreeMap<Term,Integer>();
+    } else {
+      terms = new HashMap<Term,Integer>();
+    }
+  }
+
   @Override
   public String toString() {
     if (VERBOSE_DELETES) {

@@ -130,6 +135,26 @@ class BufferedDeletes {
     // should already be cleared
   }
 
+  void update(FrozenBufferedDeletes in) {
+    numTermDeletes.addAndGet(in.numTermDeletes);
+    for(Term term : in.terms) {
+      if (!terms.containsKey(term)) {
+        // only incr bytesUsed if this term wasn't already buffered:
+        bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
+      }
+      terms.put(term, MAX_INT);
+    }
+
+    for(int queryIdx=0;queryIdx<in.queries.length;queryIdx++) {
+      final Query query = in.queries[queryIdx];
+      if (!queries.containsKey(query)) {
+        // only incr bytesUsed if this query wasn't already buffered:
+        bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
+      }
+      queries.put(query, MAX_INT);
+    }
+  }
+
   public void addQuery(Query query, int docIDUpto) {
     Integer current = queries.put(query, docIDUpto);
     // increment bytes used only if the query wasn't added so far.

@@ -162,6 +187,43 @@ class BufferedDeletes {
       bytesUsed.addAndGet(BYTES_PER_DEL_TERM + term.bytes.length);
     }
   }
 
+  public Iterable<Term> termsIterable() {
+    return new Iterable<Term>() {
+      // @Override -- not until Java 1.6
+      public Iterator<Term> iterator() {
+        return terms.keySet().iterator();
+      }
+    };
+  }
+
+  public Iterable<QueryAndLimit> queriesIterable() {
+    return new Iterable<QueryAndLimit>() {
+      // @Override -- not until Java 1.6
+      public Iterator<QueryAndLimit> iterator() {
+        return new Iterator<QueryAndLimit>() {
+          private final Iterator<Map.Entry<Query,Integer>> iter = queries.entrySet().iterator();
+
+          // @Override -- not until Java 1.6
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          // @Override -- not until Java 1.6
+          public QueryAndLimit next() {
+            final Map.Entry<Query,Integer> ent = iter.next();
+            return new QueryAndLimit(ent.getKey(), ent.getValue());
+          }
+
+          // @Override -- not until Java 1.6
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
   void clear() {
     terms.clear();
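In BufferedDeletes above, each buffered term maps to a docIDUpto limit: the delete only affects documents whose docID is below that limit, so documents added after the delete survive it. When packets are coalesced for already-flushed segments (the new update() method), the limit is widened to Integer.MAX_VALUE, because every document in an older segment predates the delete. A small illustration of that bookkeeping, using plain collections and hypothetical names rather than the real classes:

import java.util.HashMap;
import java.util.Map;

// Illustration (hypothetical, simplified) of the term -> docIDUpto bookkeeping.
class DeleteLimitSketch {
  static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
  final Map<String,Integer> terms = new HashMap<String,Integer>();

  // Buffer a delete issued while documents are still being added.
  void addTerm(String term, int docIDUpto) {
    Integer current = terms.get(term);
    if (current != null && docIDUpto < current) {
      // Keep the higher limit: a delete recorded later (higher docIDUpto)
      // must not be narrowed by an older, racing delete of the same term.
      return;
    }
    terms.put(term, Integer.valueOf(docIDUpto));
  }

  // Coalesce a frozen packet into the map used for already-flushed segments;
  // every doc there predates the delete, so the limit becomes "delete all".
  void coalesce(Iterable<String> frozenTerms) {
    for (String term : frozenTerms) {
      terms.put(term, MAX_INT);
    }
  }
}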

View File

@@ -22,7 +22,6 @@ import java.io.PrintStream;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Map.Entry;
 import java.util.Comparator;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -52,7 +51,7 @@ import org.apache.lucene.search.Weight;
 class BufferedDeletesStream {
 
   // TODO: maybe linked list?
-  private final List<BufferedDeletes> deletes = new ArrayList<BufferedDeletes>();
+  private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
 
   // Starts at 1 so that SegmentInfos that have never had
   // deletes applied (whose bufferedDelGen defaults to 0)

@@ -83,13 +82,13 @@ class BufferedDeletesStream {
 
   // Appends a new packet of buffered deletes to the stream,
   // setting its generation:
-  public synchronized void push(BufferedDeletes packet) {
+  public synchronized void push(FrozenBufferedDeletes packet) {
     assert packet.any();
     assert checkDeleteStats();
-    packet.gen = nextGen++;
+    assert packet.gen < nextGen;
     deletes.add(packet);
-    numTerms.addAndGet(packet.numTermDeletes.get());
-    bytesUsed.addAndGet(packet.bytesUsed.get());
+    numTerms.addAndGet(packet.numTermDeletes);
+    bytesUsed.addAndGet(packet.bytesUsed);
     if (infoStream != null) {
       message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
     }

@@ -182,14 +181,14 @@ class BufferedDeletesStream {
     while (infosIDX >= 0) {
       //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
 
-      final BufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
+      final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
       final SegmentInfo info = infos2.get(infosIDX);
       final long segGen = info.getBufferedDeletesGen();
 
       if (packet != null && segGen < packet.gen) {
         //System.out.println("  coalesce");
         if (coalescedDeletes == null) {
-          coalescedDeletes = new BufferedDeletes();
+          coalescedDeletes = new BufferedDeletes(true);
         }
         coalescedDeletes.update(packet);
         delIDX--;

@@ -202,25 +201,25 @@ class BufferedDeletesStream {
         int delCount = 0;
         try {
           if (coalescedDeletes != null) {
-            delCount += applyDeletes(coalescedDeletes, reader);
+            //System.out.println("  del coalesced");
+            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+            delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
           }
-          delCount += applyDeletes(packet, reader);
+          //System.out.println("  del exact");
+          // Don't delete by Term here; DocumentsWriter
+          // already did that on flush:
+          delCount += applyQueryDeletes(packet.queriesIterable(), reader);
         } finally {
           readerPool.release(reader);
         }
         anyNewDeletes |= delCount > 0;
 
-        // We've applied doc ids, and they're only applied
-        // on the current segment
-        bytesUsed.addAndGet(-packet.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
-        packet.clearDocIDs();
-
         if (infoStream != null) {
           message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount);
         }
 
         if (coalescedDeletes == null) {
-          coalescedDeletes = new BufferedDeletes();
+          coalescedDeletes = new BufferedDeletes(true);
         }
         coalescedDeletes.update(packet);
         delIDX--;

@@ -236,7 +235,8 @@ class BufferedDeletesStream {
           SegmentReader reader = readerPool.get(info, false);
           int delCount = 0;
           try {
-            delCount += applyDeletes(coalescedDeletes, reader);
+            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+            delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
           } finally {
             readerPool.release(reader);
           }

@@ -301,121 +301,122 @@ class BufferedDeletesStream {
       message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
     }
     for(int delIDX=0;delIDX<count;delIDX++) {
-      final BufferedDeletes packet = deletes.get(delIDX);
-      numTerms.addAndGet(-packet.numTermDeletes.get());
+      final FrozenBufferedDeletes packet = deletes.get(delIDX);
+      numTerms.addAndGet(-packet.numTermDeletes);
       assert numTerms.get() >= 0;
-      bytesUsed.addAndGet(-packet.bytesUsed.get());
+      bytesUsed.addAndGet(-packet.bytesUsed);
       assert bytesUsed.get() >= 0;
     }
     deletes.subList(0, count).clear();
   }
 
-  private synchronized long applyDeletes(BufferedDeletes deletes, SegmentReader reader) throws IOException {
+  // Delete by Term
+  private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
     long delCount = 0;
+    Fields fields = reader.fields();
+    if (fields == null) {
+      // This reader has no postings
+      return 0;
+    }
+
+    TermsEnum termsEnum = null;
+
+    String currentField = null;
+    DocsEnum docs = null;
 
     assert checkDeleteTerm(null);
 
-    if (deletes.terms.size() > 0) {
-      Fields fields = reader.fields();
-      if (fields == null) {
-        // This reader has no postings
-        return 0;
-      }
-
-      TermsEnum termsEnum = null;
-
-      String currentField = null;
-      DocsEnum docs = null;
-
-      for (Entry<Term,Integer> entry: deletes.terms.entrySet()) {
-        Term term = entry.getKey();
-        // Since we visit terms sorted, we gain performance
-        // by re-using the same TermsEnum and seeking only
-        // forwards
-        if (term.field() != currentField) {
-          assert currentField == null || currentField.compareTo(term.field()) < 0;
-          currentField = term.field();
-          Terms terms = fields.terms(currentField);
-          if (terms != null) {
-            termsEnum = terms.iterator();
-          } else {
-            termsEnum = null;
-          }
-        }
-
-        if (termsEnum == null) {
-          continue;
-        }
-        assert checkDeleteTerm(term);
-
-        if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
-          DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
-
-          if (docsEnum != null) {
-            docs = docsEnum;
-            final int limit = entry.getValue();
-            while (true) {
-              final int docID = docs.nextDoc();
-              if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) {
-                break;
-              }
-              reader.deleteDocument(docID);
-              // TODO: we could/should change
-              // reader.deleteDocument to return boolean
-              // true if it did in fact delete, because here
-              // we could be deleting an already-deleted doc
-              // which makes this an upper bound:
-              delCount++;
-            }
-          }
-        }
-      }
-    }
-
-    // Delete by docID
-    for (Integer docIdInt : deletes.docIDs) {
-      int docID = docIdInt.intValue();
-      reader.deleteDocument(docID);
-      delCount++;
-    }
-
-    // Delete by query
-    if (deletes.queries.size() > 0) {
-      IndexSearcher searcher = new IndexSearcher(reader);
-      assert searcher.getTopReaderContext().isAtomic;
-      final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
-      try {
-        for (Entry<Query, Integer> entry : deletes.queries.entrySet()) {
-          Query query = entry.getKey();
-          int limit = entry.getValue().intValue();
-          Weight weight = query.weight(searcher);
-          Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
-          if (scorer != null) {
-            while(true) {
-              int doc = scorer.nextDoc();
-              if (doc >= limit)
-                break;
-              reader.deleteDocument(doc);
-              // TODO: we could/should change
-              // reader.deleteDocument to return boolean
-              // true if it did in fact delete, because here
-              // we could be deleting an already-deleted doc
-              // which makes this an upper bound:
-              delCount++;
-            }
-          }
-        }
-      } finally {
-        searcher.close();
-      }
-    }
+    for (Term term : termsIter) {
+      // Since we visit terms sorted, we gain performance
+      // by re-using the same TermsEnum and seeking only
+      // forwards
+      if (term.field() != currentField) {
+        assert currentField == null || currentField.compareTo(term.field()) < 0;
+        currentField = term.field();
+        Terms terms = fields.terms(currentField);
+        if (terms != null) {
+          termsEnum = terms.iterator();
+        } else {
+          termsEnum = null;
+        }
+      }
+
+      if (termsEnum == null) {
+        continue;
+      }
+      assert checkDeleteTerm(term);
+      // System.out.println("  term=" + term);
+
+      if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
+        DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
+
+        if (docsEnum != null) {
+          while (true) {
+            final int docID = docsEnum.nextDoc();
+            if (docID == DocsEnum.NO_MORE_DOCS) {
+              break;
+            }
+            reader.deleteDocument(docID);
+            // TODO: we could/should change
+            // reader.deleteDocument to return boolean
+            // true if it did in fact delete, because here
+            // we could be deleting an already-deleted doc
+            // which makes this an upper bound:
+            delCount++;
+          }
+        }
+      }
+    }
 
     return delCount;
   }
 
+  public static class QueryAndLimit {
+    public final Query query;
+    public final int limit;
+    public QueryAndLimit(Query query, int limit) {
+      this.query = query;
+      this.limit = limit;
+    }
+  }
+
+  // Delete by query
+  private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
+    long delCount = 0;
+    IndexSearcher searcher = new IndexSearcher(reader);
+    assert searcher.getTopReaderContext().isAtomic;
+    final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
+    try {
+      for (QueryAndLimit ent : queriesIter) {
+        Query query = ent.query;
+        int limit = ent.limit;
+        Weight weight = query.weight(searcher);
+        Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
+        if (scorer != null) {
+          while(true) {
+            int doc = scorer.nextDoc();
+            if (doc >= limit)
+              break;
+            reader.deleteDocument(doc);
+            // TODO: we could/should change
+            // reader.deleteDocument to return boolean
+            // true if it did in fact delete, because here
+            // we could be deleting an already-deleted doc
+            // which makes this an upper bound:
+            delCount++;
+          }
+        }
+      }
+    } finally {
+      searcher.close();
+    }
+
+    return delCount;
+  }
+
   // used only by assert
   private boolean checkDeleteTerm(Term term) {
     if (term != null) {

@@ -429,9 +430,9 @@ class BufferedDeletesStream {
   private boolean checkDeleteStats() {
     int numTerms2 = 0;
     long bytesUsed2 = 0;
-    for(BufferedDeletes packet : deletes) {
-      numTerms2 += packet.numTermDeletes.get();
-      bytesUsed2 += packet.bytesUsed.get();
+    for(FrozenBufferedDeletes packet : deletes) {
+      numTerms2 += packet.numTermDeletes;
+      bytesUsed2 += packet.bytesUsed;
     }
     assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
     assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
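applyTermDeletes above depends on the coalesced terms arriving sorted by field and then by term text: it only fetches a fresh terms enumerator when the field changes, and within a field it keeps seeking the same enumerator forward. A loose sketch of that access pattern against a toy in-memory "segment" (hypothetical types, not the real Fields/TermsEnum API):

import java.util.BitSet;
import java.util.TreeMap;

// Hypothetical stand-in for per-field postings lookup; not the real TermsEnum API.
class SortedTermDeleteSketch {
  // field -> (term text -> docIDs containing it), a toy "segment"
  final TreeMap<String,TreeMap<String,int[]>> fields = new TreeMap<String,TreeMap<String,int[]>>();

  // deleteTerms must be sorted by field, then by term text, exactly like the
  // coalesced map above; that is what lets one cursor per field be reused.
  int applyTermDeletes(Iterable<String[]> deleteTerms, BitSet deletedDocs) {
    int delCount = 0;
    String currentField = null;
    TreeMap<String,int[]> currentTerms = null;    // "enumerator" reused per field
    for (String[] fieldAndText : deleteTerms) {
      if (!fieldAndText[0].equals(currentField)) {
        currentField = fieldAndText[0];
        currentTerms = fields.get(currentField);  // only re-seek on a field change
      }
      if (currentTerms == null) {
        continue;                                 // field has no postings
      }
      int[] docs = currentTerms.get(fieldAndText[1]);
      if (docs == null) {
        continue;                                 // term not present
      }
      for (int docID : docs) {
        if (!deletedDocs.get(docID)) {
          deletedDocs.set(docID);
          delCount++;                             // count newly deleted docs
        }
      }
    }
    return delCount;
  }
}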

View File

@@ -35,9 +35,11 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMFile;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.RecyclingByteBlockAllocator;
 import org.apache.lucene.util.ThreadInterruptedException;
-import org.apache.lucene.util.RamUsageEstimator;
 
 import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
 import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;

@@ -133,8 +135,9 @@ final class DocumentsWriter {
   // this, they wait for others to finish first
   private final int maxThreadStates;
 
+  // TODO: cutover to BytesRefHash
   // Deletes for our still-in-RAM (to be flushed next) segment
-  private BufferedDeletes pendingDeletes = new BufferedDeletes();
+  private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
 
   static class DocState {
     DocumentsWriter docWriter;

@@ -336,6 +339,9 @@ final class DocumentsWriter {
     return doFlush;
   }
 
+  // TODO: we could check w/ FreqProxTermsWriter: if the
+  // term doesn't exist, don't bother buffering into the
+  // per-DWPT map (but still must go into the global map)
   boolean deleteTerm(Term term, boolean skipWait) {
     final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
     synchronized(this) {

@@ -507,17 +513,19 @@ final class DocumentsWriter {
   private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
     // Lock order: DW -> BD
+    final long delGen = bufferedDeletesStream.getNextGen();
     if (pendingDeletes.any()) {
       if (segmentInfos.size() > 0 || newSegment != null) {
+        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
         if (infoStream != null) {
           message("flush: push buffered deletes");
         }
-        bufferedDeletesStream.push(pendingDeletes);
+        bufferedDeletesStream.push(packet);
         if (infoStream != null) {
-          message("flush: delGen=" + pendingDeletes.gen);
+          message("flush: delGen=" + packet.gen);
         }
         if (newSegment != null) {
-          newSegment.setBufferedDeletesGen(pendingDeletes.gen);
+          newSegment.setBufferedDeletesGen(packet.gen);
         }
       } else {
         if (infoStream != null) {

@@ -527,9 +535,9 @@ final class DocumentsWriter {
         // there are no segments, the deletions cannot
         // affect anything.
       }
-      pendingDeletes = new BufferedDeletes();
+      pendingDeletes.clear();
     } else if (newSegment != null) {
-      newSegment.setBufferedDeletesGen(bufferedDeletesStream.getNextGen());
+      newSegment.setBufferedDeletesGen(delGen);
     }
   }

@@ -580,7 +588,19 @@ final class DocumentsWriter {
       final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
                                                                  numDocs, writer.getConfig().getTermIndexInterval(),
-                                                                 SegmentCodecs.build(fieldInfos, writer.codecs));
+                                                                 SegmentCodecs.build(fieldInfos, writer.codecs),
+                                                                 pendingDeletes);
+
+      // Apply delete-by-docID now (delete-byDocID only
+      // happens when an exception is hit processing that
+      // doc, eg if analyzer has some problem w/ the text):
+      if (pendingDeletes.docIDs.size() > 0) {
+        flushState.deletedDocs = new BitVector(numDocs);
+        for(int delDocID : pendingDeletes.docIDs) {
+          flushState.deletedDocs.set(delDocID);
+        }
+        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
+        pendingDeletes.docIDs.clear();
+      }
 
       newSegment = new SegmentInfo(segment, numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false);

@@ -592,10 +612,14 @@ final class DocumentsWriter {
       double startMBUsed = bytesUsed()/1024./1024.;
 
       consumer.flush(threads, flushState);
+
       newSegment.setHasVectors(flushState.hasVectors);
 
       if (infoStream != null) {
         message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
+        if (flushState.deletedDocs != null) {
+          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
+        }
         message("flushedFiles=" + newSegment.files());
         message("flushed codecs=" + newSegment.getSegmentCodecs());
       }

@@ -616,6 +640,30 @@ final class DocumentsWriter {
         newSegment.setUseCompoundFile(true);
       }
 
+      // Must write deleted docs after the CFS so we don't
+      // slurp the del file into CFS:
+      if (flushState.deletedDocs != null) {
+        final int delCount = flushState.deletedDocs.count();
+        assert delCount > 0;
+        newSegment.setDelCount(delCount);
+        newSegment.advanceDelGen();
+        final String delFileName = newSegment.getDelFileName();
+        boolean success2 = false;
+        try {
+          flushState.deletedDocs.write(directory, delFileName);
+          success2 = true;
+        } finally {
+          if (!success2) {
+            try {
+              directory.deleteFile(delFileName);
+            } catch (Throwable t) {
+              // suppress this so we keep throwing the
+              // original exception
+            }
+          }
+        }
+      }
+
       if (infoStream != null) {
         message("flush: segment=" + newSegment);
         final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
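Two details of the new flush path in DocumentsWriter are worth calling out: buffered delete-by-docIDs (which only arise when a document hits an exception during indexing) are folded into a bit vector before the segment is written, and the deletions file is written only after the compound file so it is not slurped into the CFS, with the partial file removed if the write fails. A condensed sketch of that shape, using java.util.BitSet and java.nio file APIs as stand-ins for Lucene's BitVector and Directory:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.BitSet;
import java.util.List;

// Sketch only: java.util.BitSet and java.nio stand in for Lucene's BitVector/Directory.
class FlushDeletesSketch {

  // Fold buffered per-doc deletes into a bit set sized to the new segment.
  static BitSet buildDeletedDocs(List<Integer> bufferedDocIDs, int numDocs) {
    BitSet deletedDocs = new BitSet(numDocs);
    for (int docID : bufferedDocIDs) {
      deletedDocs.set(docID);
    }
    return deletedDocs;
  }

  // Write the deletions sidecar last; on failure remove the partial file so the
  // original exception keeps propagating and no truncated file is left behind.
  static void writeDelFile(Path delFile, byte[] encodedDeletes) throws IOException {
    boolean success = false;
    try {
      Files.write(delFile, encodedDeletes);
      success = true;
    } finally {
      if (!success) {
        try {
          Files.deleteIfExists(delFile);
        } catch (Throwable t) {
          // suppressed: keep throwing the original exception
        }
      }
    }
  }
}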

View File

@@ -26,8 +26,9 @@ import java.util.Map;
 
 import org.apache.lucene.index.codecs.FieldsConsumer;
 import org.apache.lucene.index.codecs.PostingsConsumer;
-import org.apache.lucene.index.codecs.TermsConsumer;
 import org.apache.lucene.index.codecs.TermStats;
+import org.apache.lucene.index.codecs.TermsConsumer;
+import org.apache.lucene.util.BitVector;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CollectionUtil;

@@ -108,7 +109,7 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
 
         // If this field has postings then add them to the
         // segment
-        appendPostings(fields, consumer);
+        appendPostings(fieldName, state, fields, consumer);
 
         for(int i=0;i<fields.length;i++) {
           TermsHashPerField perField = fields[i].termsHashPerField;

@@ -133,7 +134,8 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
   /* Walk through all unique text tokens (Posting
    * instances) found in this field and serialize them
    * into a single RAM segment. */
-  void appendPostings(FreqProxTermsWriterPerField[] fields,
+  void appendPostings(String fieldName, SegmentWriteState state,
+                      FreqProxTermsWriterPerField[] fields,
                       FieldsConsumer consumer)
     throws CorruptIndexException, IOException {

@@ -156,11 +158,20 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
       assert result;
     }
 
+    final Term protoTerm = new Term(fieldName);
+
     FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
 
     final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions;
     //System.out.println("flush terms field=" + fields[0].fieldInfo.name);
 
+    final Map<Term,Integer> segDeletes;
+    if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
+      segDeletes = state.segDeletes.terms;
+    } else {
+      segDeletes = null;
+    }
+
     // TODO: really TermsHashPerField should take over most
     // of this loop, including merge sort of terms from
     // multiple threads and interacting with the

@@ -195,6 +206,18 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
 
       final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
 
+      final int delDocLimit;
+      if (segDeletes != null) {
+        final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
+        if (docIDUpto != null) {
+          delDocLimit = docIDUpto;
+        } else {
+          delDocLimit = 0;
+        }
+      } else {
+        delDocLimit = 0;
+      }
+
       // Now termStates has numToMerge FieldMergeStates
       // which all share the same term.  Now we must
       // interleave the docID streams.

@@ -214,7 +237,28 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
 
         assert minState.docID < flushedDocCount: "doc=" + minState.docID + " maxDoc=" + flushedDocCount;
 
+        // NOTE: we could check here if the docID was
+        // deleted, and skip it.  However, this is somewhat
+        // dangerous because it can yield non-deterministic
+        // behavior since we may see the docID before we see
+        // the term that caused it to be deleted.  This
+        // would mean some (but not all) of its postings may
+        // make it into the index, which'd alter the docFreq
+        // for those terms.  We could fix this by doing two
+        // passes, ie first sweep marks all del docs, and
+        // 2nd sweep does the real flush, but I suspect
+        // that'd add too much time to flush.
+
         postingsConsumer.startDoc(minState.docID, termDocFreq);
+        if (minState.docID < delDocLimit) {
+          // Mark it deleted.  TODO: we could also skip
+          // writing its postings; this would be
+          // deterministic (just for this Term's docs).
+          if (state.deletedDocs == null) {
+            state.deletedDocs = new BitVector(state.numDocs);
+          }
+          state.deletedDocs.set(minState.docID);
+        }
 
         final ByteSliceReader prox = minState.prox;
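The delDocLimit logic above consults the segment-private delete map once per term, just before that term's postings are written; any docID below the stored limit is then marked in the deleted-docs bit vector as the postings stream by, rather than being skipped (skipping would change docFreq non-deterministically, as the NOTE explains). A compact sketch of the per-term loop with hypothetical stand-ins for the postings writer:

import java.util.BitSet;
import java.util.Map;

// Hypothetical, simplified version of the per-term flush loop.
class DeleteWhileFlushSketch {
  static void writeTermPostings(String term,
                                int[] postings,                  // docIDs for this term, in order
                                Map<String,Integer> segDeletes,  // term -> docIDUpto
                                BitSet deletedDocs,
                                StringBuilder out) {             // stands in for the postings writer
    final Integer docIDUpto = segDeletes == null ? null : segDeletes.get(term);
    final int delDocLimit = docIDUpto == null ? 0 : docIDUpto.intValue();
    for (int docID : postings) {
      out.append(term).append(':').append(docID).append('\n');  // the posting is still written
      if (docID < delDocLimit) {
        deletedDocs.set(docID);  // but the doc is marked deleted in the new segment
      }
    }
  }
}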

View File

@@ -0,0 +1,145 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Iterator;
import java.util.Map;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
/** Holds buffered deletes by term or query, once pushed.
 *  Pushed deletes are write-once, so we shift to a more
 *  memory-efficient data structure to hold them.  We don't
 *  hold docIDs because these are applied on flush. */
class FrozenBufferedDeletes {

  /* Rough logic: Term is object w/
     String field and String text (OBJ_HEADER + 2*POINTER).
     We don't count Term's field since it's interned.
     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
     OBJ_HEADER + string.length*CHAR). */
  final static int BYTES_PER_DEL_TERM = 3*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 4*RamUsageEstimator.NUM_BYTES_INT;

  /* Query we often undercount (say 24 bytes), plus int. */
  final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_INT + 24;

  // Terms, in sorted order:
  // TODO: would be more RAM efficient to store BytesRef[],
  // per field:
  final Term[] terms;

  // Parallel array of deleted query, and the docIDUpto for
  // each
  final Query[] queries;
  final int[] queryLimits;
  final int bytesUsed;
  final int numTermDeletes;
  final long gen;

  public FrozenBufferedDeletes(BufferedDeletes deletes, long gen) {
    terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]);
    queries = new Query[deletes.queries.size()];
    queryLimits = new int[deletes.queries.size()];
    int upto = 0;
    for(Map.Entry<Query,Integer> ent : deletes.queries.entrySet()) {
      queries[upto] = ent.getKey();
      queryLimits[upto] = ent.getValue();
      upto++;
    }
    bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY;
    numTermDeletes = deletes.numTermDeletes.get();
    this.gen = gen;
  }

  public Iterable<Term> termsIterable() {
    return new Iterable<Term>() {
      // @Override -- not until Java 1.6
      public Iterator<Term> iterator() {
        return new Iterator<Term>() {
          private int upto;

          // @Override -- not until Java 1.6
          public boolean hasNext() {
            return upto < terms.length;
          }

          // @Override -- not until Java 1.6
          public Term next() {
            return terms[upto++];
          }

          // @Override -- not until Java 1.6
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }

  public Iterable<QueryAndLimit> queriesIterable() {
    return new Iterable<QueryAndLimit>() {
      // @Override -- not until Java 1.6
      public Iterator<QueryAndLimit> iterator() {
        return new Iterator<QueryAndLimit>() {
          private int upto;

          // @Override -- not until Java 1.6
          public boolean hasNext() {
            return upto < queries.length;
          }

          // @Override -- not until Java 1.6
          public QueryAndLimit next() {
            QueryAndLimit ret = new QueryAndLimit(queries[upto], queryLimits[upto]);
            upto++;
            return ret;
          }

          // @Override -- not until Java 1.6
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }

  @Override
  public String toString() {
    String s = "";
    if (numTermDeletes != 0) {
      s += " " + numTermDeletes + " deleted terms (unique count=" + terms.length + ")";
    }
    if (queries.length != 0) {
      s += " " + queries.length + " deleted queries";
    }
    if (bytesUsed != 0) {
      s += " bytesUsed=" + bytesUsed;
    }
    return s;
  }

  boolean any() {
    return terms.length > 0 || queries.length > 0;
  }
}

View File

@@ -339,7 +339,7 @@ public class IndexWriter implements Closeable {
    */
   IndexReader getReader(boolean applyAllDeletes) throws IOException {
     ensureOpen();
 
     final long tStart = System.currentTimeMillis();
 
     if (infoStream != null) {

View File

@@ -661,7 +661,7 @@ public abstract class LogMergePolicy extends MergePolicy {
     sb.append("maxMergeSizeForOptimize=").append(maxMergeSizeForOptimize).append(", ");
     sb.append("calibrateSizeByDeletes=").append(calibrateSizeByDeletes).append(", ");
     sb.append("maxMergeDocs=").append(maxMergeDocs).append(", ");
-    sb.append("useCompoundFile=").append(useCompoundFile);
+    sb.append("useCompoundFile=").append(useCompoundFile).append(", ");
     sb.append("requireContiguousMerge=").append(requireContiguousMerge);
     sb.append("]");
     return sb.toString();

View File

@@ -266,7 +266,7 @@ final class SegmentMerger {
       // details.
       throw new RuntimeException("mergeFields produced an invalid result: docCount is " + docCount + " but fdx file size is " + fdxFileLength + " file=" + fileName + " file exists?=" + directory.fileExists(fileName) + "; now aborting this merge to prevent index corruption");
 
-    segmentWriteState = new SegmentWriteState(null, directory, segment, fieldInfos, docCount, termIndexInterval, codecInfo);
+    segmentWriteState = new SegmentWriteState(null, directory, segment, fieldInfos, docCount, termIndexInterval, codecInfo, null);
     return docCount;
   }

View File

@@ -20,6 +20,7 @@ package org.apache.lucene.index;
 import java.io.PrintStream;
 
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BitVector;
 
 /**
  * @lucene.experimental

@@ -32,6 +33,16 @@ public class SegmentWriteState {
   public final int numDocs;
   public boolean hasVectors;
 
+  // Deletes to apply while we are flushing the segment.  A
+  // Term is enrolled in here if it was deleted at one
+  // point, and it's mapped to the docIDUpto, meaning any
+  // docID < docIDUpto containing this term should be
+  // deleted.
+  public final BufferedDeletes segDeletes;
+
+  // Lazily created:
+  public BitVector deletedDocs;
+
   final SegmentCodecs segmentCodecs;
   public final String codecId;

@@ -57,8 +68,9 @@ public class SegmentWriteState {
   public SegmentWriteState(PrintStream infoStream, Directory directory, String segmentName, FieldInfos fieldInfos,
-                           int numDocs, int termIndexInterval, SegmentCodecs segmentCodecs) {
+                           int numDocs, int termIndexInterval, SegmentCodecs segmentCodecs, BufferedDeletes segDeletes) {
     this.infoStream = infoStream;
+    this.segDeletes = segDeletes;
     this.directory = directory;
     this.segmentName = segmentName;
     this.fieldInfos = fieldInfos;

@@ -80,5 +92,6 @@ public class SegmentWriteState {
     termIndexInterval = state.termIndexInterval;
     segmentCodecs = state.segmentCodecs;
     this.codecId = codecId;
+    segDeletes = state.segDeletes;
   }
 }

View File

@@ -108,7 +108,7 @@ public class BlockTermsReader extends FieldsProducer {
     }
   }
 
-  private String segment;
+  //private String segment;
 
   public BlockTermsReader(TermsIndexReaderBase indexReader, Directory dir, FieldInfos fieldInfos, String segment, PostingsReaderBase postingsReader, int readBufferSize,
                           Comparator<BytesRef> termComp, int termsCacheSize, String codecId)

@@ -118,7 +118,7 @@ public class BlockTermsReader extends FieldsProducer {
     termsCache = new DoubleBarrelLRUCache<FieldAndTerm,BlockTermState>(termsCacheSize);
 
     this.termComp = termComp;
-    this.segment = segment;
+    //this.segment = segment;
     in = dir.openInput(IndexFileNames.segmentFileName(segment, codecId, BlockTermsWriter.TERMS_EXTENSION),
                        readBufferSize);

View File

@@ -157,6 +157,7 @@ public class TestAddIndexes extends LuceneTestCase {
     setUpDirs(dir, aux);
 
     IndexWriter writer = newWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.APPEND));
+    writer.setInfoStream(VERBOSE ? System.out : null);
     writer.addIndexes(aux);
 
     // Adds 10 docs, then replaces them with another 10

View File

@@ -589,7 +589,7 @@ public class TestCodecs extends LuceneTestCase {
 
     final int termIndexInterval = _TestUtil.nextInt(random, 13, 27);
     final SegmentCodecs codecInfo = SegmentCodecs.build(fieldInfos, CodecProvider.getDefault());
-    final SegmentWriteState state = new SegmentWriteState(null, dir, SEGMENT, fieldInfos, 10000, termIndexInterval, codecInfo);
+    final SegmentWriteState state = new SegmentWriteState(null, dir, SEGMENT, fieldInfos, 10000, termIndexInterval, codecInfo, null);
 
     final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);
     Arrays.sort(fields);

View File

@@ -2576,7 +2576,7 @@ public class TestIndexWriter extends LuceneTestCase {
           count++;
         }
       }
-      assertTrue("flush happened too quickly during " + (doIndexing ? "indexing" : "deleting") + " count=" + count, count > 2500);
+      assertTrue("flush happened too quickly during " + (doIndexing ? "indexing" : "deleting") + " count=" + count, count > 1500);
     }
     w.close();
     dir.close();

View File

@@ -157,8 +157,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
     assertEquals(0, modifier.getSegmentCount());
     modifier.commit();
 
-    modifier.commit();
-
     IndexReader reader = IndexReader.open(dir, true);
     assertEquals(1, reader.numDocs());

View File

@@ -29,10 +29,14 @@
 #
 
 # Where to get documents from:
-content.source=org.apache.lucene.benchmark.byTask.feeds.ReutersContentSource
+content.source=org.apache.lucene.benchmark.byTask.feeds.EnwikiContentSource
 
 # Where to write the line file output:
-line.file.out=work/reuters.lines.txt
+line.file.out=/x/tmp/enwiki.out.txt
+
+docs.file=/x/lucene/data/enwiki/enwiki-20110115-pages-articles.xml
+
+keep.image.only.docs = false
 
 # Stop after processing the document feed once:
 content.source.forever=false