LUCENE-2956: Support updateDocument() with DWPTs

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/realtime_search@1091720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-04-13 09:20:39 +00:00
parent a7d460ca14
commit e95097a7c8
12 changed files with 1188 additions and 393 deletions

View File

@ -33,8 +33,8 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight; import org.apache.lucene.search.Weight;
/* Tracks the stream of {@link BuffereDeletes}. /* Tracks the stream of {@link BufferedDeletes}.
* When DocumensWriter flushes, its buffered * When DocumentsWriterPerThread flushes, its buffered
* deletes are appended to this stream. We later * deletes are appended to this stream. We later
* apply these deletes (resolve them to the actual * apply these deletes (resolve them to the actual
* docIDs, per segment) when a merge is started * docIDs, per segment) when a merge is started
@ -82,17 +82,27 @@ class BufferedDeletesStream {
// Appends a new packet of buffered deletes to the stream, // Appends a new packet of buffered deletes to the stream,
// setting its generation: // setting its generation:
public synchronized void push(FrozenBufferedDeletes packet) { public synchronized long push(FrozenBufferedDeletes packet) {
/*
* The insert operation must be atomic. If we let threads increment the gen
* and push the packet afterwards we risk that packets are out of order.
* With DWPT this is possible if two or more flushes are racing for pushing
* updates. If the pushed packets get our of order would loose documents
* since deletes are applied to the wrong segments.
*/
packet.setDelGen(nextGen++);
assert packet.any(); assert packet.any();
assert checkDeleteStats(); assert checkDeleteStats();
assert packet.gen < nextGen; assert packet.delGen() < nextGen;
assert deletes.isEmpty() || deletes.get(deletes.size()-1).delGen() < packet.delGen() : "Delete packets must be in order";
deletes.add(packet); deletes.add(packet);
numTerms.addAndGet(packet.numTermDeletes); numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed); bytesUsed.addAndGet(packet.bytesUsed);
if (infoStream != null) { if (infoStream != null) {
message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size()); message("push deletes " + packet + " delGen=" + packet.delGen() + " packetCount=" + deletes.size());
} }
assert checkDeleteStats(); assert checkDeleteStats();
return packet.delGen();
} }
public synchronized void clear() { public synchronized void clear() {
@ -132,7 +142,7 @@ class BufferedDeletesStream {
} }
// Sorts SegmentInfos from smallest to biggest bufferedDelGen: // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() { private static final Comparator<SegmentInfo> sortSegInfoByDelGen = new Comparator<SegmentInfo>() {
// @Override -- not until Java 1.6 // @Override -- not until Java 1.6
public int compare(SegmentInfo si1, SegmentInfo si2) { public int compare(SegmentInfo si1, SegmentInfo si2) {
final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen(); final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
@ -147,10 +157,10 @@ class BufferedDeletesStream {
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
return sortByDelGen == other; return sortSegInfoByDelGen == other;
} }
}; };
/** Resolves the buffered deleted Term/Query/docIDs, into /** Resolves the buffered deleted Term/Query/docIDs, into
* actual deleted docIDs in the deletedDocs BitVector for * actual deleted docIDs in the deletedDocs BitVector for
* each SegmentReader. */ * each SegmentReader. */
@ -174,7 +184,7 @@ class BufferedDeletesStream {
SegmentInfos infos2 = new SegmentInfos(); SegmentInfos infos2 = new SegmentInfos();
infos2.addAll(infos); infos2.addAll(infos);
Collections.sort(infos2, sortByDelGen); Collections.sort(infos2, sortSegInfoByDelGen);
BufferedDeletes coalescedDeletes = null; BufferedDeletes coalescedDeletes = null;
boolean anyNewDeletes = false; boolean anyNewDeletes = false;
@ -191,19 +201,30 @@ class BufferedDeletesStream {
final SegmentInfo info = infos2.get(infosIDX); final SegmentInfo info = infos2.get(infosIDX);
final long segGen = info.getBufferedDeletesGen(); final long segGen = info.getBufferedDeletesGen();
if (packet != null && segGen < packet.gen) { if (packet != null && segGen < packet.delGen()) {
//System.out.println(" coalesce"); //System.out.println(" coalesce");
if (coalescedDeletes == null) { if (coalescedDeletes == null) {
coalescedDeletes = new BufferedDeletes(true); coalescedDeletes = new BufferedDeletes(true);
} }
coalescedDeletes.update(packet); if (!packet.isSegmentPrivate) {
/*
* only update the coalescededDeletes if we are NOT on a segment private del packet.
* the segment private del packet must only applied to segments with the same delGen.
* Yet, if a segment is already deleted from the SI since it had no more documents remaining
* after some del packets younger than it segPrivate packet (hihger delGen) have been applied
* the segPrivate packet has not been removed.
*/
coalescedDeletes.update(packet);
}
delIDX--; delIDX--;
} else if (packet != null && segGen == packet.gen) { } else if (packet != null && segGen == packet.delGen()) {
assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet";
//System.out.println(" eq"); //System.out.println(" eq");
// Lock order: IW -> BD -> RP // Lock order: IW -> BD -> RP
assert readerPool.infoIsLive(info); assert readerPool.infoIsLive(info);
SegmentReader reader = readerPool.get(info, false); final SegmentReader reader = readerPool.get(info, false);
int delCount = 0; int delCount = 0;
final boolean segAllDeletes; final boolean segAllDeletes;
try { try {
@ -213,7 +234,7 @@ class BufferedDeletesStream {
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
} }
//System.out.println(" del exact"); //System.out.println(" del exact");
// Don't delete by Term here; DocumentsWriter // Don't delete by Term here; DocumentsWriterPerThread
// already did that on flush: // already did that on flush:
delCount += applyQueryDeletes(packet.queriesIterable(), reader); delCount += applyQueryDeletes(packet.queriesIterable(), reader);
segAllDeletes = reader.numDocs() == 0; segAllDeletes = reader.numDocs() == 0;
@ -236,7 +257,12 @@ class BufferedDeletesStream {
if (coalescedDeletes == null) { if (coalescedDeletes == null) {
coalescedDeletes = new BufferedDeletes(true); coalescedDeletes = new BufferedDeletes(true);
} }
coalescedDeletes.update(packet);
/*
* since we are on a segment private del packet we must not
* update the coalescedDeletes here! We can simply advance to the
* next packet and seginfo.
*/
delIDX--; delIDX--;
infosIDX--; infosIDX--;
info.setBufferedDeletesGen(nextGen); info.setBufferedDeletesGen(nextGen);
@ -285,7 +311,7 @@ class BufferedDeletesStream {
return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted); return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
} }
public synchronized long getNextGen() { synchronized long getNextGen() {
return nextGen++; return nextGen++;
} }
@ -303,10 +329,9 @@ class BufferedDeletesStream {
if (infoStream != null) { if (infoStream != null) {
message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size()); message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
} }
final int limit = deletes.size(); final int limit = deletes.size();
for(int delIDX=0;delIDX<limit;delIDX++) { for(int delIDX=0;delIDX<limit;delIDX++) {
if (deletes.get(delIDX).gen >= minGen) { if (deletes.get(delIDX).delGen() >= minGen) {
prune(delIDX); prune(delIDX);
assert checkDeleteStats(); assert checkDeleteStats();
return; return;

View File

@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
@ -117,8 +119,10 @@ final class DocumentsWriter {
private AtomicInteger numDocsInRAM = new AtomicInteger(0); private AtomicInteger numDocsInRAM = new AtomicInteger(0);
final BufferedDeletesStream bufferedDeletesStream; final BufferedDeletesStream bufferedDeletesStream;
// TODO: cutover to BytesRefHash // TODO: cut over to BytesRefHash in BufferedDeletes
private final BufferedDeletes pendingDeletes = new BufferedDeletes(false); volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
private Collection<String> abortedFiles; // List of files that were written before last abort() private Collection<String> abortedFiles; // List of files that were written before last abort()
final IndexingChain chain; final IndexingChain chain;
@ -146,30 +150,12 @@ final class DocumentsWriter {
healthiness = new Healthiness(); healthiness = new Healthiness();
final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
flushControl = new DocumentsWriterFlushControl(flushPolicy, perThreadPool, healthiness, pendingDeletes, maxRamPerDWPT); flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
} }
boolean deleteQueries(final Query... queries) throws IOException { synchronized boolean deleteQueries(final Query... queries) throws IOException {
synchronized(this) { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
for (Query query : queries) { deleteQueue.addDelete(queries);
pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
}
}
final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
while (threadsIterator.hasNext()) {
ThreadState state = threadsIterator.next();
state.lock();
try {
if (state.isActive()) {
state.perThread.deleteQueries(queries);
}
} finally {
state.unlock();
}
}
return false; return false;
} }
@ -177,29 +163,12 @@ final class DocumentsWriter {
return deleteQueries(query); return deleteQueries(query);
} }
boolean deleteTerms(final Term... terms) throws IOException { synchronized boolean deleteTerms(final Term... terms) throws IOException {
synchronized(this) { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
for (Term term : terms) { deleteQueue.addDelete(terms);
pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT); flushControl.doOnDelete();
}
}
Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
while (threadsIterator.hasNext()) {
ThreadState state = threadsIterator.next();
state.lock();
try {
if (state.isActive()) {
state.perThread.deleteTerms(terms);
flushControl.doOnDelete(state);
}
} finally {
state.unlock();
}
}
if (flushControl.flushDeletes.getAndSet(false)) { if (flushControl.flushDeletes.getAndSet(false)) {
flushDeletes(); flushDeletes(deleteQueue);
} }
return false; return false;
} }
@ -211,32 +180,20 @@ final class DocumentsWriter {
return deleteTerms(term); return deleteTerms(term);
} }
void deleteTerm(final Term term, ThreadState exclude) throws IOException {
synchronized(this) {
pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
}
final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
DocumentsWriterDeleteQueue currentDeleteSession() {
while (threadsIterator.hasNext()) { return deleteQueue;
ThreadState state = threadsIterator.next();
if (state != exclude) {
state.lock();
try {
state.perThread.deleteTerms(term);
flushControl.doOnDelete(state);
} finally {
state.unlock();
}
}
}
if (flushControl.flushDeletes.getAndSet(false)) {
flushDeletes();
}
} }
private void flushDeletes() throws IOException { private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
maybePushPendingDeletes(); if (deleteQueue != null) {
synchronized (ticketQueue) {
// freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
applyFlushTickets(null, null);
}
}
indexWriter.applyAllDeletes(); indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet(); indexWriter.flushCount.incrementAndGet();
} }
@ -284,7 +241,7 @@ final class DocumentsWriter {
boolean success = false; boolean success = false;
synchronized (this) { synchronized (this) {
pendingDeletes.clear(); deleteQueue.clear();
} }
try { try {
@ -322,17 +279,17 @@ final class DocumentsWriter {
} }
public int getBufferedDeleteTermsSize() { public int getBufferedDeleteTermsSize() {
return pendingDeletes.terms.size(); return deleteQueue.getBufferedDeleteTermsSize();
} }
//for testing //for testing
public int getNumBufferedDeleteTerms() { public int getNumBufferedDeleteTerms() {
return pendingDeletes.numTermDeletes.get(); return deleteQueue.numGlobalTermDeletes();
} }
public boolean anyDeletions() { public boolean anyDeletions() {
return pendingDeletes.any(); return deleteQueue.anyChanges();
} }
void close() { void close() {
closed = true; closed = true;
@ -353,73 +310,133 @@ final class DocumentsWriter {
message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment"); message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment");
} }
// try pick up pending threads here if possile // try pick up pending threads here if possile
final DocumentsWriterPerThread flushingDWPT; DocumentsWriterPerThread flushingDWPT;
flushingDWPT = flushControl.getFlushIfPending(null); while ( (flushingDWPT = flushControl.nextPendingFlush()) != null){
// don't push the delete here since the update could fail! // don't push the delete here since the update could fail!
maybeMerge = doFlush(flushingDWPT); maybeMerge = doFlush(flushingDWPT);
if (!healthiness.isStalled()) {
break;
}
}
if (infoStream != null && healthiness.isStalled()) { if (infoStream != null && healthiness.isStalled()) {
message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore"); message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore");
} }
healthiness.waitIfStalled(); // block if stalled healthiness.waitIfStalled(); // block if stalled
} }
ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
this, doc); this, doc);
DocumentsWriterPerThread flushingDWPT = null; final DocumentsWriterPerThread flushingDWPT;
final DocumentsWriterPerThread dwpt;
try { try {
if (!perThread.isActive()) { if (!perThread.isActive()) {
ensureOpen(); ensureOpen();
assert false: "perThread is not active but we are still open"; assert false: "perThread is not active but we are still open";
} }
final DocumentsWriterPerThread dwpt = perThread.perThread;
dwpt = perThread.perThread;
try { try {
dwpt.updateDocument(doc, analyzer, delTerm); dwpt.updateDocument(doc, analyzer, delTerm);
numDocsInRAM.incrementAndGet();
} finally { } finally {
if(dwpt.checkAndResetHasAborted()) { if(dwpt.checkAndResetHasAborted()) {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(perThread);
} }
} }
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
numDocsInRAM.incrementAndGet();
} finally { } finally {
perThread.unlock(); perThread.unlock();
} }
// delete term from other DWPTs later, so that this thread
// doesn't have to lock multiple DWPTs at the same time
if (isUpdate) {
deleteTerm(delTerm, perThread);
}
maybeMerge |= doFlush(flushingDWPT); maybeMerge |= doFlush(flushingDWPT);
return maybeMerge; return maybeMerge;
} }
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean maybeMerge = false; boolean maybeMerge = false;
while (flushingDWPT != null) { while (flushingDWPT != null) {
maybeMerge = true; maybeMerge = true;
boolean success = false;
FlushTicket ticket = null;
try { try {
/*
* Since with DWPT the flush process is concurrent and several DWPT
* could flush at the same time we must maintain the order of the
* flushes before we can apply the flushed segment and the frozen global
* deletes it is buffering. The reason for this is that the global
* deletes mark a certain point in time where we took a DWPT out of
* rotation and freeze the global deletes.
*
* Example: A flush 'A' starts and freezes the global deletes, then
* flush 'B' starts and freezes all deletes occurred since 'A' has
* started. if 'B' finishes before 'A' we need to wait until 'A' is done
* otherwise the deletes frozen by 'B' are not applied to 'A' and we
* might miss to deletes documents in 'A'.
*/
synchronized (ticketQueue) {
// each flush is assigned a ticket in the order they accquire the ticketQueue lock
ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
ticketQueue.add(ticket);
}
// flush concurrently without locking // flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush(); final FlushedSegment newSegment = flushingDWPT.flush();
finishFlushedSegment(newSegment); success = true;
/*
* now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
applyFlushTickets(ticket, newSegment);
} finally { } finally {
flushControl.doAfterFlush(flushingDWPT); flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted(); flushingDWPT.checkAndResetHasAborted();
indexWriter.flushCount.incrementAndGet(); indexWriter.flushCount.incrementAndGet();
if (!success && ticket != null) {
synchronized (ticketQueue) {
// in the case of a failure make sure we are making progress and
// apply all the deletes since the segment flush failed
ticket.isSegmentFlush = false;
}
}
} }
flushingDWPT = flushControl.nextPendingFlush() ; flushingDWPT = flushControl.nextPendingFlush() ;
} }
return maybeMerge; return maybeMerge;
} }
private void finishFlushedSegment(FlushedSegment newSegment) private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException {
throws IOException { synchronized (ticketQueue) {
pushDeletes(newSegment); if (current != null) {
if (newSegment != null) { // this is a segment FlushTicket so assign the flushed segment so we can make progress.
indexWriter.addFlushedSegment(newSegment); assert segment != null;
current.segment = segment;
}
while (true) {
// while we can publish flushes keep on making the queue empty.
final FlushTicket head = ticketQueue.peek();
if (head != null && head.canPublish()) {
ticketQueue.poll();
finishFlushedSegment(head.segment, head.frozenDeletes);
} else {
break;
}
}
} }
} }
private void finishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// this is eventually finishing the flushed segment and publishing it to the IndexWriter
if (bufferedDeletes != null && bufferedDeletes.any()) {
bufferedDeletesStream.push(bufferedDeletes);
if (infoStream != null) {
message("flush: push buffered deletes: " + bufferedDeletes);
}
}
publishFlushedSegment(newSegment);
}
final void subtractFlushedNumDocs(int numFlushed) { final void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get(); int oldValue = numDocsInRAM.get();
@ -427,183 +444,89 @@ final class DocumentsWriter {
oldValue = numDocsInRAM.get(); oldValue = numDocsInRAM.get();
} }
} }
private synchronized void pushDeletes(FlushedSegment flushedSegment) { private void publishFlushedSegment(FlushedSegment newSegment)
maybePushPendingDeletes(); throws IOException {
if (flushedSegment != null) { if (newSegment != null) {
BufferedDeletes deletes = flushedSegment.segmentDeletes; final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
final long delGen = bufferedDeletesStream.getNextGen(); final BufferedDeletes deletes = newSegment.segmentDeletes;
// Lock order: DW -> BD FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) { if (deletes != null && deletes.any()) {
final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, // segment private delete
delGen); packet = new FrozenBufferedDeletes(deletes, true);
if (infoStream != null) { if (infoStream != null) {
message("flush: push buffered deletes"); message("flush: push buffered seg private deletes: " + packet);
}
bufferedDeletesStream.push(packet);
if (infoStream != null) {
message("flush: delGen=" + packet.gen);
} }
} }
flushedSegment.segmentInfo.setBufferedDeletesGen(delGen); indexWriter.publishFlushedSegment(segInfo, packet);
} }
} }
private synchronized final void maybePushPendingDeletes() { private final Object flushAllLock = new Object();
final long delGen = bufferedDeletesStream.getNextGen(); // for asserts
if (pendingDeletes.any()) { private volatile DocumentsWriterDeleteQueue currentFlusingSession = null;
indexWriter.bufferedDeletesStream.push(new FrozenBufferedDeletes( private boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
pendingDeletes, delGen)); currentFlusingSession = session;
pendingDeletes.clear(); return true;
}
} }
final boolean flushAllThreads(final boolean flushDeletes) final boolean flushAllThreads(final boolean flushDeletes)
throws IOException { throws IOException {
synchronized (flushAllLock) {
final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator(); final DocumentsWriterDeleteQueue flushingDeleteQueue;
boolean anythingFlushed = false; synchronized (this) {
flushingDeleteQueue = deleteQueue;
while (threadsIterator.hasNext()) { deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
final ThreadState perThread = threadsIterator.next(); assert setFlushingDeleteQueue(flushingDeleteQueue);
final DocumentsWriterPerThread flushingDWPT;
/*
* TODO: maybe we can leverage incoming / indexing threads here if we mark
* all active threads pending so that we don't need to block until we got
* the handle. Yet, we need to figure out how to identify that a certain
* DWPT has been flushed since they are simply replaced once checked out
* for flushing. This would give us another level of concurrency during
* commit.
*
* Maybe we simply iterate them and store the ThreadStates and mark
* all as flushPending and at the same time record the DWPT instance as a
* key for the pending ThreadState. This way we can easily iterate until
* all DWPT have changed.
*/
perThread.lock();
try {
if (!perThread.isActive()) {
assert closed;
continue; //this perThread is already done maybe by a concurrently indexing thread
}
final DocumentsWriterPerThread dwpt = perThread.perThread;
// Always flush docs if there are any
final boolean flushDocs = dwpt.getNumDocsInRAM() > 0;
final String segment = dwpt.getSegment();
// If we are flushing docs, segment must not be null:
assert segment != null || !flushDocs;
if (flushDocs) {
// check out and set pending if not already set
flushingDWPT = flushControl.tryCheckoutForFlush(perThread, true);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT";
try {
final FlushedSegment newSegment = dwpt.flush();
anythingFlushed = true;
finishFlushedSegment(newSegment);
} finally {
flushControl.doAfterFlush(flushingDWPT);
}
}
} finally {
perThread.unlock();
} }
assert flushingDeleteQueue == currentFlusingSession;
boolean anythingFlushed = false;
boolean success = false;
try {
flushControl.markForFullFlush();
DocumentsWriterPerThread flushingDWPT;
// now try help out with flushing
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
// if a concurrent flush is still in flight wait for it
while (!flushControl.allFlushesDue()) {
flushControl.waitForFlush();
}
if (!anythingFlushed && flushDeletes) {
synchronized (ticketQueue) {
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
applyFlushTickets(null, null);
}
success = true;
} finally {
assert flushingDeleteQueue == currentFlusingSession;
assert setFlushingDeleteQueue(null);
if (!success) {
flushControl.abortFullFlushes();
} else {
// release the flush lock
flushControl.finishFullFlush();
}
}
return anythingFlushed;
} }
if (!anythingFlushed && flushDeletes) {
maybePushPendingDeletes();
}
return anythingFlushed;
} }
static final class FlushTicket {
final FrozenBufferedDeletes frozenDeletes;
FlushedSegment segment;
boolean isSegmentFlush;
// /* We have three pools of RAM: Postings, byte blocks
// * (holds freq/prox posting data) and per-doc buffers FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
// * (stored fields/term vectors). Different docs require this.frozenDeletes = frozenDeletes;
// * varying amount of storage from these classes. For this.isSegmentFlush = isSegmentFlush;
// * example, docs with many unique single-occurrence short }
// * terms will use up the Postings RAM and hardly any of
// * the other two. Whereas docs with very large terms will boolean canPublish() {
// * use alot of byte blocks RAM. This method just frees return (!isSegmentFlush || segment != null);
// * allocations from the pools once we are over-budget, }
// * which balances the pools to match the current docs. */ }
// void balanceRAM() {
//
// final boolean doBalance;
// final long deletesRAMUsed;
//
// deletesRAMUsed = bufferedDeletes.bytesUsed();
//
// synchronized(this) {
// if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
// return;
// }
//
// doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
// }
//
// if (doBalance) {
//
// if (infoStream != null)
// message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
// " vs trigger=" + toMB(ramBufferSize) +
// " deletesMB=" + toMB(deletesRAMUsed) +
// " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
// " perDocFree=" + toMB(perDocAllocator.bytesUsed()));
//
// final long startBytesUsed = bytesUsed() + deletesRAMUsed;
//
// int iter = 0;
//
// // We free equally from each pool in 32 KB
// // chunks until we are below our threshold
// // (freeLevel)
//
// boolean any = true;
//
// while(bytesUsed()+deletesRAMUsed > freeLevel) {
//
// synchronized(this) {
// if (0 == perDocAllocator.numBufferedBlocks() &&
// 0 == byteBlockAllocator.numBufferedBlocks() &&
// 0 == freeIntBlocks.size() && !any) {
// // Nothing else to free -- must flush now.
// bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
// if (infoStream != null) {
// if (bytesUsed()+deletesRAMUsed > ramBufferSize)
// message(" nothing to free; set bufferIsFull");
// else
// message(" nothing to free");
// }
// break;
// }
//
// if ((0 == iter % 4) && byteBlockAllocator.numBufferedBlocks() > 0) {
// byteBlockAllocator.freeBlocks(1);
// }
// if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
// freeIntBlocks.remove(freeIntBlocks.size()-1);
// bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
// }
// if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
// perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
// }
// }
//
// if ((3 == iter % 4) && any)
// // Ask consumer to free any recycled state
// any = consumer.freeRAM();
//
// iter++;
// }
//
// if (infoStream != null)
// message(" after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
// }
// }
} }

