LUCENE-6161: speed up resolving deleted terms to doc ids

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1653891 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2015-01-22 14:54:11 +00:00
parent bea1e9c608
commit f5f2b28541
18 changed files with 790 additions and 439 deletions

View File

@ -41,6 +41,11 @@ Optimizations
* LUCENE-6184: Make BooleanScorer only score windows that contain * LUCENE-6184: Make BooleanScorer only score windows that contain
matches. (Adrien Grand) matches. (Adrien Grand)
* LUCENE-6161: Speed up resolving of deleted terms to docIDs by doing
a combined merge sort between deleted terms and segment terms
instead of a separate merge sort for each segment. In delete-heavy
use cases this can be a sizable speedup. (Mike McCandless)
Other Other
* LUCENE-6193: Collapse identical catch branches in try-catch statements. * LUCENE-6193: Collapse identical catch branches in try-catch statements.

View File

@ -35,7 +35,9 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.PriorityQueue;
/* Tracks the stream of {@link BufferedDeletes}. /* Tracks the stream of {@link BufferedDeletes}.
* When DocumentsWriterPerThread flushes, its buffered * When DocumentsWriterPerThread flushes, its buffered
@ -63,7 +65,7 @@ class BufferedUpdatesStream implements Accountable {
private long nextGen = 1; private long nextGen = 1;
// used only by assert // used only by assert
private Term lastDeleteTerm; private BytesRef lastDeleteTerm;
private final InfoStream infoStream; private final InfoStream infoStream;
private final AtomicLong bytesUsed = new AtomicLong(); private final AtomicLong bytesUsed = new AtomicLong();
@ -92,7 +94,7 @@ class BufferedUpdatesStream implements Accountable {
numTerms.addAndGet(packet.numTermDeletes); numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed); bytesUsed.addAndGet(packet.bytesUsed);
if (infoStream.isEnabled("BD")) { if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + updates.size() + " totBytesUsed=" + bytesUsed.get()); infoStream.message("BD", "push deletes " + packet + " segmentPrivate?=" + packet.isSegmentPrivate + " delGen=" + packet.delGen() + " packetCount=" + updates.size() + " totBytesUsed=" + bytesUsed.get());
} }
assert checkDeleteStats(); assert checkDeleteStats();
return packet.delGen(); return packet.delGen();
@ -147,188 +149,167 @@ class BufferedUpdatesStream implements Accountable {
/** Resolves the buffered deleted Term/Query/docIDs, into /** Resolves the buffered deleted Term/Query/docIDs, into
* actual deleted docIDs in the liveDocs MutableBits for * actual deleted docIDs in the liveDocs MutableBits for
* each SegmentReader. */ * each SegmentReader. */
public synchronized ApplyDeletesResult applyDeletesAndUpdates(IndexWriter.ReaderPool readerPool, List<SegmentCommitInfo> infos) throws IOException { public synchronized ApplyDeletesResult applyDeletesAndUpdates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
final long t0 = System.currentTimeMillis(); final long t0 = System.currentTimeMillis();
if (infos.size() == 0) {
return new ApplyDeletesResult(false, nextGen++, null);
}
assert checkDeleteStats();
if (!any()) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "applyDeletes: no deletes; skipping");
}
return new ApplyDeletesResult(false, nextGen++, null);
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + updates.size());
}
final long gen = nextGen++; final long gen = nextGen++;
List<SegmentCommitInfo> infos2 = new ArrayList<>(); if (infos.size() == 0) {
infos2.addAll(infos); return new ApplyDeletesResult(false, gen, null);
Collections.sort(infos2, sortSegInfoByDelGen); }
CoalescedUpdates coalescedDeletes = null; // We only init these on demand, when we find our first deletes that need to be applied:
SegmentState[] segStates = null;
int infosIDX = infos2.size()-1;
int delIDX = updates.size()-1;
long totDelCount = 0; long totDelCount = 0;
long totTermVisitedCount = 0; long totTermVisitedCount = 0;
List<SegmentCommitInfo> allDeleted = null; boolean success = false;
while (infosIDX >= 0) { ApplyDeletesResult result = null;
//System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
final long segStartNS = System.nanoTime(); try {
final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null; if (infoStream.isEnabled("BD")) {
final SegmentCommitInfo info = infos2.get(infosIDX); infoStream.message("BD", String.format(Locale.ROOT, "applyDeletes: open segment readers took %d msec", System.currentTimeMillis()-t0));
final long segGen = info.getBufferedDeletesGen(); }
if (packet != null && segGen < packet.delGen()) { assert checkDeleteStats();
// System.out.println(" coalesce");
if (coalescedDeletes == null) {
coalescedDeletes = new CoalescedUpdates();
}
if (!packet.isSegmentPrivate) {
/*
* Only coalesce if we are NOT on a segment private del packet: the segment private del packet
* must only applied to segments with the same delGen. Yet, if a segment is already deleted
* from the SI since it had no more documents remaining after some del packets younger than
* its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been
* removed.
*/
coalescedDeletes.update(packet);
}
delIDX--;
} else if (packet != null && segGen == packet.delGen()) {
assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet gen=" + segGen;
//System.out.println(" eq");
// Lock order: IW -> BD -> RP
assert readerPool.infoIsLive(info);
final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
long termVisitedCount = 0;
final boolean segAllDeletes;
try {
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
if (coalescedDeletes != null) {
TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
delCount += counts.delCount;
termVisitedCount += counts.termVisitedCount;
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
}
//System.out.println(" del exact");
// Don't delete by Term here; DocumentsWriterPerThread
// already did that on flush:
delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader);
applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), rld, reader, dvUpdates);
applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), rld, reader, dvUpdates);
if (dvUpdates.any()) {
rld.writeFieldUpdates(info.info.dir, dvUpdates);
}
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
assert fullDelCount <= rld.info.info.getDocCount();
segAllDeletes = fullDelCount == rld.info.info.getDocCount();
} finally {
rld.release(reader);
readerPool.release(rld);
}
totDelCount += delCount;
totTermVisitedCount += termVisitedCount;
if (segAllDeletes) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
}
allDeleted.add(info);
}
if (!any()) {
if (infoStream.isEnabled("BD")) { if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : "")); infoStream.message("BD", "applyDeletes: no segments; skipping");
} }
return new ApplyDeletesResult(false, gen, null);
}
if (coalescedDeletes == null) { if (infoStream.isEnabled("BD")) {
coalescedDeletes = new CoalescedUpdates(); infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + updates.size());
} }
/* infos = sortByDelGen(infos);
* Since we are on a segment private del packet we must not
* update the coalescedDeletes here! We can simply advance to the
* next packet and seginfo.
*/
delIDX--;
infosIDX--;
info.setBufferedDeletesGen(gen);
} else { CoalescedUpdates coalescedUpdates = null;
//System.out.println(" gt"); int infosIDX = infos.size()-1;
int delIDX = updates.size()-1;
if (coalescedDeletes != null) { // Backwards merge sort the segment delGens with the packet delGens in the buffered stream:
// Lock order: IW -> BD -> RP while (infosIDX >= 0) {
assert readerPool.infoIsLive(info); final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
final ReadersAndUpdates rld = readerPool.get(info, true); final SegmentCommitInfo info = infos.get(infosIDX);
final SegmentReader reader = rld.getReader(IOContext.READ); final long segGen = info.getBufferedDeletesGen();
int delCount = 0;
long termVisitedCount = 0; if (packet != null && segGen < packet.delGen()) {
final boolean segAllDeletes; if (!packet.isSegmentPrivate && packet.any()) {
try { /*
TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); * Only coalesce if we are NOT on a segment private del packet: the segment private del packet
delCount += counts.delCount; * must only apply to segments with the same delGen. Yet, if a segment is already deleted
termVisitedCount += counts.termVisitedCount; * from the SI since it had no more documents remaining after some del packets younger than
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); * removed.
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates); */
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates); if (coalescedUpdates == null) {
if (dvUpdates.any()) { coalescedUpdates = new CoalescedUpdates();
rld.writeFieldUpdates(info.info.dir, dvUpdates);
} }
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); coalescedUpdates.update(packet);
assert fullDelCount <= rld.info.info.getDocCount(); }
segAllDeletes = fullDelCount == rld.info.info.getDocCount();
} finally { delIDX--;
rld.release(reader); } else if (packet != null && segGen == packet.delGen()) {
readerPool.release(rld); assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet gen=" + segGen;
if (segStates == null) {
segStates = openSegmentStates(pool, infos);
}
SegmentState segState = segStates[infosIDX];
// Lock order: IW -> BD -> RP
assert pool.infoIsLive(info);
int delCount = 0;
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
if (coalescedUpdates != null) {
delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
applyDocValuesUpdates(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
applyDocValuesUpdates(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
}
delCount += applyQueryDeletes(packet.queriesIterable(), segState);
applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), segState, dvUpdates);
applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), segState, dvUpdates);
if (dvUpdates.any()) {
segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
} }
totDelCount += delCount; totDelCount += delCount;
totTermVisitedCount += termVisitedCount;
if (segAllDeletes) { /*
if (allDeleted == null) { * Since we are on a segment private del packet we must not
allDeleted = new ArrayList<>(); * update the coalescedUpdates here! We can simply advance to the
* next packet and seginfo.
*/
delIDX--;
infosIDX--;
} else {
if (coalescedUpdates != null) {
if (segStates == null) {
segStates = openSegmentStates(pool, infos);
} }
allDeleted.add(info); SegmentState segState = segStates[infosIDX];
// Lock order: IW -> BD -> RP
assert pool.infoIsLive(info);
int delCount = 0;
delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
applyDocValuesUpdates(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
applyDocValuesUpdates(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
if (dvUpdates.any()) {
segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
}
totDelCount += delCount;
} }
if (infoStream.isEnabled("BD")) { infosIDX--;
infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
}
} }
info.setBufferedDeletesGen(gen); }
infosIDX--; // Now apply all term deletes:
if (coalescedUpdates != null && coalescedUpdates.totalTermCount != 0) {
if (segStates == null) {
segStates = openSegmentStates(pool, infos);
}
totTermVisitedCount += applyTermDeletes(coalescedUpdates, segStates);
}
assert checkDeleteStats();
success = true;
} finally {
if (segStates != null) {
result = closeSegmentStates(pool, segStates, success, gen);
} }
} }
assert checkDeleteStats(); if (result == null) {
if (infoStream.isEnabled("BD")) { result = new ApplyDeletesResult(false, gen, null);
infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec for " + infos.size() + " segments, " + totDelCount + " deleted docs, " + totTermVisitedCount + " visited terms");
} }
// assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted); if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT,
"applyDeletes took %d msec for %d segments, %d newly deleted docs (query deletes), %d visited terms, allDeleted=%s",
System.currentTimeMillis()-t0, infos.size(), totDelCount, totTermVisitedCount, result.allDeleted));
}
return result;
}
private List<SegmentCommitInfo> sortByDelGen(List<SegmentCommitInfo> infos) {
infos = new ArrayList<>(infos);
// Smaller delGens come first:
Collections.sort(infos, sortSegInfoByDelGen);
return infos;
} }
synchronized long getNextGen() { synchronized long getNextGen() {
@ -386,97 +367,249 @@ class BufferedUpdatesStream implements Accountable {
} }
} }
private static class TermDeleteCounts { static class SegmentState {
/** How many documents were actually deleted. */ final long delGen;
public final int delCount; final ReadersAndUpdates rld;
final SegmentReader reader;
final int startDelCount;
/** How many terms we checked. */ TermsEnum termsEnum;
public final long termVisitedCount; DocsEnum docsEnum;
BytesRef term;
boolean any;
public TermDeleteCounts(int delCount, long termVisitedCount) { public SegmentState(IndexWriter.ReaderPool pool, SegmentCommitInfo info) throws IOException {
this.delCount = delCount; rld = pool.get(info, true);
this.termVisitedCount = termVisitedCount; startDelCount = rld.getPendingDeleteCount();
reader = rld.getReader(IOContext.READ);
delGen = info.getBufferedDeletesGen();
}
public void finish(IndexWriter.ReaderPool pool) throws IOException {
try {
rld.release(reader);
} finally {
pool.release(rld);
}
} }
} }
// Delete by Term /** Does a merge sort by current term across all segments. */
private synchronized TermDeleteCounts applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException { static class SegmentQueue extends PriorityQueue<SegmentState> {
int delCount = 0; public SegmentQueue(int size) {
long termVisitedCount = 0; super(size);
Fields fields = reader.fields(); }
TermsEnum termsEnum = null; @Override
protected boolean lessThan(SegmentState a, SegmentState b) {
return a.term.compareTo(b.term) < 0;
}
}
String currentField = null; /** Opens SegmentReader and inits SegmentState for each segment. */
DocsEnum docsEnum = null; private SegmentState[] openSegmentStates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
int numReaders = infos.size();
assert checkDeleteTerm(null); SegmentState[] segStates = new SegmentState[numReaders];
boolean success = false;
boolean any = false; try {
for(int i=0;i<numReaders;i++) {
long ns = System.nanoTime(); segStates[i] = new SegmentState(pool, infos.get(i));
for (Term term : termsIter) {
termVisitedCount++;
// Since we visit terms sorted, we gain performance
// by re-using the same TermsEnum and seeking only
// forwards
if (!term.field().equals(currentField)) {
assert currentField == null || currentField.compareTo(term.field()) < 0;
currentField = term.field();
Terms terms = fields.terms(currentField);
if (terms != null) {
termsEnum = terms.iterator(termsEnum);
} else {
termsEnum = null;
}
} }
success = true;
if (termsEnum == null) { } finally {
// no terms in this field if (success == false) {
continue; for(int j=0;j<numReaders;j++) {
} if (segStates[j] != null) {
try {
assert checkDeleteTerm(term); segStates[j].finish(pool);
} catch (Throwable th) {
// System.out.println(" term=" + term); // suppress so we keep throwing original exc
}
if (termsEnum.seekExact(term.bytes())) {
// we don't need term frequencies for this
docsEnum = termsEnum.docs(rld.getLiveDocs(), docsEnum, DocsEnum.FLAG_NONE);
//System.out.println("BDS: got docsEnum=" + docsEnum);
assert docsEnum != null;
while (true) {
final int docID = docsEnum.nextDoc();
//System.out.println(Thread.currentThread().getName() + " del term=" + term + " doc=" + docID);
if (docID == DocIdSetIterator.NO_MORE_DOCS) {
break;
}
if (!any) {
rld.initWritableLiveDocs();
any = true;
}
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
if (rld.delete(docID)) {
delCount++;
} }
} }
} }
} }
return new TermDeleteCounts(delCount, termVisitedCount); return segStates;
}
/**
 * Closes segment states previously opened with openSegmentStates.
 *
 * <p>When {@code success} is true, each segment's newly applied deletes (pending delete count
 * now minus the count captured when the state was opened) are tallied, its buffered-deletes
 * generation is set to {@code gen}, and segments whose documents are all deleted are collected.
 * Every state is then released via {@link SegmentState#finish}, whether or not
 * {@code success} is true.
 *
 * @param pool reader pool handed to each state's {@code finish}
 * @param segStates states to close
 * @param success whether the caller's apply pass completed without throwing
 * @param gen generation stamped on each segment when {@code success} is true
 * @return a result built from {@code totDelCount > 0}, {@code gen}, and the list of fully
 *     deleted segments (null when there are none, or when {@code success} is false)
 * @throws IOException via {@code IOUtils.reThrow}, the first failure seen while releasing a
 *     state, but only when {@code success} is true
 */
private ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success, long gen) throws IOException {
  Throwable firstExc = null;
  List<SegmentCommitInfo> allDeleted = null;
  long totDelCount = 0;

  for (SegmentState segState : segStates) {
    if (success) {
      totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
      segState.reader.getSegmentInfo().setBufferedDeletesGen(gen);
      int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
      assert fullDelCount <= segState.rld.info.info.getDocCount();
      if (fullDelCount == segState.rld.info.info.getDocCount()) {
        if (allDeleted == null) {
          allDeleted = new ArrayList<>();
        }
        allDeleted.add(segState.reader.getSegmentInfo());
      }
    }
    try {
      segState.finish(pool);
    } catch (Throwable th) {
      // Keep releasing the remaining states, but remember the first failure.  (The guard
      // used to be "!= null", which never recorded anything and so swallowed every failure.)
      if (firstExc == null) {
        firstExc = th;
      }
    }
  }

  if (success) {
    // Does nothing if firstExc is null.  When success is false the caller is already
    // propagating its own exception, so a failure while releasing is deliberately dropped.
    IOUtils.reThrow(firstExc);
  }

  if (infoStream.isEnabled("BD")) {
    infoStream.message("BD", "applyDeletes: " + totDelCount + " new deleted documents");
  }

  return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);
}
/** Merge sorts the deleted terms and all segments to resolve terms to docIDs for deletion.
 *
 * <p>Walks the delete terms from {@code updates.termIterator()}; for each field it builds a
 * {@link SegmentQueue} over the per-segment {@link TermsEnum}s and advances the segments and
 * the delete terms against each other, deleting every document of a matching term in segments
 * whose delGen is lower than the delete term's delGen.
 *
 * @param updates coalesced updates supplying the delete terms
 * @param segStates one state per segment; their termsEnum/term/docsEnum/any fields are
 *        overwritten here
 * @return the number of delete terms visited
 */
