LUCENE-2324: new doc deletes approach; various bug fixes

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/realtime_search@1074414 13f79535-47bb-0310-9956-ffa450edef68
Michael Busch 2011-02-25 07:15:28 +00:00
parent 768f61a62b
commit da65d16dbe
5 changed files with 132 additions and 131 deletions

DocumentsWriter.java

@@ -27,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BitVector;
 
 /**
  * This class accepts multiple added documents and directly
@@ -145,28 +145,24 @@ final class DocumentsWriter {
   }
 
   boolean deleteQueries(final Query... queries) throws IOException {
+    synchronized(this) {
+      for (Query query : queries) {
+        pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
+      }
+    }
+
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-    boolean deleted = false;
     while (threadsIterator.hasNext()) {
       ThreadState state = threadsIterator.next();
       state.lock();
       try {
         state.perThread.deleteQueries(queries);
-        deleted = true;
       } finally {
         state.unlock();
       }
     }
-
-    if (!deleted) {
-      synchronized(this) {
-        for (Query query : queries) {
-          pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
-        }
-      }
-    }
 
     return false;
   }
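The hunk above inverts the old fallback logic: a delete is now recorded in the global pendingDeletes buffer first, and only then fanned out to every active DWPT under that thread state's lock, instead of landing in the global buffer only when no per-thread writer picked it up. A minimal sketch of the pattern; DeleteBuffer and WriterThreadState are simplified stand-ins, not the real Lucene classes:

    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    class DeleteFanOutSketch {
      static class DeleteBuffer {
        void add(String term) { /* remember the delete */ }
      }

      static class WriterThreadState extends ReentrantLock {
        final DeleteBuffer privateDeletes = new DeleteBuffer();
      }

      private final DeleteBuffer globalDeletes = new DeleteBuffer();

      void deleteTerm(String term, List<WriterThreadState> activeStates) {
        // 1) record the delete globally, so a concurrent flush cannot lose it
        synchronized (this) {
          globalDeletes.add(term);
        }
        // 2) fan it out to each active per-thread writer under its own lock
        for (WriterThreadState state : activeStates) {
          state.lock();
          try {
            state.privateDeletes.add(term);
          } finally {
            state.unlock();
          }
        }
      }
    }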
@@ -175,12 +171,16 @@ final class DocumentsWriter {
   }
 
   boolean deleteTerms(final Term... terms) throws IOException {
+    synchronized(this) {
+      for (Term term : terms) {
+        pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
+      }
+    }
+
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-    boolean deleted = false;
     while (threadsIterator.hasNext()) {
       ThreadState state = threadsIterator.next();
-      deleted = true;
       state.lock();
       try {
         state.perThread.deleteTerms(terms);
@@ -189,14 +189,6 @@ final class DocumentsWriter {
       }
     }
 
-    if (!deleted) {
-      synchronized(this) {
-        for (Term term : terms) {
-          pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
-        }
-      }
-    }
-
     return false;
   }
@@ -207,12 +199,14 @@ final class DocumentsWriter {
     return deleteTerms(term);
   }
 
-  boolean deleteTerm(final Term term, ThreadState exclude) {
+  void deleteTerm(final Term term, ThreadState exclude) {
+    synchronized(this) {
+      pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
+    }
+
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-    boolean deleted = false;
     while (threadsIterator.hasNext()) {
-      deleted = true;
       ThreadState state = threadsIterator.next();
       if (state != exclude) {
         state.lock();
@@ -223,8 +217,6 @@ final class DocumentsWriter {
         }
       }
     }
-
-    return deleted;
   }
 
   /** If non-null, various details of indexing are printed
@@ -303,6 +295,10 @@ final class DocumentsWriter {
   synchronized void abort() throws IOException {
     boolean success = false;
 
+    synchronized (this) {
+      pendingDeletes.clear();
+    }
+
     try {
       if (infoStream != null) {
         message("docWriter: abort");
@@ -328,7 +324,7 @@ final class DocumentsWriter {
     }
   }
 
-  synchronized boolean anyChanges() {
+  boolean anyChanges() {
     return numDocsInRAM.get() != 0 || anyDeletions();
   }
@@ -355,29 +351,10 @@ final class DocumentsWriter {
     return numDeletes;
   }
 
-  // TODO: can we improve performance of this method by keeping track
-  // here in DW of whether any DWPT has deletions?
-  public synchronized boolean anyDeletions() {
-    if (pendingDeletes.any()) {
-      return true;
-    }
-
-    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-    while (threadsIterator.hasNext()) {
-      ThreadState state = threadsIterator.next();
-      state.lock();
-      try {
-        if (state.perThread.pendingDeletes.any()) {
-          return true;
-        }
-      } finally {
-        state.unlock();
-      }
-    }
-
-    return false;
+  public boolean anyDeletions() {
+    return pendingDeletes.any();
   }
 
   void close() {
     closed = true;
   }
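With deletes always entering the global buffer first (see the fan-out pattern above), anyDeletions() collapses to a single check and the old lock-and-poll walk across every ThreadState disappears. A sketch of why the single check suffices, again with stand-in types rather than the real classes:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class DeletionCheckSketch {
      private final Set<String> globalDeletes = ConcurrentHashMap.newKeySet();

      // step 1 of every delete path: the global buffer is written first
      void bufferDelete(String term) {
        globalDeletes.add(term);
      }

      // so a single check covers all DWPTs; no per-thread locking or polling
      boolean anyDeletions() {
        return !globalDeletes.isEmpty();
      }
    }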
@@ -386,9 +363,7 @@ final class DocumentsWriter {
     throws CorruptIndexException, IOException {
     ensureOpen();
 
-    SegmentInfo newSegment = null;
-    BufferedDeletes segmentDeletes = null;
-    BitVector deletedDocs = null;
+    FlushedSegment newSegment = null;
 
     ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
     try {
@@ -398,39 +373,38 @@ final class DocumentsWriter {
       numDocsInRAM.incrementAndGet();
 
       newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
-      if (newSegment != null) {
-        deletedDocs = dwpt.flushState.deletedDocs;
-        if (dwpt.pendingDeletes.any()) {
-          segmentDeletes = dwpt.pendingDeletes;
-          dwpt.pendingDeletes = new BufferedDeletes(false);
-        }
-      }
     } finally {
       perThread.unlock();
     }
 
-    if (segmentDeletes != null) {
-      pushDeletes(newSegment, segmentDeletes);
-    }
-
-    if (newSegment != null) {
-      perThreadPool.clearThreadBindings(perThread);
-      indexWriter.addFlushedSegment(newSegment, deletedDocs);
-      return true;
-    }
+    // delete term from other DWPTs later, so that this thread
+    // doesn't have to lock multiple DWPTs at the same time
+    if (delTerm != null) {
+      deleteTerm(delTerm, perThread);
+    }
+
+    if (newSegment != null) {
+      finishFlushedSegment(newSegment);
+    }
+
+    if (newSegment != null) {
+      perThreadPool.clearThreadBindings(perThread);
+      return true;
+    }
 
     return false;
   }
 
+  private void finishFlushedSegment(FlushedSegment newSegment) throws IOException {
+    pushDeletes(newSegment);
+
+    if (newSegment != null) {
+      indexWriter.addFlushedSegment(newSegment);
+    }
+  }
+
-  private final SegmentInfo finishAddDocument(DocumentsWriterPerThread perThread,
+  private final FlushedSegment finishAddDocument(DocumentsWriterPerThread perThread,
       long perThreadRAMUsedBeforeAdd) throws IOException {
-    SegmentInfo newSegment = null;
+    FlushedSegment newSegment = null;
 
     if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
       newSegment = perThread.flush();
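The reworked updateDocument acquires exactly one ThreadState lock: the document is added and possibly flushed under that lock, the lock is released, and only then is the delete term fanned out to the other DWPTs and the flushed segment published to the IndexWriter. A condensed model of that control flow; every name here (Flushed, WriterState, and so on) is an illustrative stand-in, not the real API:

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    class UpdateFlowSketch {
      static class Flushed { }

      static class WriterState extends ReentrantLock {
        Flushed addAndMaybeFlush(String doc, String delTerm) { return null; }
      }

      private final List<WriterState> states;

      UpdateFlowSketch(List<WriterState> states) { this.states = states; }

      boolean updateDocument(String doc, String delTerm) throws IOException {
        WriterState perThread = states.get(0);  // stands in for pool.getAndLock(...)
        Flushed newSegment;
        perThread.lock();
        try {
          // adds the doc; delTerm is buffered against this DWPT's own RAM docs
          newSegment = perThread.addAndMaybeFlush(doc, delTerm);
        } finally {
          perThread.unlock();                   // never hold two DWPT locks at once
        }
        if (delTerm != null) {
          deleteTerm(delTerm, perThread);       // fan out to the *other* DWPTs
        }
        if (newSegment != null) {
          finishFlushedSegment(newSegment);     // may take the IndexWriter lock
          return true;
        }
        return false;
      }

      void deleteTerm(String term, WriterState exclude) { /* as in the earlier sketch */ }

      void finishFlushedSegment(Flushed segment) { /* push deletes, then publish */ }
    }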
@@ -445,20 +419,21 @@ final class DocumentsWriter {
     return newSegment;
   }
 
-  final void substractFlushedNumDocs(int numFlushed) {
+  final void subtractFlushedNumDocs(int numFlushed) {
     int oldValue = numDocsInRAM.get();
     while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
       oldValue = numDocsInRAM.get();
     }
   }
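subtractFlushedNumDocs (the "substract" misspelling is fixed in the hunk above) decrements the shared in-RAM document counter with a compare-and-set retry loop rather than a lock. The same pattern in isolation, assuming an AtomicInteger counter:

    import java.util.concurrent.atomic.AtomicInteger;

    class CasDecrementSketch {
      private final AtomicInteger numDocsInRAM = new AtomicInteger();

      // retry until the compare-and-set wins; behaviorally the same as
      // numDocsInRAM.addAndGet(-numFlushed), just written out explicitly
      void subtractFlushedNumDocs(int numFlushed) {
        int oldValue = numDocsInRAM.get();
        while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
          oldValue = numDocsInRAM.get();
        }
      }
    }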
-  private final void pushDeletes(SegmentInfo segmentInfo, BufferedDeletes segmentDeletes) {
-    synchronized(indexWriter) {
-      // Lock order: DW -> BD
+  private synchronized void pushDeletes(FlushedSegment flushedSegment) {
+    maybePushPendingDeletes();
+    if (flushedSegment != null) {
+      BufferedDeletes deletes = flushedSegment.segmentDeletes;
       final long delGen = bufferedDeletesStream.getNextGen();
-      if (segmentDeletes.any()) {
-        if (indexWriter.segmentInfos.size() > 0 || segmentInfo != null) {
-          final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(segmentDeletes, delGen);
+      // Lock order: DW -> BD
+      if (deletes != null && deletes.any()) {
+        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, delGen);
         if (infoStream != null) {
           message("flush: push buffered deletes");
         }
@@ -466,40 +441,27 @@ final class DocumentsWriter {
         if (infoStream != null) {
           message("flush: delGen=" + packet.gen);
         }
-        if (segmentInfo != null) {
-          segmentInfo.setBufferedDeletesGen(packet.gen);
-        }
-      } else {
-        if (infoStream != null) {
-          message("flush: drop buffered deletes: no segments");
-        }
-        // We can safely discard these deletes: since
-        // there are no segments, the deletions cannot
-        // affect anything.
-      }
-    } else if (segmentInfo != null) {
-      segmentInfo.setBufferedDeletesGen(delGen);
+      }
+      flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
     }
   }
 
+  private synchronized final void maybePushPendingDeletes() {
+    final long delGen = bufferedDeletesStream.getNextGen();
+    if (pendingDeletes.any()) {
+      bufferedDeletesStream.push(new FrozenBufferedDeletes(pendingDeletes, delGen));
+      pendingDeletes.clear();
+    }
+  }
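pushDeletes and maybePushPendingDeletes hand frozen delete packets to the BufferedDeletesStream stamped with a generation, and a freshly flushed segment records the generation it is already current with, so later apply logic can skip packets the segment has already absorbed. A rough, hedged model of that handshake; these stand-in classes are not the real BufferedDeletesStream API:

    import java.util.ArrayDeque;
    import java.util.Deque;

    class DeleteGenSketch {
      static final class Packet {
        final long gen;
        Packet(long gen) { this.gen = gen; }
      }

      static final class Segment {
        long bufferedDeletesGen;  // set at flush time from getNextGen()
      }

      private long nextGen = 1;
      private final Deque<Packet> stream = new ArrayDeque<>();

      synchronized long getNextGen() { return nextGen++; }

      synchronized void push(Packet packet) { stream.add(packet); }

      // packets older than the segment's recorded generation are already
      // reflected in it and can be skipped when deletes are applied
      static boolean shouldApply(Packet packet, Segment segment) {
        return packet.gen >= segment.bufferedDeletesGen;
      }
    }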
   final boolean flushAllThreads(final boolean flushDeletes)
     throws IOException {
 
-    if (flushDeletes) {
-      synchronized (this) {
-        pushDeletes(null, pendingDeletes);
-        pendingDeletes = new BufferedDeletes(false);
-      }
-    }
-
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
     boolean anythingFlushed = false;
 
     while (threadsIterator.hasNext()) {
-      SegmentInfo newSegment = null;
-      BufferedDeletes segmentDeletes = null;
-      BitVector deletedDocs = null;
+      FlushedSegment newSegment = null;
 
       ThreadState perThread = threadsIterator.next();
       perThread.lock();
@@ -520,34 +482,24 @@ final class DocumentsWriter {
           newSegment = dwpt.flush();
 
           if (newSegment != null) {
-            anythingFlushed = true;
-            deletedDocs = dwpt.flushState.deletedDocs;
             perThreadPool.clearThreadBindings(perThread);
-            if (dwpt.pendingDeletes.any()) {
-              segmentDeletes = dwpt.pendingDeletes;
-              dwpt.pendingDeletes = new BufferedDeletes(false);
-            }
           }
-        } else if (flushDeletes && dwpt.pendingDeletes.any()) {
-          segmentDeletes = dwpt.pendingDeletes;
-          dwpt.pendingDeletes = new BufferedDeletes(false);
-        }
       } finally {
         perThread.unlock();
       }
 
-      if (segmentDeletes != null) {
-        pushDeletes(newSegment, segmentDeletes);
-      }
-
       if (newSegment != null) {
-        indexWriter.addFlushedSegment(newSegment, deletedDocs);
+        // important to unlock the perThread before finishFlushedSegment
+        // is called to prevent deadlock on IndexWriter mutex
+        anythingFlushed = true;
+        finishFlushedSegment(newSegment);
       }
     }
 
+    if (!anythingFlushed && flushDeletes) {
+      maybePushPendingDeletes();
+    }
+
     return anythingFlushed;
   }
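flushAllThreads now follows the same discipline as updateDocument: flush each DWPT under its own lock, publish the FlushedSegment only after unlocking, and, when nothing flushed but deletes were requested, push the globally buffered deletes anyway. A condensed sketch under the same kind of stand-in types as before:

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    class FlushAllSketch {
      static class Flushed { }

      static class WriterState extends ReentrantLock {
        boolean hasDocs() { return false; }
        Flushed flush() { return new Flushed(); }
      }

      private final List<WriterState> states;

      FlushAllSketch(List<WriterState> states) { this.states = states; }

      boolean flushAllThreads(boolean flushDeletes) throws IOException {
        boolean anythingFlushed = false;
        for (WriterState perThread : states) {
          Flushed newSegment = null;
          perThread.lock();
          try {
            if (perThread.hasDocs()) {
              newSegment = perThread.flush();
            }
          } finally {
            // unlock first: finishFlushedSegment may take the IndexWriter lock
            perThread.unlock();
          }
          if (newSegment != null) {
            anythingFlushed = true;
            finishFlushedSegment(newSegment);
          }
        }
        if (!anythingFlushed && flushDeletes) {
          maybePushPendingDeletes();  // buffered deletes must still reach the stream
        }
        return anythingFlushed;
      }

      void finishFlushedSegment(Flushed segment) { }

      void maybePushPendingDeletes() { }
    }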

DocumentsWriterPerThread.java

@@ -101,6 +101,26 @@ public class DocumentsWriterPerThread {
     public boolean testPoint(String name) {
       return docWriter.writer.testPoint(name);
     }
+
+    public void clear() {
+      // don't hold onto doc nor analyzer, in case it is
+      // largish:
+      doc = null;
+      analyzer = null;
+    }
   }
 
+  static class FlushedSegment {
+    final SegmentInfo segmentInfo;
+    final BufferedDeletes segmentDeletes;
+    final BitVector deletedDocuments;
+
+    private FlushedSegment(SegmentInfo segmentInfo,
+        BufferedDeletes segmentDeletes, BitVector deletedDocuments) {
+      this.segmentInfo = segmentInfo;
+      this.segmentDeletes = segmentDeletes;
+      this.deletedDocuments = deletedDocuments;
+    }
+  }
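FlushedSegment bundles everything a flush produces (segment metadata, deletes still pending against other segments, and the bit vector of documents already deleted inside the new segment) so the result travels as one unit instead of the three separate values DocumentsWriter previously threaded around. The same shape in a standalone sketch with simplified field types:

    import java.util.BitSet;
    import java.util.List;

    final class FlushResultSketch {
      final String segmentName;            // stands in for SegmentInfo
      final List<String> segmentDeletes;   // stands in for BufferedDeletes (may be null)
      final BitSet deletedDocuments;       // stands in for BitVector (may be null)

      FlushResultSketch(String segmentName, List<String> segmentDeletes,
          BitSet deletedDocuments) {
        this.segmentName = segmentName;
        this.segmentDeletes = segmentDeletes;
        this.deletedDocuments = deletedDocuments;
      }
    }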
/** Called if we hit an exception at a bad time (when
@@ -136,7 +156,6 @@ public class DocumentsWriterPerThread {
   final Directory directory;
   final DocState docState;
   final DocConsumer consumer;
-  private DocFieldProcessor docFieldProcessor;
 
   String segment;   // Current segment we are working on
   boolean aborting; // True if an abort is pending
@@ -160,10 +179,7 @@ public class DocumentsWriterPerThread {
     this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
 
     consumer = indexingChain.getChain(this);
-    if (consumer instanceof DocFieldProcessor) {
-      docFieldProcessor = (DocFieldProcessor) consumer;
-    }
   }
 
   void setAborting() {
     aborting = true;
@@ -175,7 +191,7 @@ public class DocumentsWriterPerThread {
     docState.analyzer = analyzer;
     docState.docID = numDocsInRAM;
     if (delTerm != null) {
-      pendingDeletes.addTerm(delTerm, docState.docID);
+      pendingDeletes.addTerm(delTerm, numDocsInRAM);
     }
 
     if (segment == null) {
@@ -186,7 +202,11 @@ public class DocumentsWriterPerThread {
     boolean success = false;
     try {
+      try {
         consumer.processDocument(fieldInfos);
+      } finally {
+        docState.clear();
+      }
 
       success = true;
     } finally {
@@ -230,16 +250,20 @@ public class DocumentsWriterPerThread {
   }
 
   void deleteQueries(Query... queries) {
+    if (numDocsInRAM > 0) {
       for (Query query : queries) {
         pendingDeletes.addQuery(query, numDocsInRAM);
       }
+    }
   }
 
   void deleteTerms(Term... terms) {
+    if (numDocsInRAM > 0) {
       for (Term term : terms) {
         pendingDeletes.addTerm(term, numDocsInRAM);
       }
+    }
   }
 
   int getNumDocsInRAM() {
     return numDocsInRAM;
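The new numDocsInRAM guard in deleteQueries/deleteTerms is safe because a delete buffered inside one DWPT only ever applies to the documents already in that DWPT's RAM buffer; documents in other DWPTs and in flushed segments are covered by the global pendingDeletes buffer in DocumentsWriter. A sketch with stand-in types:

    class PerThreadDeleteSketch {
      static class Buffer {
        void addTerm(String term, int uptoDocID) { /* applies to docIDs < uptoDocID */ }
      }

      private int numDocsInRAM;
      private final Buffer pendingDeletes = new Buffer();

      void deleteTerms(String... terms) {
        if (numDocsInRAM > 0) {  // empty buffer: nothing the delete could match here
          for (String term : terms) {
            pendingDeletes.addTerm(term, numDocsInRAM);
          }
        }
      }
    }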
@@ -254,12 +278,12 @@ public class DocumentsWriterPerThread {
     segment = null;
     consumer.doAfterFlush();
     fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
-    parent.substractFlushedNumDocs(numDocsInRAM);
+    parent.subtractFlushedNumDocs(numDocsInRAM);
     numDocsInRAM = 0;
   }
 
   /** Flush all pending docs to a new segment */
-  SegmentInfo flush() throws IOException {
+  FlushedSegment flush() throws IOException {
     assert numDocsInRAM > 0;
 
     flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
@@ -295,6 +319,7 @@ public class DocumentsWriterPerThread {
     SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, flushState.segmentCodecs, fieldInfos);
 
     consumer.flush(flushState);
+    pendingDeletes.terms.clear();
     newSegment.clearFilesCache();
 
     if (infoStream != null) {
@@ -305,11 +330,19 @@ public class DocumentsWriterPerThread {
     }
 
     flushedDocCount += flushState.numDocs;
 
+    BufferedDeletes segmentDeletes = null;
+    if (pendingDeletes.queries.isEmpty()) {
+      pendingDeletes.clear();
+    } else {
+      segmentDeletes = pendingDeletes;
+      pendingDeletes = new BufferedDeletes(false);
+    }
+
     doAfterFlush();
     success = true;
 
-    return newSegment;
+    return new FlushedSegment(newSegment, segmentDeletes, flushState.deletedDocs);
   } finally {
     if (!success) {
       if (segment != null) {
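At the end of flush(), buffered term deletes have already been resolved into flushState.deletedDocs (hence the pendingDeletes.terms.clear() above), so only still-unresolved query deletes ride along in the FlushedSegment. A hedged sketch of the handoff with stand-in types:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;

    class FlushHandoffSketch {
      static final class Result {
        final String segmentName;
        final List<String> segmentDeletes;   // null when nothing needs to travel
        final BitSet deletedDocuments;
        Result(String segmentName, List<String> segmentDeletes, BitSet deletedDocuments) {
          this.segmentName = segmentName;
          this.segmentDeletes = segmentDeletes;
          this.deletedDocuments = deletedDocuments;
        }
      }

      private List<String> pendingQueryDeletes = new ArrayList<>();

      // term deletes were already folded into deletedDocs during the flush itself;
      // only query deletes may still need to run against the new segment
      Result endOfFlush(String segmentName, BitSet deletedDocs) {
        List<String> segmentDeletes = null;
        if (!pendingQueryDeletes.isEmpty()) {
          segmentDeletes = pendingQueryDeletes;     // hand the live buffer over ...
          pendingQueryDeletes = new ArrayList<>();  // ... and start a fresh one
        }
        return new Result(segmentName, segmentDeletes, deletedDocs);
      }
    }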

IndexWriter.java

@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
 import org.apache.lucene.index.codecs.CodecProvider;
@@ -2080,8 +2081,10 @@ public class IndexWriter implements Closeable {
     deleter.checkpoint(segmentInfos, false);
   }
 
-  void addFlushedSegment(SegmentInfo newSegment, BitVector deletedDocs) throws IOException {
-    assert newSegment != null;
+  void addFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+    assert flushedSegment != null;
+
+    SegmentInfo newSegment = flushedSegment.segmentInfo;
 
     setDiagnostics(newSegment, "flush");
@@ -2107,8 +2110,8 @@ public class IndexWriter implements Closeable {
       // Must write deleted docs after the CFS so we don't
       // slurp the del file into CFS:
-      if (deletedDocs != null) {
-        final int delCount = deletedDocs.count();
+      if (flushedSegment.deletedDocuments != null) {
+        final int delCount = flushedSegment.deletedDocuments.count();
         assert delCount > 0;
         newSegment.setDelCount(delCount);
         newSegment.advanceDelGen();
@@ -2123,7 +2126,7 @@ public class IndexWriter implements Closeable {
         // shortly-to-be-opened SegmentReader and let it
         // carry the changes; there's no reason to use
         // filesystem as intermediary here.
-        deletedDocs.write(directory, delFileName);
+        flushedSegment.deletedDocuments.write(directory, delFileName);
         success2 = true;
       } finally {
         if (!success2) {

TermVectorsTermsWriterPerField.java

@@ -71,7 +71,6 @@ final class TermVectorsTermsWriterPerField extends TermsHashConsumerPerField {
     if (doVectors) {
       termsWriter.hasVectors = true;
-      if (termsWriter.tvx != null) {
       if (termsHashPerField.bytesHash.size() != 0) {
         // Only necessary if previous doc hit a
         // non-aborting exception while writing vectors in
@@ -79,7 +78,6 @@ final class TermVectorsTermsWriterPerField extends TermsHashConsumerPerField {
         termsHashPerField.reset();
       }
     }
-    }
 
     // TODO: only if needed for performance
     //perThread.postingsCount = 0;

TestRollingUpdates.java

@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.*;
+import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.*;
 import org.apache.lucene.util.*;
 import org.junit.Test;
@@ -48,7 +49,21 @@ public class TestRollingUpdates extends LuceneTestCase {
         id++;
       }
       doc.getField("id").setValue(myID);
-      w.updateDocument(new Term("id", myID), doc);
+      int mode = docIter % 3;
+      switch (mode) {
+        case 0: {
+          w.deleteDocuments(new Term("id", myID));
+          w.addDocument(doc);
+          break;
+        }
+        case 1: {
+          w.deleteDocuments(new TermQuery(new Term("id", myID)));
+          w.addDocument(doc);
+          break;
+        }
+        default: w.updateDocument(new Term("id", myID), doc);
+      }
       if (docIter >= SIZE && random.nextInt(50) == 17) {
         if (r != null) {
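The test now rotates deterministically through the three ways of replacing a document, exercising the new term-delete, query-delete, and update paths in DocumentsWriter. The three idioms side by side; note that, unlike updateDocument, delete-then-add is not atomic with respect to a reader opened in between:

    import java.io.IOException;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.TermQuery;

    class ReplaceIdioms {
      static void replace(IndexWriter w, Term idTerm, Document doc, int mode) throws IOException {
        switch (mode % 3) {
          case 0:                                    // delete by term, then re-add
            w.deleteDocuments(idTerm);
            w.addDocument(doc);
            break;
          case 1:                                    // delete by query, then re-add
            w.deleteDocuments(new TermQuery(idTerm));
            w.addDocument(doc);
            break;
          default:                                   // single-call atomic update
            w.updateDocument(idTerm, doc);
        }
      }
    }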