View File

@ -0,0 +1,377 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.search.Query;
/**
* {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
* queue. In contrast to other queue implementation we only maintain only the
* tail of the queue. A delete queue is always used in a context of a set of
* DWPT and a global delete pool. Each of the DWPT and the global pool need to
* maintain their 'own' head of the queue. The difference between the DWPT and
* the global pool is that the DWPT starts maintaining a head once it has added
* its first document since for its segments private deletes only the deletes
* after that document are relevant. The global pool instead starts maintaining
* the head once this instance is created by taking the sentinel instance as its
* initial head.
* <p>
* Since each {@link DeleteSlice} maintains its own head and the list is only
* single linked the garbage collector takes care of pruning the list for us.
* All nodes in the list that are still relevant should be either directly or
* indirectly referenced by one of the DWPT's private {@link DeleteSlice} or by
* the global {@link BufferedDeletes} slice.
* <p>
* Each DWPT as well as the global delete pool maintain their private
* DeleteSlice instance. In the DWPT case updating a slice is equivalent to
* atomically finishing the document. The slice update guarantees a happens
* before relationship to all other updates in the same indexing session. When a
* DWPT updates a document it
*
* <ol>
* <li>consumes a document finishes its processing</li>
* <li>updates its private {@link DeleteSlice} either by calling
* {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
* document has a delTerm)</li>
* <li>applies all deletes in the slice to its private {@link BufferedDeletes}
* and resets it</li>
* <li>increments its internal document id</li>
* </ol>
*
* The DWPT also doesn't apply its current documents delete term until it has
* updated its delete slice which ensures the consistency of the update. if the
* update fails before the DeleteSlice could have been updated the deleteTerm
* will also not be added to its private deletes neither to the global deletes.
*
*/
final class DocumentsWriterDeleteQueue {
private volatile Node tail;
private static final AtomicReferenceFieldUpdater<DocumentsWriterDeleteQueue, Node> tailUpdater = AtomicReferenceFieldUpdater
.newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail");
private final DeleteSlice globalSlice;
private final BufferedDeletes globalBufferedDeletes;
/* only acquired to update the global deletes */
private final ReentrantLock globalBufferLock = new ReentrantLock();
DocumentsWriterDeleteQueue(BufferedDeletes globalBufferedDeletes) {
this.globalBufferedDeletes = globalBufferedDeletes;
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
*/
tail = new Node(null); // sentinel
globalSlice = new DeleteSlice(tail);
}
void addDelete(Query... queries) {
add(new QueryArrayNode(queries));
tryApplyGlobalSlice();
}
void addDelete(Term... terms) {
add(new TermArrayNode(terms));
tryApplyGlobalSlice();
}
/**
* invariant for document update
*/
void add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
add(termNode);
/*
* this is an update request where the term is the updated documents
* delTerm. in that case we need to guarantee that this insert is atomic
* with regards to the given delete slice. This means if two threads try to
* update the same document with in turn the same delTerm one of them must
* win. By taking the node we have created for our del term as the new tail
* it is guaranteed that if another thread adds the same right after us we
* will apply this delete next time we update our slice and one of the two
* competing updates wins!
*/
slice.sliceTail = termNode;
assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
// we can do it just every n times or so?
}
void add(Node item) {
/*
* this non-blocking / 'wait-free' linked list add was inspired by Apache
* Harmony's ConcurrentLinkedQueue Implementation.
*/
while (true) {
final Node currentTail = this.tail;
final Node tailNext = currentTail.next;
if (tail == currentTail) {
if (tailNext != null) {
/*
* we are in intermediate state here. the tails next pointer has been
* advanced but the tail itself might not be updated yet. help to
* advance the tail and try again updating it.
*/
tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
} else {
/*
* we are in quiescent state and can try to insert the item to the
* current tail if we fail to insert we just retry the operation since
* somebody else has already added its item
*/
if (currentTail.casNext(null, item)) {
/*
* now that we are done we need to advance the tail while another
* thread could have advanced it already so we can ignore the return
* type of this CAS call
*/
tailUpdater.compareAndSet(this, currentTail, item);
return;
}
}
}
}
}
boolean anyChanges() {
globalBufferLock.lock();
try {
return !globalSlice.isEmpty() || globalBufferedDeletes.any();
} finally {
globalBufferLock.unlock();
}
}
void tryApplyGlobalSlice() {
if (globalBufferLock.tryLock()) {
/*
* the global buffer must be locked but we don't need to upate them if
* there is an update going on right now. It is sufficient to apply the
* deletes that have been added after the current in-flight global slices
* tail the next time we can get the lock!
*/
try {
if (updateSlice(globalSlice)) {
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
} finally {
globalBufferLock.unlock();
}
}
}
FrozenBufferedDeletes freezeGlobalBuffer(DeleteSlice callerSlice) {
globalBufferLock.lock();
/*
* here we are freezing the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
final Node currentTail = tail; // take the current tail make this local any
// changes after this call are applied later
// and not relevant here
if (callerSlice != null) {
// update the callers slices so we are on the same page
callerSlice.sliceTail = currentTail;
}
try {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(
globalBufferedDeletes, false);
globalBufferedDeletes.clear();
return packet;
} finally {
globalBufferLock.unlock();
}
}
DeleteSlice newSlice() {
return new DeleteSlice(tail);
}
boolean updateSlice(DeleteSlice slice) {
if (slice.sliceTail != tail) { // if we are the same just
slice.sliceTail = tail;
return true;
}
return false;
}
static class DeleteSlice {
// no need to be volatile, slices are only access by one thread!
Node sliceHead; // we don't apply this one
Node sliceTail;
DeleteSlice(Node currentTail) {
assert currentTail != null;
/*
* Initially this is a 0 length slice pointing to the 'current' tail of
* the queue. Once we update the slice we only need to assign the tail and
* have a new slice
*/
sliceHead = sliceTail = currentTail;
}
void apply(BufferedDeletes del, int docIDUpto) {
if (sliceHead == sliceTail) {
// 0 length slice
return;
}
/*
* when we apply a slice we take the head and get its next as our first
* item to apply and continue until we applied the tail. If the head and
* tail in this slice are not equal then there will be at least one more
* non-null node in the slice!
*/
Node current = sliceHead;
do {
current = current.next;
assert current != null : "slice property violated between the head on the tail must not be a null node";
current.apply(del, docIDUpto);
} while (current != sliceTail);
reset();
}
void reset() {
// resetting to a 0 length slice
sliceHead = sliceTail;
}
/**
* Returns <code>true</code> iff the given item is identical to the item
* hold by the slices tail, otherwise <code>false</code>.
*/
boolean isTailItem(Object item) {
return sliceTail.item == item;
}
boolean isEmpty() {
return sliceHead == sliceTail;
}
}
public int numGlobalTermDeletes() {
return globalBufferedDeletes.numTermDeletes.get();
}
void clear() {
globalBufferLock.lock();
try {
final Node currentTail = tail;
globalSlice.sliceHead = globalSlice.sliceTail = currentTail;
globalBufferedDeletes.clear();
} finally {
globalBufferLock.unlock();
}
}
private static class Node {
volatile Node next;
final Object item;
private Node(Object item) {
this.item = item;
}
static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater = AtomicReferenceFieldUpdater
.newUpdater(Node.class, Node.class, "next");
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
assert false : "sentinel item must never be applied";
}
boolean casNext(Node cmp, Node val) {
return nextUpdater.compareAndSet(this, cmp, val);
}
}
private static final class TermNode extends Node {
TermNode(Term term) {
super(term);
}
@Override
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
bufferedDeletes.addTerm((Term) item, docIDUpto);
}
}
private static final class QueryArrayNode extends Node {
QueryArrayNode(Query[] query) {
super(query);
}
@Override
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
final Query[] queries = (Query[]) item;
for (Query query : queries) {
bufferedDeletes.addQuery(query, docIDUpto);
}
}
}
private static final class TermArrayNode extends Node {
TermArrayNode(Term[] term) {
super(term);
}
@Override
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
final Term[] terms = (Term[]) item;
for (Term term : terms) {
bufferedDeletes.addTerm(term, docIDUpto);
}
}
}
private boolean forceApplyGlobalSlice() {
globalBufferLock.lock();
final Node currentTail = tail;
try {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
return globalBufferedDeletes.any();
} finally {
globalBufferLock.unlock();
}
}
public int getBufferedDeleteTermsSize() {
globalBufferLock.lock();
try {
forceApplyGlobalSlice();
return globalBufferedDeletes.terms.size();
} finally {
globalBufferLock.unlock();
}
}
}