private synchronized long applyTermDeletes(CoalescedUpdates updates, SegmentState[] segStates) throws IOException {
long startNS = System.nanoTime();
int numReaders = segStates.length;
// Counters reported only in the infoStream message at the end:
long delTermVisitedCount = 0;
long segTermVisitedCount = 0;
FieldTermIterator iter = updates.termIterator();
String field = null;
SegmentQueue queue = null;
while (true) {
boolean newField;
// next() returns true both when a new field starts and when the iteration is over;
// field() == null distinguishes the latter:
newField = iter.next();
if (newField) {
field = iter.field();
if (field == null) {
// No more terms:
break;
}
// Rebuild the queue for this field, holding every segment that has at least one term in it:
queue = new SegmentQueue(numReaders);
// NOTE(review): segTermCount is accumulated below but never read in this method
long segTermCount = 0;
for(int i=0;i<numReaders;i++) {
SegmentState state = segStates[i];
Terms terms = state.reader.fields().terms(field);
if (terms != null) {
segTermCount += terms.size();
state.termsEnum = terms.iterator(state.termsEnum);
// Position each segment on its first term of the field:
state.term = state.termsEnum.next();
if (state.term != null) {
queue.add(state);
}
}
}
// Reset the term-order assert state for the new field:
assert checkDeleteTerm(null);
}
// Get next term to delete
BytesRef term = iter.term();
assert checkDeleteTerm(term);
delTermVisitedCount++;
long delGen = iter.delGen();
while (queue.size() != 0) {
// Get next term merged across all segments
SegmentState state = queue.top();
segTermVisitedCount++;
int cmp = term.compareTo(state.term);
if (cmp < 0) {
// The delete term sorts before the queue's top segment term: move on to the next delete term
break;
} else if (cmp == 0) {
// fall through
} else {
// The segment is behind the delete term: seek it forward to the delete term (or the next term after it)
TermsEnum.SeekStatus status = state.termsEnum.seekCeil(term);
if (status == TermsEnum.SeekStatus.FOUND) {
// fallthrough
} else {
if (status == TermsEnum.SeekStatus.NOT_FOUND) {
// Landed on a different term: record it and re-order the queue
state.term = state.termsEnum.term();
queue.updateTop();
} else {
// No more terms in this segment
queue.pop();
}
// Re-examine the (possibly new) top of the queue against the same delete term:
continue;
}
}
// Here the segment's termsEnum is positioned exactly on the delete term.
assert state.delGen != delGen;
// Only apply the delete when the segment's delGen is lower than the delete term's delGen:
if (state.delGen < delGen) {
// we don't need term frequencies for this
state.docsEnum = state.termsEnum.docs(state.rld.getLiveDocs(), state.docsEnum, DocsEnum.FLAG_NONE);
assert state.docsEnum != null;
while (true) {
final int docID = state.docsEnum.nextDoc();
if (docID == DocIdSetIterator.NO_MORE_DOCS) {
break;
}
// Init writable live docs once per segment, on the first doc visited:
if (!state.any) {
state.rld.initWritableLiveDocs();
state.any = true;
}
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
// each segment. So all Term deleting here is
// against prior segments:
state.rld.delete(docID);
}
}
// Step this segment past the matched term and restore queue order:
state.term = state.termsEnum.next();
if (state.term == null) {
queue.pop();
} else {
queue.updateTop();
}
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD",
String.format(Locale.ROOT, "applyTermDeletes took %.1f msec for %d segments and %d packets; %d del terms visited; %d seg terms visited",
(System.nanoTime()-startNS)/1000000.,
numReaders,
updates.terms.size(),
delTermVisitedCount, segTermVisitedCount));
}
return delTermVisitedCount;
} }
// DocValues updates // DocValues updates
private synchronized void applyDocValuesUpdates(Iterable<? extends DocValuesUpdate> updates, private synchronized void applyDocValuesUpdates(Iterable<? extends DocValuesUpdate> updates,
ReadersAndUpdates rld, SegmentReader reader, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException { SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
Fields fields = reader.fields(); Fields fields = segState.reader.fields();
// TODO: we can process the updates per DV field, from last to first so that // TODO: we can process the updates per DV field, from last to first so that
// if multiple terms affect same document for the same field, we add an update // if multiple terms affect same document for the same field, we add an update
@ -492,7 +625,6 @@ class BufferedUpdatesStream implements Accountable {
TermsEnum termsEnum = null; TermsEnum termsEnum = null;
DocsEnum docsEnum = null; DocsEnum docsEnum = null;
//System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader);
for (DocValuesUpdate update : updates) { for (DocValuesUpdate update : updates) {
Term term = update.term; Term term = update.term;
int limit = update.docIDUpto; int limit = update.docIDUpto;
@ -524,20 +656,16 @@ class BufferedUpdatesStream implements Accountable {
continue; continue;
} }
// System.out.println(" term=" + term);
if (termsEnum.seekExact(term.bytes())) { if (termsEnum.seekExact(term.bytes())) {
// we don't need term frequencies for this // we don't need term frequencies for this
docsEnum = termsEnum.docs(rld.getLiveDocs(), docsEnum, DocsEnum.FLAG_NONE); docsEnum = termsEnum.docs(segState.rld.getLiveDocs(), docsEnum, DocsEnum.FLAG_NONE);
//System.out.println("BDS: got docsEnum=" + docsEnum);
DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type); DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type);
if (dvUpdates == null) { if (dvUpdates == null) {
dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, reader.maxDoc()); dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, segState.reader.maxDoc());
} }
int doc; int doc;
while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
//System.out.println(Thread.currentThread().getName() + " numericDVUpdate term=" + term + " doc=" + docID);
if (doc >= limit) { if (doc >= limit) {
break; // no more docs that can be updated for this term break; // no more docs that can be updated for this term
} }
@ -557,29 +685,27 @@ class BufferedUpdatesStream implements Accountable {
} }
// Delete by query // Delete by query
private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, ReadersAndUpdates rld, final SegmentReader reader) throws IOException { private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentState segState) throws IOException {
long delCount = 0; long delCount = 0;
final LeafReaderContext readerContext = reader.getContext(); final LeafReaderContext readerContext = segState.reader.getContext();
boolean any = false;
for (QueryAndLimit ent : queriesIter) { for (QueryAndLimit ent : queriesIter) {
Query query = ent.query; Query query = ent.query;
int limit = ent.limit; int limit = ent.limit;
final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(readerContext, reader.getLiveDocs()); final DocIdSet docs = new QueryWrapperFilter(query).getDocIdSet(readerContext, segState.reader.getLiveDocs());
if (docs != null) { if (docs != null) {
final DocIdSetIterator it = docs.iterator(); final DocIdSetIterator it = docs.iterator();
if (it != null) { if (it != null) {
while(true) { while (true) {
int doc = it.nextDoc(); int doc = it.nextDoc();
if (doc >= limit) { if (doc >= limit) {
break; break;
} }
if (!any) { if (!segState.any) {
rld.initWritableLiveDocs(); segState.rld.initWritableLiveDocs();
any = true; segState.any = true;
} }
if (segState.rld.delete(doc)) {
if (rld.delete(doc)) {
delCount++; delCount++;
} }
} }
@ -591,12 +717,12 @@ class BufferedUpdatesStream implements Accountable {
} }
// used only by assert // used only by assert
private boolean checkDeleteTerm(Term term) { private boolean checkDeleteTerm(BytesRef term) {
if (term != null) { if (term != null) {
assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term; assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) >= 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
} }
// TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
lastDeleteTerm = term == null ? null : new Term(term.field(), BytesRef.deepCopyOf(term.bytes)); lastDeleteTerm = term == null ? null : BytesRef.deepCopyOf(term);
return true; return true;
} }

View File

@ -28,11 +28,10 @@ import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.MergedIterator;
class CoalescedUpdates { class CoalescedUpdates {
final Map<Query,Integer> queries = new HashMap<>(); final Map<Query,Integer> queries = new HashMap<>();
final List<Iterable<Term>> iterables = new ArrayList<>(); final List<PrefixCodedTerms> terms = new ArrayList<>();
final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>(); final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>();
final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>(); final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>();
int totalTermCount; int totalTermCount;
@ -40,7 +39,7 @@ class CoalescedUpdates {
@Override @Override
public String toString() { public String toString() {
// note: we could add/collect more debugging information // note: we could add/collect more debugging information
return "CoalescedUpdates(termSets=" + iterables.size() return "CoalescedUpdates(termSets=" + terms.size()
+ ",totalTermCount=" + totalTermCount + ",totalTermCount=" + totalTermCount
+ ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size() + ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
+ ",binaryDVUpdates=" + binaryDVUpdates.size() + ")"; + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
@ -48,7 +47,7 @@ class CoalescedUpdates {
void update(FrozenBufferedUpdates in) { void update(FrozenBufferedUpdates in) {
totalTermCount += in.termCount; totalTermCount += in.termCount;
iterables.add(in.termsIterable()); terms.add(in.terms);
for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) { for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
final Query query = in.queries[queryIdx]; final Query query = in.queries[queryIdx];
@ -68,18 +67,12 @@ class CoalescedUpdates {
} }
} }
public Iterable<Term> termsIterable() { public FieldTermIterator termIterator() {
return new Iterable<Term>() { if (terms.size() == 1) {
@SuppressWarnings({"unchecked","rawtypes"}) return terms.get(0).iterator();
@Override } else {
public Iterator<Term> iterator() { return new MergedPrefixCodedTermsIterator(terms);
Iterator<Term> subs[] = new Iterator[iterables.size()]; }
for (int i = 0; i < iterables.size(); i++) {
subs[i] = iterables.get(i).iterator();
}
return new MergedIterator<>(subs);
}
};
} }
public Iterable<QueryAndLimit> queriesIterable() { public Iterable<QueryAndLimit> queriesIterable() {

View File

@ -0,0 +1,40 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.util.BytesRef;
// TODO: maybe TermsFilter could use this?
/**
 * Walks a sequence of terms that spans multiple fields, telling the caller each time a new
 * field begins.
 */
interface FieldTermIterator {

  /**
   * Moves to the next term.
   *
   * @return true if the term now current is in a new field, or if there are no more terms;
   *     call {@link #field} to find out which field (a null result means the iteration ended)
   */
  boolean next();

  /** The current field, or null once the iteration has ended. */
  String field();

  /** The current term. */
  BytesRef term();

  // TODO: this is really per-iterator not per term, but when we use MergedPrefixCodedTermsIterator we need to know which iterator we are on
  /** The del gen of the current term. */
  long delGen();
}

View File

@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit; import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator; import org.apache.lucene.util.RamUsageEstimator;
@ -57,7 +58,7 @@ class FrozenBufferedUpdates {
final int bytesUsed; final int bytesUsed;
final int numTermDeletes; final int numTermDeletes;
private long gen = -1; // assigned by BufferedDeletesStream once pushed private long gen = -1; // assigned by BufferedUpdatesStream once pushed
final boolean isSegmentPrivate; // set to true iff this frozen packet represents final boolean isSegmentPrivate; // set to true iff this frozen packet represents
// a segment private deletes. in that case is should // a segment private deletes. in that case is should
// only have Queries // only have Queries
@ -122,6 +123,7 @@ class FrozenBufferedUpdates {
public void setDelGen(long gen) { public void setDelGen(long gen) {
assert this.gen == -1; assert this.gen == -1;
this.gen = gen; this.gen = gen;
terms.setDelGen(gen);
} }
public long delGen() { public long delGen() {
@ -129,13 +131,8 @@ class FrozenBufferedUpdates {
return gen; return gen;
} }
public Iterable<Term> termsIterable() { public TermIterator termIterator() {
return new Iterable<Term>() { return terms.iterator();
@Override
public Iterator<Term> iterator() {
return terms.iterator();
}
};
} }
public Iterable<QueryAndLimit> queriesIterable() { public Iterable<QueryAndLimit> queriesIterable() {

View File

@ -0,0 +1,134 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.List;
import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
/**
 * Merges the term streams of several {@link PrefixCodedTerms} into a single
 * {@link FieldTermIterator}.
 *
 * <p>Fields are visited in {@link String#compareTo} order. Within a field, terms are
 * visited in {@link BytesRef#compareTo} order; when more than one input holds the same
 * term it is surfaced once per input, the input with the larger del gen first.
 */
class MergedPrefixCodedTermsIterator implements FieldTermIterator {

  /** Queue of the inputs positioned on the current field, ordered by their current term. */
  private static class TermMergeQueue extends PriorityQueue<TermIterator> {
    TermMergeQueue(int size) {
      super(size);
    }

    @Override
    protected boolean lessThan(TermIterator a, TermIterator b) {
      final int order = a.bytes.compareTo(b.bytes);
      if (order != 0) {
        return order < 0;
      }
      // Same term in both inputs: the larger del gen sorts first.
      return a.delGen() > b.delGen();
    }
  }

  /** Queue of the inputs waiting on a later field, ordered by that field's name. */
  private static class FieldMergeQueue extends PriorityQueue<TermIterator> {
    FieldMergeQueue(int size) {
      super(size);
    }

    @Override
    protected boolean lessThan(TermIterator a, TermIterator b) {
      return a.field.compareTo(b.field) < 0;
    }
  }

  final TermMergeQueue termQueue;
  final FieldMergeQueue fieldQueue;

  /** Field of the current term, or null once every input is exhausted. */
  String field;

  /**
   * Opens one iterator per entry of {@code termsList} and positions each on its first term.
   * Inputs that turn out to be empty are dropped immediately.
   *
   * @param termsList the term sets to merge
   */
  public MergedPrefixCodedTermsIterator(List<PrefixCodedTerms> termsList) {
    fieldQueue = new FieldMergeQueue(termsList.size());
    for (PrefixCodedTerms terms : termsList) {
      TermIterator sub = terms.iterator();
      sub.next();
      if (sub.field == null) {
        // Nothing in this input.
        continue;
      }
      fieldQueue.add(sub);
    }
    termQueue = new TermMergeQueue(termsList.size());
  }

  /**
   * Advances to the next term of the merged stream.
   *
   * @return true when a new field was started or the iteration ended (then {@link #field}
   *         is null); false when the new current term is still in the same field
   */
  @Override
  public boolean next() {
    if (termQueue.size() == 0) {
      // Current field is done (or we have not started yet):
      return nextField();
    }

    TermIterator current = termQueue.top();
    if (current.next()) {
      // This input moved on to another field (or ran out): park it until that field comes up.
      termQueue.pop();
      if (current.field() != null) {
        fieldQueue.add(current);
      }
    } else {
      // Same field, new term: restore heap order.
      termQueue.updateTop();
    }

    if (termQueue.size() != 0) {
      // Still terms left in this field
      return false;
    }
    return nextField();
  }

  /**
   * Moves every input positioned on the smallest pending field into the term queue and
   * makes that field current; marks the iteration as ended when no input is left.
   *
   * @return always true, since either a new field starts or the iteration ends
   */
  private boolean nextField() {
    if (fieldQueue.size() == 0) {
      // No more fields:
      field = null;
      return true;
    }

    // Transfer all iterators on the next field into the term queue:
    TermIterator first = fieldQueue.pop();
    termQueue.add(first);
    assert first.field() != null;
    while (fieldQueue.size() != 0 && fieldQueue.top().field.equals(first.field)) {
      termQueue.add(fieldQueue.pop());
    }
    field = first.field;
    return true;
  }

  /** Returns the bytes of the current term; only meaningful while {@link #field} is non-null. */
  @Override
  public BytesRef term() {
    return termQueue.top().bytes;
  }

  /** Returns the current field, or null if the iteration ended. */
  @Override
  public String field() {
    return field;
  }

  /** Returns the del gen of the input that supplied the current term. */
  @Override
  public long delGen() {
    return termQueue.top().delGen();
  }
}

View File

@ -18,7 +18,6 @@ package org.apache.lucene.index;
*/ */
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RAMFile; import org.apache.lucene.store.RAMFile;
@ -32,8 +31,9 @@ import org.apache.lucene.util.BytesRefBuilder;
* Prefix codes term instances (prefixes are shared) * Prefix codes term instances (prefixes are shared)
* @lucene.experimental * @lucene.experimental
*/ */
class PrefixCodedTerms implements Iterable<Term>, Accountable { class PrefixCodedTerms implements Accountable {
final RAMFile buffer; final RAMFile buffer;
private long delGen;
private PrefixCodedTerms(RAMFile buffer) { private PrefixCodedTerms(RAMFile buffer) {
this.buffer = buffer; this.buffer = buffer;
@ -44,56 +44,9 @@ class PrefixCodedTerms implements Iterable<Term>, Accountable {
return buffer.ramBytesUsed(); return buffer.ramBytesUsed();
} }
/** @return iterator over the bytes */ /** Records del gen for this packet. */
@Override public void setDelGen(long delGen) {
public Iterator<Term> iterator() { this.delGen = delGen;
return new PrefixCodedTermsIterator();
}
class PrefixCodedTermsIterator implements Iterator<Term> {
final IndexInput input;
String field = "";
BytesRefBuilder bytes = new BytesRefBuilder();
Term term = new Term(field, bytes.get());
PrefixCodedTermsIterator() {
try {
input = new RAMInputStream("PrefixCodedTermsIterator", buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean hasNext() {
return input.getFilePointer() < input.length();
}
@Override
public Term next() {
assert hasNext();
try {
int code = input.readVInt();
if ((code & 1) != 0) {
// new field
field = input.readString();
}
int prefix = code >>> 1;
int suffix = input.readVInt();
bytes.grow(prefix + suffix);
input.readBytes(bytes.bytes(), prefix, suffix);
bytes.setLength(prefix + suffix);
term.set(field, bytes.get());
return term;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
} }
/** Builds a PrefixCodedTerms: call add repeatedly, then finish. */ /** Builds a PrefixCodedTerms: call add repeatedly, then finish. */
@ -150,4 +103,71 @@ class PrefixCodedTerms implements Iterable<Term>, Accountable {
return pos1; return pos1;
} }
} }
/**
 * Decodes the prefix-coded terms held in a {@link RAMFile}, one term per call to
 * {@link #next}, exposing them through {@link FieldTermIterator}.
 *
 * <p>The same {@link BytesRef} instance ({@link #bytes}) is overwritten on every call to
 * {@link #next}.
 */
public static class TermIterator implements FieldTermIterator {
  final IndexInput input;
  final BytesRefBuilder builder = new BytesRefBuilder();
  final BytesRef bytes = builder.get();
  final long end;
  final long delGen;
  String field = "";

  /**
   * Opens a reader over {@code buffer}.
   *
   * @param delGen the del gen reported by {@link #delGen()} for every term
   * @param buffer the encoded terms
   * @throws RuntimeException wrapping the IOException if the input cannot be opened
   */
  public TermIterator(long delGen, RAMFile buffer) {
    this.delGen = delGen;
    try {
      input = new RAMInputStream("MergedPrefixCodedTermsIterator", buffer);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    end = input.length();
  }

  /**
   * Decodes the next term into {@link #bytes}.
   *
   * @return true if the decoded term starts a new field, or if the input is exhausted
   *         (in which case {@link #field} is set to null); false otherwise
   * @throws RuntimeException wrapping any IOException raised while reading
   */
  @Override
  public boolean next() {
    if (input.getFilePointer() >= end) {
      // Exhausted: signal the end through a null field.
      field = null;
      return true;
    }

    try {
      // Low bit flags a new field; the remaining bits are the length of the prefix kept
      // from the previously decoded term.
      final int header = input.readVInt();
      final boolean startsField = (header & 1) != 0;
      if (startsField) {
        field = input.readString();
      }
      final int sharedLength = header >>> 1;
      final int suffixLength = input.readVInt();
      readTermBytes(sharedLength, suffixLength);
      return startsField;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // TODO: maybe we should freeze to FST or automaton instead?
  /** Keeps the first {@code prefix} bytes already in the builder and reads {@code suffix} new bytes after them. */
  private void readTermBytes(int prefix, int suffix) throws IOException {
    final int termLength = prefix + suffix;
    builder.grow(termLength);
    input.readBytes(builder.bytes(), prefix, suffix);
    builder.setLength(termLength);
  }

  /** Returns the current term (the shared, reused instance). */
  @Override
  public BytesRef term() {
    return bytes;
  }

  /** Returns the current field, or null once the input is exhausted. */
  @Override
  public String field() {
    return field;
  }

  /** Returns the del gen given at construction. */
  @Override
  public long delGen() {
    return delGen;
  }
}
/**
 * Returns a new {@link TermIterator} over this instance's buffer. The iterator is created
 * with the del gen currently recorded on this instance (see {@link #setDelGen}).
 */
public TermIterator iterator() {
return new TermIterator(delGen, buffer);
}
} }

View File

@ -54,6 +54,13 @@ public abstract class Terms {
* <p><b>NOTE</b>: the returned TermsEnum cannot * <p><b>NOTE</b>: the returned TermsEnum cannot
* seek</p>. */ * seek</p>. */
public TermsEnum intersect(CompiledAutomaton compiled, final BytesRef startTerm) throws IOException { public TermsEnum intersect(CompiledAutomaton compiled, final BytesRef startTerm) throws IOException {
// TODO: could we factor out a common interface b/w
// CompiledAutomaton and FST? Then we could pass FST there too,
// and likely speed up resolving terms to deleted docs ... but
// AutomatonTermsEnum makes this tricky because of its on-the-fly cycle
// detection
// TODO: eventually we could support seekCeil/Exact on // TODO: eventually we could support seekCeil/Exact on
// the returned enum, instead of only being able to seek // the returned enum, instead of only being able to seek
// at the start // at the start

View File

@ -19,16 +19,21 @@ package org.apache.lucene.util.automaton;
//import java.io.IOException; //import java.io.IOException;
//import java.io.PrintWriter; //import java.io.PrintWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.BitSet; import java.util.BitSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.InPlaceMergeSorter; import org.apache.lucene.util.InPlaceMergeSorter;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.Sorter; import org.apache.lucene.util.Sorter;
// TODO // TODO
// - could use packed int arrays instead // - could use packed int arrays instead
// - could encode dest w/ delta from to? // - could encode dest w/ delta from to?
@ -47,7 +52,8 @@ import org.apache.lucene.util.Sorter;
* *
* @lucene.experimental */ * @lucene.experimental */
public class Automaton { public class Automaton implements Accountable {
/** Where we next write to the int[] states; this increments by 2 for /** Where we next write to the int[] states; this increments by 2 for
* each added state because we pack a pointer to the transitions * each added state because we pack a pointer to the transitions
* array and a count of how many transitions leave the state. */ * array and a count of how many transitions leave the state. */
@ -840,4 +846,14 @@ public class Automaton {
} }
} }
} }
/**
 * Estimates the heap used by this automaton: the {@code states} and {@code transitions}
 * arrays, the {@code isAccept} bit set, plus fixed per-object overhead.
 *
 * @return the estimated size in bytes
 */
@Override
public long ramBytesUsed() {
  // TODO: BitSet RAM usage (isAccept.size()/8) isn't fully accurate...
  long total = RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
  total += RamUsageEstimator.sizeOf(states);
  total += RamUsageEstimator.sizeOf(transitions);

  // The accept-state bit set: its own header, one bit per slot, and the reference to it.
  total += RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
  total += (isAccept.size() / 8);
  total += RamUsageEstimator.NUM_BYTES_OBJECT_REF;

  // Remaining fixed-size fields; presumably the two array references, three int
  // counters and one boolean flag declared on this class -- confirm against the fields.
  total += 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
  total += 3 * RamUsageEstimator.NUM_BYTES_INT;
  total += RamUsageEstimator.NUM_BYTES_BOOLEAN;
  return total;
}
} }

View File

@ -198,6 +198,7 @@ public class CompiledAutomaton {
if (this.finite) { if (this.finite) {
commonSuffixRef = null; commonSuffixRef = null;
} else { } else {
// NOTE: this is a very costly operation! We should test if it's really warranted in practice...
commonSuffixRef = Operations.getCommonSuffixBytesRef(utf8, maxDeterminizedStates); commonSuffixRef = Operations.getCommonSuffixBytesRef(utf8, maxDeterminizedStates);
} }
runAutomaton = new ByteRunAutomaton(utf8, true, maxDeterminizedStates); runAutomaton = new ByteRunAutomaton(utf8, true, maxDeterminizedStates);

View File

@ -305,40 +305,6 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
dir.close(); dir.close();
} }
public void testUpdateAndDeleteSameDocument() throws Exception {
// update and delete same document in same commit session
Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
conf.setMaxBufferedDocs(10); // control segment flushing
IndexWriter writer = new IndexWriter(dir, conf);
writer.addDocument(doc(0));
writer.addDocument(doc(1));
if (random().nextBoolean()) {
writer.commit();
}
writer.deleteDocuments(new Term("id", "doc-0"));
writer.updateBinaryDocValue(new Term("id", "doc-0"), "val", toBytes(17L));
final DirectoryReader reader;
if (random().nextBoolean()) { // not NRT
writer.close();
reader = DirectoryReader.open(dir);
} else { // NRT
reader = DirectoryReader.open(writer, true);
writer.close();
}
LeafReader r = reader.leaves().get(0).reader();
assertFalse(r.getLiveDocs().get(0));
assertEquals(1, getValue(r.getBinaryDocValues("val"), 0)); // deletes are currently applied first
reader.close();
dir.close();
}
public void testMultipleDocValuesTypes() throws Exception { public void testMultipleDocValuesTypes() throws Exception {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random())); IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
@ -664,6 +630,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
writer.commit(); writer.commit();
reader = DirectoryReader.open(dir); reader = DirectoryReader.open(dir);
} }
//System.out.println("TEST: isNRT=" + isNRT);
final int numFields = random.nextInt(4) + 3; // 3-7 final int numFields = random.nextInt(4) + 3; // 3-7
final long[] fieldValues = new long[numFields]; final long[] fieldValues = new long[numFields];
@ -675,7 +642,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
int docID = 0; int docID = 0;
for (int i = 0; i < numRounds; i++) { for (int i = 0; i < numRounds; i++) {
int numDocs = atLeast(5); int numDocs = atLeast(5);
// System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs); //System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs);
for (int j = 0; j < numDocs; j++) { for (int j = 0; j < numDocs; j++) {
Document doc = new Document(); Document doc = new Document();
doc.add(new StringField("id", "doc-" + docID, Store.NO)); doc.add(new StringField("id", "doc-" + docID, Store.NO));

View File

@ -16,6 +16,7 @@ package org.apache.lucene.index;
* License for the specific language governing permissions and limitations under * License for the specific language governing permissions and limitations under
* the License. * the License.
*/ */
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -24,12 +25,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice; import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** /**
* Unit test for {@link DocumentsWriterDeleteQueue} * Unit test for {@link DocumentsWriterDeleteQueue}
*/ */
@ -75,9 +78,18 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
assertEquals(uniqueValues, bd2.terms.keySet()); assertEquals(uniqueValues, bd2.terms.keySet());
HashSet<Term> frozenSet = new HashSet<>(); HashSet<Term> frozenSet = new HashSet<>();
BytesRefBuilder bytesRef = new BytesRefBuilder(); BytesRefBuilder bytesRef = new BytesRefBuilder();
for (Term t : queue.freezeGlobalBuffer(null).termsIterable()) { TermIterator iter = queue.freezeGlobalBuffer(null).termIterator();
bytesRef.copyBytes(t.bytes); String field = null;
frozenSet.add(new Term(t.field, bytesRef.toBytesRef())); while (true) {
boolean newField = iter.next();
if (newField) {
field = iter.field;
if (field == null) {
break;
}
}
bytesRef.copyBytes(iter.bytes);
frozenSet.add(new Term(field, bytesRef.toBytesRef()));
} }
assertEquals(uniqueValues, frozenSet); assertEquals(uniqueValues, frozenSet);
assertEquals("num deletes must be 0 after freeze", 0, queue assertEquals("num deletes must be 0 after freeze", 0, queue
@ -204,10 +216,21 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
queue.tryApplyGlobalSlice(); queue.tryApplyGlobalSlice();
Set<Term> frozenSet = new HashSet<>(); Set<Term> frozenSet = new HashSet<>();
BytesRefBuilder builder = new BytesRefBuilder(); BytesRefBuilder builder = new BytesRefBuilder();
for (Term t : queue.freezeGlobalBuffer(null).termsIterable()) {
builder.copyBytes(t.bytes); TermIterator iter = queue.freezeGlobalBuffer(null).termIterator();
frozenSet.add(new Term(t.field, builder.toBytesRef())); String field = null;
while (true) {
boolean newField = iter.next();
if (newField) {
field = iter.field;
if (field == null) {
break;
}
}
builder.copyBytes(iter.bytes);
frozenSet.add(new Term(field, builder.toBytesRef()));
} }
assertEquals("num deletes must be 0 after freeze", 0, queue assertEquals("num deletes must be 0 after freeze", 0, queue
.numGlobalTermDeletes()); .numGlobalTermDeletes());
assertEquals(uniqueValues.size(), frozenSet.size()); assertEquals(uniqueValues.size(), frozenSet.size());

View File

@ -789,7 +789,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
doc.add(newTextField("city", text[i], Field.Store.YES)); doc.add(newTextField("city", text[i], Field.Store.YES));
modifier.addDocument(doc); modifier.addDocument(doc);
} }
// flush (and commit if ac) // flush
if (VERBOSE) { if (VERBOSE) {
System.out.println("TEST: now full merge"); System.out.println("TEST: now full merge");
@ -818,7 +818,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
modifier.deleteDocuments(term); modifier.deleteDocuments(term);
// add a doc (needed for the !ac case; see below) // add a doc
// doc remains buffered // doc remains buffered
if (VERBOSE) { if (VERBOSE) {

View File

@ -71,7 +71,7 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
int docID = 0; int docID = 0;
for (int i = 0; i < numRounds; i++) { for (int i = 0; i < numRounds; i++) {
int numDocs = atLeast(5); int numDocs = atLeast(5);
// System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs); // System.out.println("TEST: round=" + i + ", numDocs=" + numDocs);
for (int j = 0; j < numDocs; j++) { for (int j = 0; j < numDocs; j++) {
Document doc = new Document(); Document doc = new Document();
doc.add(new StringField("id", "doc-" + docID, Store.NO)); doc.add(new StringField("id", "doc-" + docID, Store.NO));
@ -95,7 +95,7 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
} else { } else {
writer.updateBinaryDocValue(new Term("key", "all"), updateField, TestBinaryDocValuesUpdates.toBytes(++fieldValues[fieldIdx])); writer.updateBinaryDocValue(new Term("key", "all"), updateField, TestBinaryDocValuesUpdates.toBytes(++fieldValues[fieldIdx]));
} }
// System.out.println("[" + Thread.currentThread().getName() + "]: updated field '" + updateField + "' to value " + fieldValues[fieldIdx]); //System.out.println("TEST: updated field '" + updateField + "' to value " + fieldValues[fieldIdx]);
if (random.nextDouble() < 0.2) { if (random.nextDouble() < 0.2) {
int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok! int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok!
@ -137,9 +137,9 @@ public class TestMixedDocValuesUpdates extends LuceneTestCase {
// System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' vslue=" + getValue(bdv, doc, scratch)); // System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' vslue=" + getValue(bdv, doc, scratch));
assertTrue(docsWithField.get(doc)); assertTrue(docsWithField.get(doc));
if (field < numNDVFields) { if (field < numNDVFields) {
assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], ndv.get(doc)); assertEquals("invalid numeric value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], ndv.get(doc));
} else { } else {
assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], TestBinaryDocValuesUpdates.getValue(bdv, doc)); assertEquals("invalid binary value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], TestBinaryDocValuesUpdates.getValue(bdv, doc));
} }
} }
} }

View File

@ -24,7 +24,6 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
@ -293,41 +292,6 @@ public class TestNumericDocValuesUpdates extends LuceneTestCase {
dir.close(); dir.close();
} }
@Test
public void testUpdateAndDeleteSameDocument() throws Exception {
// update and delete same document in same commit session
Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig(new MockAnalyzer(random()));
conf.setMaxBufferedDocs(10); // control segment flushing
IndexWriter writer = new IndexWriter(dir, conf);
writer.addDocument(doc(0));
writer.addDocument(doc(1));
if (random().nextBoolean()) {
writer.commit();
}
writer.deleteDocuments(new Term("id", "doc-0"));
writer.updateNumericDocValue(new Term("id", "doc-0"), "val", 17L);
final DirectoryReader reader;
if (random().nextBoolean()) { // not NRT
writer.close();
reader = DirectoryReader.open(dir);
} else { // NRT
reader = DirectoryReader.open(writer, true);
writer.close();
}
LeafReader r = reader.leaves().get(0).reader();
assertFalse(r.getLiveDocs().get(0));
assertEquals(1, r.getNumericDocValues("val").get(0)); // deletes are currently applied first
reader.close();
dir.close();
}
@Test @Test
public void testMultipleDocValuesTypes() throws Exception { public void testMultipleDocValuesTypes() throws Exception {
Directory dir = newDirectory(); Directory dir = newDirectory();

View File

@ -17,14 +17,14 @@ package org.apache.lucene.index;
* limitations under the License. * limitations under the License.
*/ */
import java.util.ArrayList; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.MergedIterator;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
public class TestPrefixCodedTerms extends LuceneTestCase { public class TestPrefixCodedTerms extends LuceneTestCase {
@ -32,7 +32,9 @@ public class TestPrefixCodedTerms extends LuceneTestCase {
public void testEmpty() { public void testEmpty() {
PrefixCodedTerms.Builder b = new PrefixCodedTerms.Builder(); PrefixCodedTerms.Builder b = new PrefixCodedTerms.Builder();
PrefixCodedTerms pb = b.finish(); PrefixCodedTerms pb = b.finish();
assertFalse(pb.iterator().hasNext()); TermIterator iter = pb.iterator();
assertTrue(iter.next());
assertNull(iter.field);
} }
public void testOne() { public void testOne() {
@ -40,9 +42,12 @@ public class TestPrefixCodedTerms extends LuceneTestCase {
PrefixCodedTerms.Builder b = new PrefixCodedTerms.Builder(); PrefixCodedTerms.Builder b = new PrefixCodedTerms.Builder();
b.add(term); b.add(term);
PrefixCodedTerms pb = b.finish(); PrefixCodedTerms pb = b.finish();
Iterator<Term> iterator = pb.iterator(); TermIterator iter = pb.iterator();
assertTrue(iterator.hasNext()); assertTrue(iter.next());
assertEquals(term, iterator.next()); assertEquals("foo", iter.field);
assertEquals("bogus", iter.bytes.utf8ToString());
assertTrue(iter.next());
assertNull(iter.field);
} }
public void testRandom() { public void testRandom() {
@ -59,11 +64,23 @@ public class TestPrefixCodedTerms extends LuceneTestCase {
} }
PrefixCodedTerms pb = b.finish(); PrefixCodedTerms pb = b.finish();
TermIterator iter = pb.iterator();
Iterator<Term> expected = terms.iterator(); Iterator<Term> expected = terms.iterator();
for (Term t : pb) { String field = "";
//System.out.println("TEST: now iter");
while (true) {
boolean newField = iter.next();
//System.out.println(" newField=" + newField);
if (newField) {
field = iter.field;
if (field == null) {
break;
}
}
assertTrue(expected.hasNext()); assertTrue(expected.hasNext());
assertEquals(expected.next(), t); assertEquals(expected.next(), new Term(field, iter.bytes));
} }
assertFalse(expected.hasNext()); assertFalse(expected.hasNext());
} }
@ -79,11 +96,14 @@ public class TestPrefixCodedTerms extends LuceneTestCase {
b2.add(t2); b2.add(t2);
PrefixCodedTerms pb2 = b2.finish(); PrefixCodedTerms pb2 = b2.finish();
Iterator<Term> merged = new MergedIterator<>(pb1.iterator(), pb2.iterator()); MergedPrefixCodedTermsIterator merged = new MergedPrefixCodedTermsIterator(Arrays.asList(new PrefixCodedTerms[] {pb1, pb2}));
assertTrue(merged.hasNext()); assertTrue(merged.next());
assertEquals(t1, merged.next()); assertEquals("foo", merged.field());
assertTrue(merged.hasNext()); assertEquals("a", merged.term().utf8ToString());
assertEquals(t2, merged.next()); assertFalse(merged.next());
assertEquals("b", merged.term().utf8ToString());
assertTrue(merged.next());
assertNull(merged.field());
} }
@SuppressWarnings({"unchecked","rawtypes"}) @SuppressWarnings({"unchecked","rawtypes"})
@ -95,31 +115,49 @@ public class TestPrefixCodedTerms extends LuceneTestCase {
Set<Term> terms = new TreeSet<>(); Set<Term> terms = new TreeSet<>();
int nterms = TestUtil.nextInt(random(), 0, 10000); int nterms = TestUtil.nextInt(random(), 0, 10000);
for (int j = 0; j < nterms; j++) { for (int j = 0; j < nterms; j++) {
Term term = new Term(TestUtil.randomUnicodeString(random(), 2), TestUtil.randomUnicodeString(random(), 4)); String field = TestUtil.randomUnicodeString(random(), 2);
//String field = TestUtil.randomSimpleString(random(), 2);
Term term = new Term(field, TestUtil.randomUnicodeString(random(), 4));
terms.add(term); terms.add(term);
} }
superSet.addAll(terms); superSet.addAll(terms);
PrefixCodedTerms.Builder b = new PrefixCodedTerms.Builder(); PrefixCodedTerms.Builder b = new PrefixCodedTerms.Builder();
//System.out.println("TEST: sub " + i + " has " + terms.size() + " terms");
for (Term ref: terms) { for (Term ref: terms) {
//System.out.println(" add " + ref.field() + " " + ref.bytes());
b.add(ref); b.add(ref);
} }
pb[i] = b.finish(); pb[i] = b.finish();
} }
List<Iterator<Term>> subs = new ArrayList<>(); Iterator<Term> expected = superSet.iterator();
for (int i = 0; i < pb.length; i++) {
subs.add(pb[i].iterator()); MergedPrefixCodedTermsIterator actual = new MergedPrefixCodedTermsIterator(Arrays.asList(pb));
String field = "";
BytesRef lastTerm = null;
while (true) {
if (actual.next()) {
field = actual.field();
if (field == null) {
break;
}
lastTerm = null;
//System.out.println("\nTEST: new field: " + field);
}
if (lastTerm != null && lastTerm.equals(actual.term())) {
continue;
}
//System.out.println("TEST: iter: field=" + field + " term=" + actual.term());
lastTerm = BytesRef.deepCopyOf(actual.term());
assertTrue(expected.hasNext());
Term expectedTerm = expected.next();
assertEquals(expectedTerm, new Term(field, actual.term()));
} }
Iterator<Term> expected = superSet.iterator();
// NOTE: currenlty using diamond operator on MergedIterator (without explicit Term class) causes
// errors on Eclipse Compiler (ecj) used for javadoc lint
Iterator<Term> actual = new MergedIterator<Term>(subs.toArray(new Iterator[0]));
while (actual.hasNext()) {
assertTrue(expected.hasNext());
assertEquals(expected.next(), actual.next());
}
assertFalse(expected.hasNext()); assertFalse(expected.hasNext());
} }
} }

View File

@ -102,7 +102,13 @@ public class TestRollingUpdates extends LuceneTestCase {
updateCount++; updateCount++;
if (doUpdate) { if (doUpdate) {
w.updateDocument(idTerm, doc); if (random().nextBoolean()) {
w.updateDocument(idTerm, doc);
} else {
// It's OK to not be atomic for this test (no separate thread reopening readers):
w.deleteDocuments(new TermQuery(idTerm));
w.addDocument(doc);
}
} else { } else {
w.addDocument(doc); w.addDocument(doc);
} }

View File

@ -48,6 +48,7 @@ public class TestStressDeletes extends LuceneTestCase {
final Map<Integer,Boolean> exists = new ConcurrentHashMap<>(); final Map<Integer,Boolean> exists = new ConcurrentHashMap<>();
Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 6)]; Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 6)];
final CountDownLatch startingGun = new CountDownLatch(1); final CountDownLatch startingGun = new CountDownLatch(1);
final int deleteMode = random().nextInt(3);
for(int i=0;i<threads.length;i++) { for(int i=0;i<threads.length;i++) {
threads[i] = new Thread() { threads[i] = new Thread() {
@Override @Override
@ -64,7 +65,20 @@ public class TestStressDeletes extends LuceneTestCase {
w.addDocument(doc); w.addDocument(doc);
exists.put(id, true); exists.put(id, true);
} else { } else {
w.deleteDocuments(new Term("id", ""+id)); if (deleteMode == 0) {
// Always delete by term
w.deleteDocuments(new Term("id", ""+id));
} else if (deleteMode == 1) {
// Always delete by query
w.deleteDocuments(new TermQuery(new Term("id", ""+id)));
} else {
// Mixed
if (random().nextBoolean()) {
w.deleteDocuments(new Term("id", ""+id));
} else {
w.deleteDocuments(new TermQuery(new Term("id", ""+id)));
}
}
exists.put(id, false); exists.put(id, false);
} }
} }