View File

@ -16,11 +16,15 @@ package org.apache.lucene.index;
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.ThreadInterruptedException;
/** /**
* This class controls {@link DocumentsWriterPerThread} flushing during * This class controls {@link DocumentsWriterPerThread} flushing during
@ -42,6 +46,11 @@ public final class DocumentsWriterFlushControl {
private volatile int numPending = 0; private volatile int numPending = 0;
private volatile int numFlushing = 0; private volatile int numFlushing = 0;
final AtomicBoolean flushDeletes = new AtomicBoolean(false); final AtomicBoolean flushDeletes = new AtomicBoolean(false);
private boolean fullFlush = false;
private Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
// only for safety reasons if a DWPT is close to the RAM limit
private Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<DocumentsWriterPerThread>();
long peakActiveBytes = 0;// only with assert long peakActiveBytes = 0;// only with assert
long peakFlushBytes = 0;// only with assert long peakFlushBytes = 0;// only with assert
@ -51,16 +60,15 @@ public final class DocumentsWriterFlushControl {
private final FlushPolicy flushPolicy; private final FlushPolicy flushPolicy;
private boolean closed = false; private boolean closed = false;
private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>(); private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
private final BufferedDeletes pendingDeletes; private final DocumentsWriter documentsWriter;
DocumentsWriterFlushControl(FlushPolicy flushPolicy, DocumentsWriterFlushControl(DocumentsWriter documentsWriter,
DocumentsWriterPerThreadPool threadPool, Healthiness healthiness, Healthiness healthiness, long maxBytesPerDWPT) {
BufferedDeletes pendingDeletes, long maxBytesPerDWPT) {
this.healthiness = healthiness; this.healthiness = healthiness;
this.perThreadPool = threadPool; this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = flushPolicy; this.flushPolicy = documentsWriter.flushPolicy;
this.maxBytesPerDWPT = maxBytesPerDWPT; this.maxBytesPerDWPT = maxBytesPerDWPT;
this.pendingDeletes = pendingDeletes; this.documentsWriter = documentsWriter;
} }
public synchronized long activeBytes() { public synchronized long activeBytes() {
@ -113,6 +121,11 @@ public final class DocumentsWriterFlushControl {
// is super // is super
// important since we can not address more than 2048 MB per DWPT // important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread); setFlushPending(perThread);
if (fullFlush) {
DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false);
assert toBlock != null;
blockedFlushes.add(toBlock);
}
} }
} }
final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread); final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread);
@ -122,11 +135,29 @@ public final class DocumentsWriterFlushControl {
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
assert flushingWriters.containsKey(dwpt); assert flushingWriters.containsKey(dwpt);
numFlushing--; try {
Long bytes = flushingWriters.remove(dwpt); numFlushing--;
flushBytes -= bytes.longValue(); Long bytes = flushingWriters.remove(dwpt);
perThreadPool.recycle(dwpt); flushBytes -= bytes.longValue();
healthiness.updateStalled(this); perThreadPool.recycle(dwpt);
healthiness.updateStalled(this);
} finally {
notifyAll();
}
}
public synchronized boolean allFlushesDue() {
return numFlushing == 0;
}
public synchronized void waitForFlush() {
if (numFlushing != 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
} }
/** /**
@ -157,6 +188,13 @@ public final class DocumentsWriterFlushControl {
synchronized DocumentsWriterPerThread tryCheckoutForFlush( synchronized DocumentsWriterPerThread tryCheckoutForFlush(
ThreadState perThread, boolean setPending) { ThreadState perThread, boolean setPending) {
if (fullFlush)
return null;
return internalTryCheckOutForFlush(perThread, setPending);
}
private DocumentsWriterPerThread internalTryCheckOutForFlush(
ThreadState perThread, boolean setPending) {
if (setPending && !perThread.flushPending) { if (setPending && !perThread.flushPending) {
setFlushPending(perThread); setFlushPending(perThread);
} }
@ -185,7 +223,7 @@ public final class DocumentsWriterFlushControl {
return null; return null;
} }
DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) { private DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) {
if (numPending > 0) { if (numPending > 0) {
final DocumentsWriterPerThread dwpt = perThread == null ? null final DocumentsWriterPerThread dwpt = perThread == null ? null
: tryCheckoutForFlush(perThread, false); : tryCheckoutForFlush(perThread, false);
@ -204,6 +242,12 @@ public final class DocumentsWriterFlushControl {
} }
DocumentsWriterPerThread nextPendingFlush() { DocumentsWriterPerThread nextPendingFlush() {
synchronized (this) {
DocumentsWriterPerThread poll = flushQueue.poll();
if (poll != null) {
return poll;
}
}
if (numPending > 0) { if (numPending > 0) {
final Iterator<ThreadState> allActiveThreads = perThreadPool final Iterator<ThreadState> allActiveThreads = perThreadPool
.getActivePerThreadsIterator(); .getActivePerThreadsIterator();
@ -236,17 +280,16 @@ public final class DocumentsWriterFlushControl {
return flushPolicy.getMaxNetBytes(); return flushPolicy.getMaxNetBytes();
} }
synchronized void doOnDelete(ThreadState state) { synchronized void doOnDelete() {
if (!state.flushPending) { // pass null this is a global delete no update
flushPolicy.onDelete(this, state); flushPolicy.onDelete(this, null);
}
} }
/** /**
* Returns the number of delete terms in the global pool * Returns the number of delete terms in the global pool
*/ */
public int getNumGlobalTermDeletes() { public int getNumGlobalTermDeletes() {
return pendingDeletes.numTermDeletes.get(); return documentsWriter.deleteQueue.numGlobalTermDeletes();
} }
int numFlushingDWPT() { int numFlushingDWPT() {
@ -260,4 +303,66 @@ public final class DocumentsWriterFlushControl {
int numActiveDWPT() { int numActiveDWPT() {
return this.perThreadPool.getMaxThreadStates(); return this.perThreadPool.getMaxThreadStates();
} }
void markForFullFlush() {
synchronized (this) {
assert !fullFlush;
fullFlush = true;
}
final Iterator<ThreadState> allActiveThreads = perThreadPool
.getActivePerThreadsIterator();
final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
while (allActiveThreads.hasNext()) {
final ThreadState next = allActiveThreads.next();
next.lock();
try {
if (!next.isActive()) {
continue;
}
if (next.perThread.getNumDocsInRAM() > 0) {
final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next, true);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT";
toFlush.add(flushingDWPT);
} else {
next.perThread.initialize();
}
} finally {
next.unlock();
}
}
synchronized (this) {
flushQueue.addAll(blockedFlushes);
blockedFlushes.clear();
flushQueue.addAll(toFlush);
}
}
synchronized void finishFullFlush() {
assert fullFlush;
assert flushQueue.isEmpty();
try {
if (!blockedFlushes.isEmpty()) {
flushQueue.addAll(blockedFlushes);
blockedFlushes.clear();
}
} finally {
fullFlush = false;
}
}
synchronized void abortFullFlushes() {
try {
for (DocumentsWriterPerThread dwpt : flushQueue) {
doAfterFlush(dwpt);
}
for (DocumentsWriterPerThread dwpt : blockedFlushes) {
doAfterFlush(dwpt);
}
} finally {
fullFlush = false;
}
}
} }

View File

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.search.Query; import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.SimilarityProvider; import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BitVector; import org.apache.lucene.util.BitVector;
@ -136,6 +136,7 @@ public class DocumentsWriterPerThread {
} }
pendingDeletes.clear(); pendingDeletes.clear();
deleteSlice = deleteQueue.newSlice();
// Reset all postings data // Reset all postings data
doAfterFlush(); doAfterFlush();
@ -165,6 +166,8 @@ public class DocumentsWriterPerThread {
private final PrintStream infoStream; private final PrintStream infoStream;
private int numDocsInRAM; private int numDocsInRAM;
private int flushedDocCount; private int flushedDocCount;
DocumentsWriterDeleteQueue deleteQueue;
DeleteSlice deleteSlice;
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
FieldInfos fieldInfos, IndexingChain indexingChain) { FieldInfos fieldInfos, IndexingChain indexingChain) {
@ -180,11 +183,19 @@ public class DocumentsWriterPerThread {
consumer = indexingChain.getChain(this); consumer = indexingChain.getChain(this);
bytesUsed = new AtomicLong(0); bytesUsed = new AtomicLong(0);
pendingDeletes = new BufferedDeletes(false); pendingDeletes = new BufferedDeletes(false);
initialize();
} }
public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) { public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) {
this(other.directory, other.parent, fieldInfos, other.parent.chain); this(other.directory, other.parent, fieldInfos, other.parent.chain);
}
void initialize() {
deleteQueue = parent.deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
pendingDeletes.clear();
deleteSlice = null;
} }
void setAborting() { void setAborting() {
@ -199,13 +210,10 @@ public class DocumentsWriterPerThread {
public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException { public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
assert writer.testPoint("DocumentsWriterPerThread addDocument start"); assert writer.testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null;
docState.doc = doc; docState.doc = doc;
docState.analyzer = analyzer; docState.analyzer = analyzer;
docState.docID = numDocsInRAM; docState.docID = numDocsInRAM;
if (delTerm != null) {
pendingDeletes.addTerm(delTerm, numDocsInRAM);
}
if (segment == null) { if (segment == null) {
// this call is synchronized on IndexWriter.segmentInfos // this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName(); segment = writer.newSegmentName();
@ -219,7 +227,6 @@ public class DocumentsWriterPerThread {
} finally { } finally {
docState.clear(); docState.clear();
} }
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
@ -232,18 +239,44 @@ public class DocumentsWriterPerThread {
} }
} }
} }
success = false; success = false;
try { try {
numDocsInRAM++;
consumer.finishDocument(); consumer.finishDocument();
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
abort(); abort();
} }
} }
finishDocument(delTerm);
}
private void finishDocument(Term delTerm) throws IOException {
/*
* here we actually finish the document in two steps 1. push the delete into
* the queue and update our slice. 2. increment the DWPT private document
* id.
*
* the updated slice we get from 1. holds all the deletes that have occurred
* since we updated the slice the last time.
*/
if (deleteSlice == null) {
deleteSlice = deleteQueue.newSlice();
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
deleteSlice.reset();
}
} else {
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingDeletes, numDocsInRAM);
} else if (deleteQueue.updateSlice(deleteSlice)) {
deleteSlice.apply(pendingDeletes, numDocsInRAM);
}
}
++numDocsInRAM;
} }
// Buffer a specific docID for deletion. Currently only // Buffer a specific docID for deletion. Currently only
@ -261,22 +294,6 @@ public class DocumentsWriterPerThread {
// confounding exception). // confounding exception).
} }
void deleteQueries(Query... queries) {
if (numDocsInRAM > 0) {
for (Query query : queries) {
pendingDeletes.addQuery(query, numDocsInRAM);
}
}
}
void deleteTerms(Term... terms) {
if (numDocsInRAM > 0) {
for (Term term : terms) {
pendingDeletes.addTerm(term, numDocsInRAM);
}
}
}
/** /**
* Returns the number of delete terms in this {@link DocumentsWriterPerThread} * Returns the number of delete terms in this {@link DocumentsWriterPerThread}
*/ */
@ -305,6 +322,15 @@ public class DocumentsWriterPerThread {
parent.subtractFlushedNumDocs(numDocsInRAM); parent.subtractFlushedNumDocs(numDocsInRAM);
numDocsInRAM = 0; numDocsInRAM = 0;
} }
FrozenBufferedDeletes prepareFlush() {
assert numDocsInRAM > 0;
final FrozenBufferedDeletes globalDeletes = deleteQueue.freezeGlobalBuffer(deleteSlice);
// apply all deletes before we flush and release the delete slice
deleteSlice.apply(pendingDeletes, numDocsInRAM);
deleteSlice = null;
return globalDeletes;
}
/** Flush all pending docs to a new segment */ /** Flush all pending docs to a new segment */
FlushedSegment flush() throws IOException { FlushedSegment flush() throws IOException {

View File

@ -118,7 +118,10 @@ public abstract class DocumentsWriterPerThreadPool {
public synchronized ThreadState newThreadState() { public synchronized ThreadState newThreadState() {
if (numThreadStatesActive < perThreads.length) { if (numThreadStatesActive < perThreads.length) {
return perThreads[numThreadStatesActive++]; final ThreadState threadState = perThreads[numThreadStatesActive];
threadState.perThread.initialize();
numThreadStatesActive++;
return threadState;
} }
return null; return null;
} }
@ -128,7 +131,9 @@ public abstract class DocumentsWriterPerThreadPool {
final DocumentsWriterPerThread dwpt = threadState.perThread; final DocumentsWriterPerThread dwpt = threadState.perThread;
if (!closed) { if (!closed) {
final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider)); final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
threadState.resetWriter(new DocumentsWriterPerThread(dwpt, infos)); final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
newDwpt.initialize();
threadState.resetWriter(newDwpt);
} else { } else {
threadState.resetWriter(null); threadState.resetWriter(null);
} }

View File

@ -55,8 +55,8 @@ public abstract class FlushPolicy {
protected IndexWriterConfig indexWriterConfig; protected IndexWriterConfig indexWriterConfig;
/** /**
* Called for each delete term applied to the given {@link ThreadState}s * Called for each delete term. If this is a delete triggered due to an update
* {@link DocumentsWriterPerThread}. * the given {@link ThreadState} is non-null.
* <p> * <p>
* Note: This method is synchronized by the given * Note: This method is synchronized by the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling

View File

@ -52,9 +52,15 @@ class FrozenBufferedDeletes {
final int[] queryLimits; final int[] queryLimits;
final int bytesUsed; final int bytesUsed;
final int numTermDeletes; final int numTermDeletes;
final long gen; private long gen = -1; // assigned by BufferedDeletesStream once pushed
final boolean isSegmentPrivate; // set to true iff this frozen packet represents
// a segment private deletes. in that case is should
// only have Queries
public FrozenBufferedDeletes(BufferedDeletes deletes, long gen) {
public FrozenBufferedDeletes(BufferedDeletes deletes, boolean isSegmentPrivate) {
this.isSegmentPrivate = isSegmentPrivate;
assert !isSegmentPrivate || deletes.terms.size() == 0 : "segment private package should only have del queries";
terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]); terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]);
queries = new Query[deletes.queries.size()]; queries = new Query[deletes.queries.size()];
queryLimits = new int[deletes.queries.size()]; queryLimits = new int[deletes.queries.size()];
@ -66,8 +72,17 @@ class FrozenBufferedDeletes {
} }
bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY; bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY;
numTermDeletes = deletes.numTermDeletes.get(); numTermDeletes = deletes.numTermDeletes.get();
}
public void setDelGen(long gen) {
assert this.gen == -1;
this.gen = gen; this.gen = gen;
} }
public long delGen() {
assert gen != -1;
return gen;
}
public Iterable<Term> termsIterable() { public Iterable<Term> termsIterable() {
return new Iterable<Term>() { return new Iterable<Term>() {

View File

@ -47,6 +47,7 @@ import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock; import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -356,8 +357,8 @@ public class IndexWriter implements Closeable {
// reader; in theory we could do similar retry logic, // reader; in theory we could do similar retry logic,
// just like we do when loading segments_N // just like we do when loading segments_N
IndexReader r; IndexReader r;
flush(false, applyAllDeletes); // don't sync on IW here DWPT will deadlock
synchronized(this) { synchronized(this) {
flush(false, applyAllDeletes);
r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes); r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
if (infoStream != null) { if (infoStream != null) {
message("return reader version=" + r.getVersion() + " reader=" + r); message("return reader version=" + r.getVersion() + " reader=" + r);
@ -2026,7 +2027,16 @@ public class IndexWriter implements Closeable {
deleter.checkpoint(segmentInfos, false); deleter.checkpoint(segmentInfos, false);
} }
void addFlushedSegment(FlushedSegment flushedSegment) throws IOException { /**
* Prepares the {@link SegmentInfo} for the new flushed segment and persists
* the deleted documents {@link BitVector}. Use
* {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)} to
* publish the returned {@link SegmentInfo} together with its segment private
* delete packet.
*
* @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)
*/
SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
assert flushedSegment != null; assert flushedSegment != null;
SegmentInfo newSegment = flushedSegment.segmentInfo; SegmentInfo newSegment = flushedSegment.segmentInfo;
@ -2098,9 +2108,32 @@ public class IndexWriter implements Closeable {
} }
} }
} }
return newSegment;
}
synchronized(this) {
/**
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer. NOTE: use
* {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
* {@link SegmentInfo} for the flushed segment.
*
* @see #prepareFlushedSegment(FlushedSegment)
*/
synchronized void publishFlushedSegment(SegmentInfo newSegment,
FrozenBufferedDeletes packet) throws IOException {
// lock order IW -> BDS
synchronized (bufferedDeletesStream) {
// publishing the segment must be synched on IW -> BDS to make the sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedDeletesStream.push(packet);
} else {
// since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment); segmentInfos.add(newSegment);
checkpoint(); checkpoint();
} }
@ -2534,7 +2567,6 @@ public class IndexWriter implements Closeable {
maybeMerge(); maybeMerge();
} }
} }
// TODO: this method should not have to be entirely // TODO: this method should not have to be entirely
// synchronized, ie, merges should be allowed to commit // synchronized, ie, merges should be allowed to commit
// even while a flush is happening // even while a flush is happening
@ -2602,30 +2634,31 @@ public class IndexWriter implements Closeable {
} }
final synchronized void applyAllDeletes() throws IOException { final synchronized void applyAllDeletes() throws IOException {
flushDeletesCount.incrementAndGet(); flushDeletesCount.incrementAndGet();
final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos); final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
if (result.anyDeletes) { .applyDeletes(readerPool, segmentInfos);
checkpoint(); if (result.anyDeletes) {
} checkpoint();
if (!keepFullyDeletedSegments && result.allDeleted != null) {
if (infoStream != null) {
message("drop 100% deleted segments: " + result.allDeleted);
} }
for(SegmentInfo info : result.allDeleted) { if (!keepFullyDeletedSegments && result.allDeleted != null) {
// If a merge has already registered for this if (infoStream != null) {
// segment, we leave it in the readerPool; the message("drop 100% deleted segments: " + result.allDeleted);
// merge will skip merging it and will then drop }
// it once it's done: for (SegmentInfo info : result.allDeleted) {
if (!mergingSegments.contains(info)) { // If a merge has already registered for this
segmentInfos.remove(info); // segment, we leave it in the readerPool; the
if (readerPool != null) { // merge will skip merging it and will then drop
readerPool.drop(info); // it once it's done:
if (!mergingSegments.contains(info)) {
segmentInfos.remove(info);
if (readerPool != null) {
readerPool.drop(info);
}
} }
} }
checkpoint();
} }
checkpoint(); bufferedDeletesStream.prune(segmentInfos);
}
bufferedDeletesStream.prune(segmentInfos);
} }
/** Expert: Return the total size of all index files currently cached in memory. /** Expert: Return the total size of all index files currently cached in memory.
@ -3065,7 +3098,6 @@ public class IndexWriter implements Closeable {
// Lock order: IW -> BD // Lock order: IW -> BD
bufferedDeletesStream.prune(segmentInfos); bufferedDeletesStream.prune(segmentInfos);
Map<String,String> details = new HashMap<String,String>(); Map<String,String> details = new HashMap<String,String>();
details.put("optimize", Boolean.toString(merge.optimize)); details.put("optimize", Boolean.toString(merge.optimize));
details.put("mergeFactor", Integer.toString(merge.segments.size())); details.put("mergeFactor", Integer.toString(merge.segments.size()));

View File

@ -0,0 +1,222 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* Unit test for {@link DocumentsWriterDeleteQueue}
*/
public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
public void testUpdateDelteSlices() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
new BufferedDeletes(false));
final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
Integer[] ids = new Integer[size];
for (int i = 0; i < ids.length; i++) {
ids[i] = random.nextInt();
}
Term template = new Term("id");
DeleteSlice slice1 = queue.newSlice();
DeleteSlice slice2 = queue.newSlice();
BufferedDeletes bd1 = new BufferedDeletes(false);
BufferedDeletes bd2 = new BufferedDeletes(false);
int last1 = 0;
int last2 = 0;
Set<Term> uniqueValues = new HashSet<Term>();
for (int j = 0; j < ids.length; j++) {
Integer i = ids[j];
// create an array here since we compare identity below against tailItem
Term[] term = new Term[] {template.createTerm(i.toString())};
uniqueValues.add(term[0]);
queue.addDelete(term);
if (random.nextInt(20) == 0 || j == ids.length - 1) {
queue.updateSlice(slice1);
assertTrue(slice1.isTailItem(term));
slice1.apply(bd1, j);
assertAllBetween(last1, j, bd1, ids);
last1 = j + 1;
}
if (random.nextInt(10) == 5 || j == ids.length - 1) {
queue.updateSlice(slice2);
assertTrue(slice2.isTailItem(term));
slice2.apply(bd2, j);
assertAllBetween(last2, j, bd2, ids);
last2 = j + 1;
}
assertEquals(uniqueValues.size(), queue.numGlobalTermDeletes());
}
assertEquals(uniqueValues, bd1.terms.keySet());
assertEquals(uniqueValues, bd2.terms.keySet());
assertEquals(uniqueValues, new HashSet<Term>(Arrays.asList(queue
.freezeGlobalBuffer(null).terms)));
assertEquals("num deletes must be 0 after freeze", 0, queue
.numGlobalTermDeletes());
}
private void assertAllBetween(int start, int end, BufferedDeletes deletes,
Integer[] ids) {
Term template = new Term("id");
for (int i = start; i <= end; i++) {
assertEquals(Integer.valueOf(end), deletes.terms.get(template
.createTerm(ids[i].toString())));
}
}
public void testClear() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
new BufferedDeletes(false));
Term template = new Term("id");
assertFalse(queue.anyChanges());
queue.clear();
assertFalse(queue.anyChanges());
final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
int termsSinceFreeze = 0;
int queriesSinceFreeze = 0;
for (int i = 0; i < size; i++) {
Term term = template.createTerm("" + i);
if (random.nextInt(10) == 0) {
queue.addDelete(new TermQuery(term));
queriesSinceFreeze++;
} else {
queue.addDelete(term);
termsSinceFreeze++;
}
assertTrue(queue.anyChanges());
if (random.nextInt(10) == 0) {
queue.clear();
queue.tryApplyGlobalSlice();
assertFalse(queue.anyChanges());
}
}
}
public void testAnyChanges() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
new BufferedDeletes(false));
Term template = new Term("id");
final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
int termsSinceFreeze = 0;
int queriesSinceFreeze = 0;
for (int i = 0; i < size; i++) {
Term term = template.createTerm("" + i);
if (random.nextInt(10) == 0) {
queue.addDelete(new TermQuery(term));
queriesSinceFreeze++;
} else {
queue.addDelete(term);
termsSinceFreeze++;
}
assertTrue(queue.anyChanges());
if (random.nextInt(5) == 0) {
FrozenBufferedDeletes freezeGlobalBuffer = queue
.freezeGlobalBuffer(null);
assertEquals(termsSinceFreeze, freezeGlobalBuffer.terms.length);
assertEquals(queriesSinceFreeze, freezeGlobalBuffer.queries.length);
queriesSinceFreeze = 0;
termsSinceFreeze = 0;
assertFalse(queue.anyChanges());
}
}
}
public void testStressDeleteQueue() throws InterruptedException {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(
new BufferedDeletes(false));
Set<Term> uniqueValues = new HashSet<Term>();
final int size = 10000 + random.nextInt(500) * RANDOM_MULTIPLIER;
Integer[] ids = new Integer[size];
Term template = new Term("id");
for (int i = 0; i < ids.length; i++) {
ids[i] = random.nextInt();
uniqueValues.add(template.createTerm(ids[i].toString()));
}
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger index = new AtomicInteger(0);
final int numThreads = 2 + random.nextInt(5);
UpdateThread[] threads = new UpdateThread[numThreads];
for (int i = 0; i < threads.length; i++) {
threads[i] = new UpdateThread(queue, index, ids, latch);
threads[i].start();
}
latch.countDown();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
for (UpdateThread updateThread : threads) {
DeleteSlice slice = updateThread.slice;
queue.updateSlice(slice);
BufferedDeletes deletes = updateThread.deletes;
slice.apply(deletes, BufferedDeletes.MAX_INT);
assertEquals(uniqueValues, deletes.terms.keySet());
}
queue.tryApplyGlobalSlice();
assertEquals(uniqueValues, new HashSet<Term>(Arrays.asList(queue
.freezeGlobalBuffer(null).terms)));
assertEquals("num deletes must be 0 after freeze", 0, queue
.numGlobalTermDeletes());
}
private static class UpdateThread extends Thread {
final DocumentsWriterDeleteQueue queue;
final AtomicInteger index;
final Integer[] ids;
final DeleteSlice slice;
final BufferedDeletes deletes;
final CountDownLatch latch;
protected UpdateThread(DocumentsWriterDeleteQueue queue,
AtomicInteger index, Integer[] ids, CountDownLatch latch) {
this.queue = queue;
this.index = index;
this.ids = ids;
this.slice = queue.newSlice();
deletes = new BufferedDeletes(false);
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
Term template = new Term("id");
int i = 0;
while ((i = index.getAndIncrement()) < ids.length) {
Term term = template.createTerm(ids[i].toString());
queue.add(term, slice);
assertTrue(slice.isTailItem(term));
slice.apply(deletes, BufferedDeletes.MAX_INT);
}
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.*; import org.apache.lucene.document.*;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.apache.lucene.util.*; import org.apache.lucene.util.*;
import org.junit.Test; import org.junit.Test;
@ -72,4 +73,73 @@ public class TestRollingUpdates extends LuceneTestCase {
dir.close(); dir.close();
} }
public void testUpdateSameDoc() throws Exception {
final Directory dir = newDirectory();
final LineFileDocs docs = new LineFileDocs(random);
for (int r = 0; r < 3; r++) {
final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer()).setMergePolicy(
newLogMergePolicy()).setMaxBufferedDocs(2));
final int SIZE = 200 * RANDOM_MULTIPLIER;
final int numUpdates = (int) (SIZE * (2 + random.nextDouble()));
int numThreads = 3 + random.nextInt(Runtime.getRuntime().availableProcessors());
IndexingThread[] threads = new IndexingThread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new IndexingThread(docs, w, numUpdates);
threads[i].start();
}
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
w.close();
}
IndexReader open = IndexReader.open(dir);
assertEquals(1, open.numDocs());
open.close();
docs.close();
dir.close();
}
static class IndexingThread extends Thread {
final LineFileDocs docs;
final IndexWriter writer;
final int num;
public IndexingThread(LineFileDocs docs, IndexWriter writer, int num) {
super();
this.docs = docs;
this.writer = writer;
this.num = num;
}
public void run() {
try {
// IndexReader open = IndexReader.open(writer, true);
for (int i = 0; i < num; i++) {
Document doc = new Document();// docs.nextDoc();
doc.add(newField("id", "test", Index.NOT_ANALYZED));
writer.updateDocument(new Term("id", "test"), doc);
// if (random.nextInt(10) == 0) {
// IndexReader reader = open.reopen();
// if (reader != open) {
// open.close();
// open = reader;
// }
// assertEquals("iter: " + i + " numDocs: "+ open.numDocs() + " del: " + open.numDeletedDocs() + " max: " + open.maxDoc(), 1, open.numDocs());
//
// }
}
// open.close();
} catch (Exception e) {
fail(e.getMessage());
}
}
}
} }

View File

@ -199,44 +199,39 @@ public class TestStressIndexing2 extends LuceneTestCase {
public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates, public Map<String,Document> indexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates,
boolean doReaderPooling) throws IOException, InterruptedException { boolean doReaderPooling) throws IOException, InterruptedException {
Map<String,Document> docs = new HashMap<String,Document>(); Map<String,Document> docs = new HashMap<String,Document>();
for(int iter=0;iter<3;iter++) { IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig(
if (VERBOSE) { TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE)
System.out.println("TEST: iter=" + iter); .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
} .setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()));
IndexWriter w = new MockIndexWriter(dir, newIndexWriterConfig( w.setInfoStream(VERBOSE ? System.out : null);
TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.CREATE) LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
.setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates)) lmp.setUseCompoundFile(false);
.setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy())); lmp.setMergeFactor(mergeFactor);
w.setInfoStream(VERBOSE ? System.out : null);
LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
lmp.setUseCompoundFile(false);
lmp.setMergeFactor(mergeFactor);
threads = new IndexingThread[nThreads]; threads = new IndexingThread[nThreads];
for (int i=0; i<threads.length; i++) { for (int i=0; i<threads.length; i++) {
IndexingThread th = new IndexingThread(); IndexingThread th = new IndexingThread();
th.w = w; th.w = w;
th.base = 1000000*i; th.base = 1000000*i;
th.range = range; th.range = range;
th.iterations = iterations; th.iterations = iterations;
threads[i] = th; threads[i] = th;
} }
for (int i=0; i<threads.length; i++) { for (int i=0; i<threads.length; i++) {
threads[i].start(); threads[i].start();
} }
for (int i=0; i<threads.length; i++) { for (int i=0; i<threads.length; i++) {
threads[i].join(); threads[i].join();
} }
//w.optimize(); //w.optimize();
w.close(); w.close();
for (int i=0; i<threads.length; i++) { for (int i=0; i<threads.length; i++) {
IndexingThread th = threads[i]; IndexingThread th = threads[i];
synchronized(th) { synchronized(th) {
docs.putAll(th.docs); docs.putAll(th.docs);
}
} }
} }