diff --git a/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java b/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java index c77cb76b379..11e55734046 100644 --- a/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java +++ b/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java @@ -208,11 +208,11 @@ class BufferedDeletesStream { } if (!packet.isSegmentPrivate) { /* - * only update the coalescededDeletes if we are NOT on a segment private del packet. - * the segment private del packet must only applied to segments with the same delGen. - * Yet, if a segment is already deleted from the SI since it had no more documents remaining - * after some del packets younger than it segPrivate packet (hihger delGen) have been applied - * the segPrivate packet has not been removed. + * Only coalesce if we are NOT on a segment private del packet: the segment private del packet + * must only be applied to segments with the same delGen. Yet, if a segment is already deleted + * from the SI since it had no more documents remaining after some del packets younger than + * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been + * removed. */ coalescedDeletes.update(packet); } @@ -259,7 +259,7 @@ class BufferedDeletesStream { } /* - * since we are on a segment private del packet we must not + * Since we are on a segment private del packet we must not * update the coalescedDeletes here! We can simply advance to the * next packet and seginfo. */ diff --git a/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java b/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java index f4ebdc6d02f..7dbeb099c41 100644 --- a/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java +++ b/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java @@ -50,12 +50,10 @@ final class DocFieldProcessor extends DocConsumer { int hashMask = 1; int totalFieldCount; - float docBoost; int fieldGen; final DocumentsWriterPerThread.DocState docState; - public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) { this.docState = docWriter.docState; this.consumer = consumer; @@ -254,7 +252,6 @@ final class DocFieldProcessor extends DocConsumer { } } - void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) { if (lo >= hi) return; diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java index 15fdf87a118..249d2f84f3d 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -39,10 +39,7 @@ import org.apache.lucene.store.Directory; /** * This class accepts multiple added documents and directly - * writes a single segment file. It does this more - * efficiently than creating a single segment per document - * (with DocumentWriter) and doing standard merges on those - * segments. + * writes segment files. * * Each added document is passed to the {@link DocConsumer}, * which in turn processes the document and interacts with @@ -152,8 +149,11 @@ final class DocumentsWriter { } synchronized boolean deleteQueries(final Query... queries) throws IOException { - final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.addDelete(queries); + // nocommit -- shouldn't we check for doApplyAllDeletes + // here too? + // nocommit shouldn't this consult flush policy? or + // should this return void now? 
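Backing up to the BufferedDeletesStream hunk at the top of this patch: the delGen rule that rewritten comment describes fits in a few lines of code. A minimal sketch, with illustrative names rather than the real BufferedDeletesStream types:

class DeletePacketSketch {
  final long delGen;
  final boolean isSegmentPrivate;

  DeletePacketSketch(long delGen, boolean isSegmentPrivate) {
    this.delGen = delGen;
    this.isSegmentPrivate = isSegmentPrivate;
  }

  // A segment-private packet applies only to the segment whose delGen
  // matches; a coalesced (global) packet applies to every older segment.
  boolean appliesTo(long segmentDelGen) {
    return isSegmentPrivate ? segmentDelGen == delGen : segmentDelGen < delGen;
  }
}

This is also why the comment warns about fully deleted segments: a segment can be dropped before its private packet is applied, leaving that packet behind in the stream.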
return false; } @@ -165,9 +165,11 @@ final class DocumentsWriter { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.addDelete(terms); flushControl.doOnDelete(); - if (flushControl.flushDeletes.getAndSet(false)) { - flushDeletes(deleteQueue); + if (flushControl.doApplyAllDeletes()) { + applyAllDeletes(deleteQueue); } + // nocommit shouldn't this consult flush policy? or + // should this return void now? return false; } @@ -182,13 +184,13 @@ final class DocumentsWriter { return deleteQueue; } - private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { + private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { if (deleteQueue != null) { synchronized (ticketQueue) { - // freeze and insert the delete flush ticket in the queue + // Freeze and insert the delete flush ticket in the queue ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false)); applyFlushTickets(null, null); - } + } } indexWriter.applyAllDeletes(); indexWriter.flushCount.incrementAndGet(); } @@ -196,14 +198,9 @@ final class DocumentsWriter { synchronized void setInfoStream(PrintStream infoStream) { this.infoStream = infoStream; - pushConfigChange(); - } - - private final void pushConfigChange() { final Iterator it = perThreadPool.getAllPerThreadsIterator(); while (it.hasNext()) { - DocumentsWriterPerThread perThread = it.next().perThread; - perThread.docState.infoStream = this.infoStream; + it.next().perThread.docState.infoStream = infoStream; } } @@ -218,8 +215,9 @@ final class DocumentsWriter { // returns boolean for asserts boolean message(String message) { - if (infoStream != null) + if (infoStream != null) { indexWriter.message("DW: " + message); + } return true; } @@ -297,45 +295,52 @@ final class DocumentsWriter { ensureOpen(); boolean maybeMerge = false; final boolean isUpdate = delTerm != null; - if (healthiness.isStalled()) { - /* - * if we are allowed to hijack threads for flushing we try to flush out - * as many pending DWPT to release memory and get back healthy status. - */ + if (healthiness.anyStalledThreads()) { + + // Help out flushing any pending DWPTs so we can un-stall: if (infoStream != null) { - message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment"); + message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)"); } - // try pick up pending threads here if possile + + // Try to pick up pending threads here if possible DocumentsWriterPerThread flushingDWPT; - while ( (flushingDWPT = flushControl.nextPendingFlush()) != null){ - // don't push the delete here since the update could fail! + while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { + // Don't push the delete here since the update could fail! 
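The stall handling in this hunk (continued just below) follows a "help out before blocking" pattern: the indexing thread first volunteers to run pending flushes itself, and only blocks if that fails to un-stall the writer. A minimal sketch of that control flow, where Flusher and Stall are hypothetical stand-ins for flushControl and healthiness:

interface Flusher {
  // Returns a pending flush to run, or null if none is queued.
  Runnable nextPendingFlush();
}

interface Stall {
  boolean anyStalledThreads();
  void waitIfStalled() throws InterruptedException;
}

final class StallHelper {
  static void helpThenWait(Flusher flusher, Stall stall) throws InterruptedException {
    Runnable pending;
    // First try to un-stall by doing the flush work ourselves:
    while ((pending = flusher.nextPendingFlush()) != null) {
      pending.run();
      if (!stall.anyStalledThreads()) {
        return; // healthy again; resume indexing
      }
    }
    // Nothing left to help with; block until the flushes catch up:
    stall.waitIfStalled();
  }
}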
maybeMerge = doFlush(flushingDWPT); - if (!healthiness.isStalled()) { + if (!healthiness.anyStalledThreads()) { break; } } - if (infoStream != null && healthiness.isStalled()) { - message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore"); + + if (infoStream != null && healthiness.anyStalledThreads()) { + message("WARNING DocumentsWriter still has stalled threads; waiting"); } + healthiness.waitIfStalled(); // block if stalled + + if (infoStream != null && healthiness.anyStalledThreads()) { + message("WARNING DocumentsWriter done waiting"); + } } + final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc); final DocumentsWriterPerThread flushingDWPT; - final DocumentsWriterPerThread dwpt; + try { + if (!perThread.isActive()) { ensureOpen(); assert false: "perThread is not active but we are still open"; } - dwpt = perThread.perThread; + final DocumentsWriterPerThread dwpt = perThread.perThread; try { dwpt.updateDocument(doc, analyzer, delTerm); numDocsInRAM.incrementAndGet(); } finally { - if(dwpt.checkAndResetHasAborted()) { - flushControl.doOnAbort(perThread); + if (dwpt.checkAndResetHasAborted()) { + flushControl.doOnAbort(perThread); } } flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); @@ -376,46 +381,53 @@ * might miss to deletes documents in 'A'. */ synchronized (ticketQueue) { - // each flush is assigned a ticket in the order they accquire the ticketQueue lock - ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); - ticketQueue.add(ticket); + // Each flush is assigned a ticket in the order they acquire the ticketQueue lock + ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); + ticketQueue.add(ticket); } + // flush concurrently without locking final FlushedSegment newSegment = flushingDWPT.flush(); + + // nocommit -- should this success = true be moved + // under the applyFlushTickets? success = true; + /* - * now we are done and try to flush the ticket queue if the head of the + * Now we are done and try to flush the ticket queue if the head of the * queue has already finished the flush. */ applyFlushTickets(ticket, newSegment); } finally { - flushControl.doAfterFlush(flushingDWPT); - flushingDWPT.checkAndResetHasAborted(); - indexWriter.flushCount.incrementAndGet(); - if (!success && ticket != null) { - synchronized (ticketQueue) { - // in the case of a failure make sure we are making progress and - // apply all the deletes since the segment flush failed - ticket.isSegmentFlush = false; - - } + flushControl.doAfterFlush(flushingDWPT); + flushingDWPT.checkAndResetHasAborted(); + indexWriter.flushCount.incrementAndGet(); + if (!success && ticket != null) { + synchronized (ticketQueue) { + // nocommit -- shouldn't we drop the ticket in + // this case? + // In the case of a failure make sure we are making progress and + // apply all the deletes since the segment flush failed + ticket.isSegmentFlush = false; } + } } - flushingDWPT = flushControl.nextPendingFlush() ; + flushingDWPT = flushControl.nextPendingFlush(); } return maybeMerge; } - private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException { synchronized (ticketQueue) { if (current != null) { - // this is a segment FlushTicket so assign the flushed segment so we can make progress. + // nocommit -- can't caller set current.segment = segment? 
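The ticketQueue invariant these nocommits circle around is easier to see in isolation: tickets are reserved in lock-acquisition order, the flush itself runs without the lock, and finished segments are published strictly in ticket order regardless of which flush completes first. A self-contained sketch (illustrative, not the real FlushTicket API):

import java.util.ArrayDeque;
import java.util.Queue;

final class TicketQueueSketch<T> {
  static final class Ticket<T> {
    T result;     // set when the concurrent flush finishes
    boolean done;
  }

  private final Queue<Ticket<T>> queue = new ArrayDeque<>();

  Ticket<T> reserve() {
    synchronized (queue) { // ticket order == lock-acquisition order
      Ticket<T> t = new Ticket<>();
      queue.add(t);
      return t;
    }
  }

  // Called after the (unsynchronized) flush work completes.
  void publish(Ticket<T> ticket, T result, java.util.function.Consumer<T> publisher) {
    synchronized (queue) {
      ticket.result = result;
      ticket.done = true;
      // Only completed head tickets may publish, so out-of-order
      // finishers simply wait their turn:
      Ticket<T> head;
      while ((head = queue.peek()) != null && head.done) {
        queue.poll();
        publisher.accept(head.result);
      }
    }
  }
}

The reserve-then-publish split is what lets flushes run concurrently while segments still reach IndexWriter in the order their flushes started.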
+ // nocommit -- confused by this comment: + // This is a segment FlushTicket so assign the flushed segment so we can make progress. assert segment != null; current.segment = segment; } while (true) { - // while we can publish flushes keep on making the queue empty. + // Keep publishing eligible flushed segments: final FlushTicket head = ticketQueue.peek(); if (head != null && head.canPublish()) { ticketQueue.poll(); @@ -426,11 +438,10 @@ final class DocumentsWriter { } } } - private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) throws IOException { - // this is eventually finishing the flushed segment and publishing it to the IndexWriter + // Finish the flushed segment and publish it to IndexWriter if (newSegment == null) { assert bufferedDeletes != null; if (bufferedDeletes != null && bufferedDeletes.any()) { @@ -442,9 +453,6 @@ final class DocumentsWriter { } else { publishFlushedSegment(newSegment, bufferedDeletes); } - - - } final void subtractFlushedNumDocs(int numFlushed) { @@ -455,10 +463,10 @@ final class DocumentsWriter { } /** - * publishes the flushed segment, segment private deletes if any and its - * associated global delete if present to the index writer. the actual - * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo} - * 's delete generation is always GlobalPacket_deleteGeneration + 1 + * Publishes the flushed segment, segment private deletes (if any) and its + * associated global delete (if present) to IndexWriter. The actual + * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s + * delete generation is always GlobalPacket_deleteGeneration + 1 */ private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket) throws IOException { @@ -467,12 +475,13 @@ final class DocumentsWriter { final BufferedDeletes deletes = newSegment.segmentDeletes; FrozenBufferedDeletes packet = null; if (deletes != null && deletes.any()) { - // segment private delete + // Segment private delete packet = new FrozenBufferedDeletes(deletes, true); if (infoStream != null) { message("flush: push buffered seg private deletes: " + packet); } } + // now publish! indexWriter.publishFlushedSegment(segInfo, packet, globalPacket); } @@ -486,10 +495,9 @@ final class DocumentsWriter { } /* - * flushAllThreads is synced by IW fullFlushLock. Flushing all threads is a - * two stage operations, the caller must ensure that #finishFlush is called - * after this method to release the flush lock in DWFlushControl - use try / - * finally! + * FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a + * two stage operation; the caller must ensure (in try/finally) that finishFlush + * is called after this method, to release the flush lock in DWFlushControl */ final boolean flushAllThreads(final boolean flushDeletes) throws IOException { @@ -497,9 +505,11 @@ final class DocumentsWriter { synchronized (this) { flushingDeleteQueue = deleteQueue; - /* sets a new delete queue - this must be synced on the flush control + /* Cutover to a new delete queue. 
This must be synced on the flush control * otherwise a new DWPT could sneak into the loop with an already flushing * delete queue */ + // nocommit -- shouldn't we do this?: + // assert Thread.holdsLock(flushControl); flushControl.markForFullFlush(); assert setFlushingDeleteQueue(flushingDeleteQueue); } @@ -509,18 +519,18 @@ boolean anythingFlushed = false; try { DocumentsWriterPerThread flushingDWPT; - // now try help out with flushing + // Help out with flushing: while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { anythingFlushed |= doFlush(flushingDWPT); } - // if a concurrent flush is still in flight wait for it - while (!flushControl.allFlushesDue()) { + // If a concurrent flush is still in flight wait for it + while (flushControl.anyFlushing()) { flushControl.waitForFlush(); } if (!anythingFlushed && flushDeletes) { synchronized (ticketQueue) { ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false)); - } + } applyFlushTickets(null, null); } } finally { @@ -532,13 +542,16 @@ final void finishFullFlush(boolean success) { assert setFlushingDeleteQueue(null); if (success) { - // release the flush lock + // Release the flush lock flushControl.finishFullFlush(); } else { flushControl.abortFullFlushes(); } } - + + // nocommit -- can we add comment justifying that these + // fields are safely changed across threads because they + // are always accessed in sync(ticketQueue)? static final class FlushTicket { final FrozenBufferedDeletes frozenDeletes; FlushedSegment segment; diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index 85536daa812..ccc06cfd7e7 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -23,15 +23,15 @@ import org.apache.lucene.search.Query; /** * {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes - * queue. In contrast to other queue implementation we only maintain only the + * queue. In contrast to other queue implementations we only maintain the * tail of the queue. A delete queue is always used in a context of a set of - * DWPT and a global delete pool. Each of the DWPT and the global pool need to - * maintain their 'own' head of the queue. The difference between the DWPT and - * the global pool is that the DWPT starts maintaining a head once it has added - * its first document since for its segments private deletes only the deletes - * after that document are relevant. The global pool instead starts maintaining - * the head once this instance is created by taking the sentinel instance as its - * initial head. + * DWPTs and a global delete pool. Each of the DWPTs and the global pool needs to + * maintain its 'own' head of the queue (as a DeleteSlice instance per DWPT). + * The difference between the DWPT and the global pool is that the DWPT starts + * maintaining a head once it has added its first document since for its segment's + * private deletes only the deletes after that document are relevant. The global + * pool instead starts maintaining the head once this instance is created by + * taking the sentinel instance as its initial head. *

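Stepping back to the flushAllThreads/finishFullFlush pair changed above: the two-stage contract is a plain try/finally pairing on the caller side, so the flush lock in DWFlushControl is always released. A minimal caller sketch (FullFlusher is a hypothetical stand-in for DocumentsWriter):

interface FullFlusher {
  boolean flushAllThreads(boolean flushDeletes) throws java.io.IOException;
  void finishFullFlush(boolean success);
}

final class FullFlushCaller {
  static boolean fullFlush(FullFlusher dw, boolean flushDeletes) throws java.io.IOException {
    boolean success = false;
    boolean anythingFlushed = false;
    try {
      anythingFlushed = dw.flushAllThreads(flushDeletes);
      success = true;
    } finally {
      // Always release the flush lock, even if the flush threw:
      dw.finishFullFlush(success);
    }
    return anythingFlushed;
  }
}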
* Since each {@link DeleteSlice} maintains its own head and the list is only * single linked the garbage collector takes care of pruning the list for us. @@ -41,12 +41,12 @@ import org.apache.lucene.search.Query; *

* Each DWPT as well as the global delete pool maintain their private * DeleteSlice instance. In the DWPT case updating a slice is equivalent to - * atomically finishing the document. The slice update guarantees a happens - * before relationship to all other updates in the same indexing session. When a - * DWPT updates a document it + * atomically finishing the document. The slice update guarantees a "happens + * before" relationship to all other updates in the same indexing session. When a + * DWPT updates a document it: * *

- * <li>consumes a document finishes its processing</li>
+ * <li>consumes a document and finishes its processing</li>
 * <li>updates its private {@link DeleteSlice} either by calling * {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the * document has a delTerm)</li>
@@ -56,7 +56,7 @@ import org.apache.lucene.search.Query; *
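The slice mechanics behind the steps above can be sketched on their own: the queue is a shared singly linked list of which only the tail is tracked, and each consumer applies exactly the nodes in its private (sliceHead, sliceTail] window. A simplified, coarsely synchronized version (the real queue is non-blocking and advances the tail via CAS):

final class DeleteQueueSketch {
  static final class Node {
    final String item; // a delete term/query in the real queue
    Node next;
    Node(String item) { this.item = item; }
  }

  private Node tail = new Node(null); // sentinel

  synchronized Node add(String item) {
    Node node = new Node(item);
    tail.next = node;
    tail = node;
    return node;
  }

  synchronized Node tail() { return tail; }

  static final class Slice {
    Node sliceHead, sliceTail;
    Slice(Node start) { sliceHead = sliceTail = start; }

    // Apply every item after sliceHead up to and including sliceTail;
    // sliceHead itself is never applied.
    void apply(java.util.function.Consumer<String> sink) {
      Node current = sliceHead;
      while (current != sliceTail) {
        current = current.next; // non-null for any node at or before the tail
        sink.accept(current.item);
      }
      sliceHead = sliceTail; // reset to a zero-length slice
    }
  }
}

A DWPT's slice starts at whatever node is the tail when its first document arrives, while the global pool's slice starts at the sentinel and therefore sees everything; once a slice resets, the list prefix becomes unreachable and the GC prunes it, exactly as this javadoc describes.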
* * The DWPT also doesn't apply its current documents delete term until it has - * updated its delete slice which ensures the consistency of the update. if the + * updated its delete slice which ensures the consistency of the update. If the * update fails before the DeleteSlice could have been updated the deleteTerm * will also not be added to its private deletes neither to the global deletes. * @@ -167,7 +167,7 @@ final class DocumentsWriterDeleteQueue { void tryApplyGlobalSlice() { if (globalBufferLock.tryLock()) { /* - * the global buffer must be locked but we don't need to upate them if + * The global buffer must be locked but we don't need to update it if * there is an update going on right now. It is sufficient to apply the * deletes that have been added after the current in-flight global slices * tail the next time we can get the lock! @@ -175,7 +175,6 @@ try { if (updateSlice(globalSlice)) { globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT); - } } finally { globalBufferLock.unlock(); } @@ -186,15 +185,15 @@ FrozenBufferedDeletes freezeGlobalBuffer(DeleteSlice callerSlice) { globalBufferLock.lock(); /* - * here we are freezing the global buffer so we need to lock it, apply all + * Here we freeze the global buffer so we need to lock it, apply all * deletes in the queue and reset the global slice to let the GC prune the * queue. */ final Node currentTail = tail; // take the current tail make this local any - // changes after this call are applied later + // Changes after this call are applied later // and not relevant here if (callerSlice != null) { - // update the callers slices so we are on the same page + // Update the caller's slice so we are on the same page callerSlice.sliceTail = currentTail; } try { @@ -217,7 +216,7 @@ } boolean updateSlice(DeleteSlice slice) { - if (slice.sliceTail != tail) { // if we are the same just + if (slice.sliceTail != tail) { // If the tails differ, update the slice slice.sliceTail = tail; return true; } @@ -225,7 +224,7 @@ } static class DeleteSlice { - // no need to be volatile, slices are only access by one thread! + // No need to be volatile, slices are thread captive (only accessed by one thread)! Node sliceHead; // we don't apply this one Node sliceTail; @@ -245,7 +244,7 @@ return; } /* - * when we apply a slice we take the head and get its next as our first + * When we apply a slice we take the head and get its next as our first * item to apply and continue until we applied the tail. If the head and * tail in this slice are not equal then there will be at least one more * non-null node in the slice! 
@@ -260,7 +259,7 @@ final class DocumentsWriterDeleteQueue { } void reset() { - // resetting to a 0 length slice + // Reset to a 0 length slice sliceHead = sliceTail; } @@ -322,7 +321,6 @@ final class DocumentsWriterDeleteQueue { void apply(BufferedDeletes bufferedDeletes, int docIDUpto) { bufferedDeletes.addTerm((Term) item, docIDUpto); } - } private static final class QueryArrayNode extends Node { @@ -376,6 +374,5 @@ final class DocumentsWriterDeleteQueue { } finally { globalBufferLock.unlock(); } - } } diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index 3a5c790631e..98ad05538c0 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -40,7 +40,7 @@ import org.apache.lucene.util.ThreadInterruptedException; */ public final class DocumentsWriterFlushControl { - private final long maxBytesPerDWPT; + private final long hardMaxBytesPerDWPT; private long activeBytes = 0; private long flushBytes = 0; private volatile int numPending = 0; @@ -63,11 +63,11 @@ public final class DocumentsWriterFlushControl { private final DocumentsWriter documentsWriter; DocumentsWriterFlushControl(DocumentsWriter documentsWriter, - Healthiness healthiness, long maxBytesPerDWPT) { + Healthiness healthiness, long hardMaxBytesPerDWPT) { this.healthiness = healthiness; this.perThreadPool = documentsWriter.perThreadPool; this.flushPolicy = documentsWriter.flushPolicy; - this.maxBytesPerDWPT = maxBytesPerDWPT; + this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT; this.documentsWriter = documentsWriter; } @@ -85,8 +85,8 @@ public final class DocumentsWriterFlushControl { private void commitPerThreadBytes(ThreadState perThread) { final long delta = perThread.perThread.bytesUsed() - - perThread.perThreadBytes; - perThread.perThreadBytes += delta; + - perThread.bytesUsed; + perThread.bytesUsed += delta; /* * We need to differentiate here if we are pending since setFlushPending * moves the perThread memory to the flushBytes and we could be set to @@ -100,6 +100,7 @@ public final class DocumentsWriterFlushControl { assert updatePeaks(delta); } + // only for asserts private boolean updatePeaks(long delta) { peakActiveBytes = Math.max(peakActiveBytes, activeBytes); peakFlushBytes = Math.max(peakFlushBytes, flushBytes); @@ -116,10 +117,9 @@ public final class DocumentsWriterFlushControl { } else { flushPolicy.onInsert(this, perThread); } - if (!perThread.flushPending && perThread.perThreadBytes > maxBytesPerDWPT) { - // safety check to prevent a single DWPT exceeding its RAM limit. This - // is super - // important since we can not address more than 2048 MB per DWPT + if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) { + // Safety check to prevent a single DWPT exceeding its RAM limit. 
This + // is super important since we can not address more than 2048 MB per DWPT setFlushPending(perThread); if (fullFlush) { DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false); @@ -146,8 +146,8 @@ public final class DocumentsWriterFlushControl { } } - public synchronized boolean allFlushesDue() { - return numFlushing == 0; + public synchronized boolean anyFlushing() { + return numFlushing != 0; } public synchronized void waitForFlush() { @@ -169,7 +169,7 @@ public final class DocumentsWriterFlushControl { assert !perThread.flushPending; if (perThread.perThread.getNumDocsInRAM() > 0) { perThread.flushPending = true; // write access synced - final long bytes = perThread.perThreadBytes; + final long bytes = perThread.bytesUsed; flushBytes += bytes; activeBytes -= bytes; numPending++; // write access synced @@ -179,19 +179,20 @@ public final class DocumentsWriterFlushControl { synchronized void doOnAbort(ThreadState state) { if (state.flushPending) { - flushBytes -= state.perThreadBytes; + flushBytes -= state.bytesUsed; } else { - activeBytes -= state.perThreadBytes; + activeBytes -= state.bytesUsed; } - // take it out of the loop this DWPT is stale + // Take it out of the loop this DWPT is stale perThreadPool.replaceForFlush(state, closed); healthiness.updateStalled(this); } synchronized DocumentsWriterPerThread tryCheckoutForFlush( ThreadState perThread, boolean setPending) { - if (fullFlush) + if (fullFlush) { return null; + } return internalTryCheckOutForFlush(perThread, setPending); } @@ -201,17 +202,17 @@ public final class DocumentsWriterFlushControl { setFlushPending(perThread); } if (perThread.flushPending) { - // we are pending so all memory is already moved to flushBytes + // We are pending so all memory is already moved to flushBytes if (perThread.tryLock()) { try { if (perThread.isActive()) { assert perThread.isHeldByCurrentThread(); final DocumentsWriterPerThread dwpt; - final long bytes = perThread.perThreadBytes; // do that before + final long bytes = perThread.bytesUsed; // do that before // replace! 
dwpt = perThreadPool.replaceForFlush(perThread, closed); assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing"; - // record the flushing DWPT to reduce flushBytes in doAfterFlush + // Record the flushing DWPT to reduce flushBytes in doAfterFlush flushingWriters.put(dwpt, Long.valueOf(bytes)); numPending--; // write access synced numFlushing++; @@ -298,8 +299,12 @@ public final class DocumentsWriterFlushControl { return numFlushing; } - public void setFlushDeletes() { - flushDeletes.set(true); + public boolean doApplyAllDeletes() { + return flushDeletes.getAndSet(false); + } + + public void setApplyAllDeletes() { + flushDeletes.set(true); } int numActiveDWPT() { @@ -312,7 +317,7 @@ public final class DocumentsWriterFlushControl { assert !fullFlush; fullFlush = true; flushingQueue = documentsWriter.deleteQueue; - // set a new delete queue - all subsequent DWPT will use this queue until + // Set a new delete queue - all subsequent DWPT will use this queue until // we do another full flush documentsWriter.deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false)); } @@ -374,9 +379,9 @@ public final class DocumentsWriterFlushControl { } } finally { + fullFlush = false; flushQueue.clear(); blockedFlushes.clear(); - fullFlush = false; } } diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 5aba17937e9..4ff346b388e 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -44,7 +44,7 @@ public abstract class DocumentsWriterPerThreadPool { // write access guarded by DocumentsWriterFlushControl volatile boolean flushPending = false; // write access guarded by DocumentsWriterFlushControl - long perThreadBytes = 0; + long bytesUsed = 0; // guarded by Reentrant lock private boolean isActive = true; @@ -65,7 +65,7 @@ public abstract class DocumentsWriterPerThreadPool { isActive = false; } this.perThread = perThread; - this.perThreadBytes = 0; + this.bytesUsed = 0; this.flushPending = false; } @@ -86,7 +86,7 @@ public abstract class DocumentsWriterPerThreadPool { public long getBytesUsedPerThread() { assert this.isHeldByCurrentThread(); // public for FlushPolicy - return perThreadBytes; + return bytesUsed; } /** @@ -162,9 +162,9 @@ public abstract class DocumentsWriterPerThreadPool { public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc); - public abstract void clearThreadBindings(ThreadState perThread); + //public abstract void clearThreadBindings(ThreadState perThread); - public abstract void clearAllThreadBindings(); + // public abstract void clearAllThreadBindings(); /** * Returns an iterator providing access to all {@link ThreadState} diff --git a/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java index 1f0b42032f8..6ae2fa4d64b 100644 --- a/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java +++ b/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java @@ -21,16 +21,16 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; /** * Default {@link FlushPolicy} implementation that flushes based on RAM - * Consumption, document count and number of buffered deletes depending on the - * IndexWriters {@link IndexWriterConfig}. 
This {@link FlushPolicy} will only + * used, document count and number of buffered deletes depending on the + * IndexWriter's {@link IndexWriterConfig}. This {@link FlushPolicy} will only * respect settings which are not disabled during initialization ( - * {@link #init(DocumentsWriter)}). All enabled {@link IndexWriterConfig} + * {@link #init(DocumentsWriter)}) (nocommit what does that mean?). All enabled {@link IndexWriterConfig} * settings are used to mark {@link DocumentsWriterPerThread} as flush pending - * during indexing with respect to thier live updates. + * during indexing with respect to their live updates. *

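The triggers described in this javadoc (and implemented in the onInsert hunk below) reduce to two comparisons: a per-DWPT document count and a global active-bytes limit. A condensed sketch, where Control and State stand in for DocumentsWriterFlushControl and ThreadState, and -1 means a trigger is disabled:

final class RamOrCountPolicySketch {
  interface Control {
    long activeBytes();
    void setFlushPending(State state);
    State findLargestNonPendingWriter(State current);
  }
  interface State {
    int numDocsInRAM();
  }

  final int maxBufferedDocs; // -1 == disabled
  final double ramBufferMB;  // -1 == disabled

  RamOrCountPolicySketch(int maxBufferedDocs, double ramBufferMB) {
    this.maxBufferedDocs = maxBufferedDocs;
    this.ramBufferMB = ramBufferMB;
  }

  void onInsert(Control control, State state) {
    if (maxBufferedDocs != -1 && state.numDocsInRAM() >= maxBufferedDocs) {
      control.setFlushPending(state); // flush this DWPT by doc count
    } else if (ramBufferMB != -1) {
      final long limit = (long) (ramBufferMB * 1024 * 1024);
      if (control.activeBytes() >= limit) { // global RAM trigger
        control.setFlushPending(control.findLargestNonPendingWriter(state));
      }
    }
  }
}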
- * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled always the + * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the * largest ram consuming {@link DocumentsWriterPerThread} will be marked as - * pending iff the global active RAM consumption is equals or higher the + * pending iff the global active RAM consumption is >= the * configured max RAM buffer. */ public class FlushByRamOrCountsPolicy extends FlushPolicy { @@ -38,10 +38,11 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy { @Override public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { if (flushOnDeleteTerms()) { + // Flush this state by num del terms final int maxBufferedDeleteTerms = indexWriterConfig .getMaxBufferedDeleteTerms(); if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) { - control.setFlushDeletes(); + control.setApplyAllDeletes(); } } } @@ -51,12 +52,12 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy { if (flushOnDocCount() && state.perThread.getNumDocsInRAM() >= indexWriterConfig .getMaxBufferedDocs()) { - control.setFlushPending(state); // flush by num docs + // Flush this state by num docs + control.setFlushPending(state); } else {// flush by RAM if (flushOnRAM()) { - final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB(); + final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes(); - final long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d); if (totalRam >= limit) { markLargestWriterPending(control, state, totalRam); } diff --git a/lucene/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/src/java/org/apache/lucene/index/FlushPolicy.java index 0997eca4ebe..04cefb5b685 100644 --- a/lucene/src/java/org/apache/lucene/index/FlushPolicy.java +++ b/lucene/src/java/org/apache/lucene/index/FlushPolicy.java @@ -32,16 +32,16 @@ import org.apache.lucene.util.SetOnce; * {@link IndexWriterConfig#setRAMBufferSizeMB(double)} *

 * <li>Number of RAM resident documents - configured via * {@link IndexWriterConfig#setMaxBufferedDocs(int)}</li>
- * <li>Number of buffered delete terms - configured via
+ * <li>Number of buffered delete terms/queries - configured via
 * {@link IndexWriterConfig#setMaxBufferedDeleteTerms(int)}</li>
 * </ul>
 * * - The {@link IndexWriter} uses a provided {@link FlushPolicy} to control the - * flushing process during indexing. The policy is informed for each added or + * The {@link IndexWriter} consults a provided {@link FlushPolicy} to control the + * flushing process. The policy is informed for each added or * updated document as well as for each delete term. Based on the - * {@link FlushPolicy} the information provided via {@link ThreadState} and - * {@link DocumentsWriterFlushControl} the {@link FlushPolicy} can decide if a - * {@link DocumentsWriterPerThread} needs flushing and can mark it as + * {@link FlushPolicy} and the information provided via {@link ThreadState} and + * {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a + * {@link DocumentsWriterPerThread} needs flushing and marks it as * flush-pending via * {@link DocumentsWriterFlushControl#setFlushPending(ThreadState)}. * @@ -58,6 +58,7 @@ public abstract class FlushPolicy { * Called for each delete term. If this is a delete triggered due to an update * the given {@link ThreadState} is non-null. *

    + * nocommit: what does this note mean...? * Note: This method is synchronized by the given * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * thread holds the lock on the given {@link ThreadState} @@ -66,9 +67,10 @@ public abstract class FlushPolicy { ThreadState state); /** - * Called for each document update on the given {@link ThreadState}s + * Called for each document update on the given {@link ThreadState}'s * {@link DocumentsWriterPerThread}. *

    + * nocommit: what does this note mean...? * Note: This method is synchronized by the given * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * thread holds the lock on the given {@link ThreadState} @@ -103,6 +105,7 @@ public abstract class FlushPolicy { * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush * pending */ + // nocommit -- move to default policy? protected void markLargestWriterPending(DocumentsWriterFlushControl control, ThreadState perThreadState, final long currentBytesPerThread) { control @@ -117,7 +120,8 @@ public abstract class FlushPolicy { */ protected ThreadState findLargestNonPendingWriter( DocumentsWriterFlushControl control, ThreadState perThreadState) { - long maxRamSoFar = perThreadState.perThreadBytes; + assert perThreadState.perThread.getNumDocsInRAM() > 0; + long maxRamSoFar = perThreadState.bytesUsed; // the dwpt which needs to be flushed eventually ThreadState maxRamUsingThreadState = perThreadState; assert !perThreadState.flushPending : "DWPT should have flushed"; @@ -125,7 +129,7 @@ public abstract class FlushPolicy { while (activePerThreadsIterator.hasNext()) { ThreadState next = activePerThreadsIterator.next(); if (!next.flushPending) { - final long nextRam = next.perThreadBytes; + final long nextRam = next.bytesUsed; if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) { maxRamSoFar = nextRam; maxRamUsingThreadState = next; @@ -137,6 +141,8 @@ public abstract class FlushPolicy { return maxRamUsingThreadState; } + // nocommit -- I thought we pause based on "too many flush + // states pending"? /** * Returns the max net memory which marks the upper watermark for the * DocumentsWriter to be healthy. If all flushing and active @@ -154,6 +160,7 @@ public abstract class FlushPolicy { */ public long getMaxNetBytes() { if (!flushOnRAM()) { + // nocommit explain that returning -1 is allowed? return -1; } final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB(); @@ -165,6 +172,8 @@ public abstract class FlushPolicy { * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise * false. */ + // nocommit who needs this? policy shouldn't have to impl + // this? our default policy should? protected boolean flushOnDocCount() { return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH; } @@ -174,6 +183,8 @@ public abstract class FlushPolicy { * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise * false. */ + // nocommit who needs this? policy shouldn't have to impl + // this? our default policy should? protected boolean flushOnDeleteTerms() { return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH; } @@ -183,6 +194,8 @@ public abstract class FlushPolicy { * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise * false. */ + // nocommit who needs this? policy shouldn't have to impl + // this? our default policy should? 
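For reference, the selection loop in findLargestNonPendingWriter above is a plain max-scan over the active states, skipping anything already flush-pending or empty. A sketch against a simplified view of ThreadState:

import java.util.Iterator;

final class LargestWriterSketch {
  interface ThreadStateView {
    boolean flushPending();
    long bytesUsed();
    int numDocsInRAM();
  }

  static ThreadStateView findLargest(ThreadStateView current,
                                     Iterator<ThreadStateView> allActive) {
    ThreadStateView max = current;
    long maxBytes = current.bytesUsed();
    while (allActive.hasNext()) {
      ThreadStateView next = allActive.next();
      // Skip states already marked pending or with nothing buffered:
      if (!next.flushPending() && next.numDocsInRAM() > 0
          && next.bytesUsed() > maxBytes) {
        maxBytes = next.bytesUsed();
        max = next;
      }
    }
    return max;
  }
}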
protected boolean flushOnRAM() { return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH; } diff --git a/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java b/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java index d40543d92d7..0622fc672f8 100644 --- a/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java @@ -52,7 +52,7 @@ final class FreqProxTermsWriter extends TermsHashConsumer { final int numAllFields = allFields.size(); - // sort by field name + // Sort by field name CollectionUtil.quickSort(allFields); final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state); diff --git a/lucene/src/java/org/apache/lucene/index/Healthiness.java b/lucene/src/java/org/apache/lucene/index/Healthiness.java index 716e0ff9a05..c302dbb76d8 100644 --- a/lucene/src/java/org/apache/lucene/index/Healthiness.java +++ b/lucene/src/java/org/apache/lucene/index/Healthiness.java @@ -83,10 +83,10 @@ final class Healthiness { } } - private final Healthiness.Sync sync = new Sync(); + private final Sync sync = new Sync(); volatile boolean wasStalled = false; // only with asserts - boolean isStalled() { + boolean anyStalledThreads() { return !sync.isHealthy(); } diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java index 8d23b955713..90b52f65a9d 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java @@ -383,7 +383,7 @@ public class IndexWriter implements Closeable { if (!success && infoStream != null) { message("hit exception during while NRT reader"); } - // now we are done - finish the full flush! + // Done: finish the full flush! docWriter.finishFullFlush(success); doAfterFlush(); } @@ -2073,7 +2073,7 @@ public class IndexWriter implements Closeable { if (useCompoundFile(newSegment)) { String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION); message("creating compound file " + compoundFileName); - // Now build compound file + // Now build compound file CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); for(String fileName : newSegment.files()) { cfsWriter.addFile(fileName); @@ -2146,18 +2146,18 @@ */ synchronized void publishFlushedSegment(SegmentInfo newSegment, FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException { - // lock order IW -> BDS + // Lock order IW -> BDS synchronized (bufferedDeletesStream) { if (globalPacket != null && globalPacket.any()) { bufferedDeletesStream.push(globalPacket); } - // publishing the segment must be synched on IW -> BDS to make the sure + // Publishing the segment must be synched on IW -> BDS to make sure // that no merge prunes away the seg. 
private delete packet final long nextGen; if (packet != null && packet.any()) { nextGen = bufferedDeletesStream.push(packet); } else { - // since we don't have a delete packet to apply we can get a new + // Since we don't have a delete packet to apply we can get a new // generation right away nextGen = bufferedDeletesStream.getNextGen(); } @@ -2572,7 +2572,11 @@ public class IndexWriter implements Closeable { message("commit: done"); } } + + // Ensures only one flush() is actually flushing segments + // at a time: private final Object fullFlushLock = new Object(); + /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. @@ -2595,9 +2599,7 @@ public class IndexWriter implements Closeable { maybeMerge(); } } - // TODO: this method should not have to be entirely - // synchronized, ie, merges should be allowed to commit - // even while a flush is happening + private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException { if (hitOOM) { throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush"); @@ -2645,6 +2647,8 @@ public class IndexWriter implements Closeable { final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException { if (!applyAllDeletes) { + // nocommit -- shouldn't this move into the default + // flush policy? // If deletes alone are consuming > 1/2 our RAM // buffer, force them all to apply now. This is to // prevent too-frequent flushing of a long tail of @@ -2670,31 +2674,31 @@ public class IndexWriter implements Closeable { } final synchronized void applyAllDeletes() throws IOException { - flushDeletesCount.incrementAndGet(); - final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream - .applyDeletes(readerPool, segmentInfos); - if (result.anyDeletes) { - checkpoint(); + flushDeletesCount.incrementAndGet(); + final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream + .applyDeletes(readerPool, segmentInfos); + if (result.anyDeletes) { + checkpoint(); + } + if (!keepFullyDeletedSegments && result.allDeleted != null) { + if (infoStream != null) { + message("drop 100% deleted segments: " + result.allDeleted); } - if (!keepFullyDeletedSegments && result.allDeleted != null) { - if (infoStream != null) { - message("drop 100% deleted segments: " + result.allDeleted); - } - for (SegmentInfo info : result.allDeleted) { - // If a merge has already registered for this - // segment, we leave it in the readerPool; the - // merge will skip merging it and will then drop - // it once it's done: - if (!mergingSegments.contains(info)) { - segmentInfos.remove(info); - if (readerPool != null) { - readerPool.drop(info); - } + for (SegmentInfo info : result.allDeleted) { + // If a merge has already registered for this + // segment, we leave it in the readerPool; the + // merge will skip merging it and will then drop + // it once it's done: + if (!mergingSegments.contains(info)) { + segmentInfos.remove(info); + if (readerPool != null) { + readerPool.drop(info); } } - checkpoint(); } - bufferedDeletesStream.prune(segmentInfos); + checkpoint(); + } + bufferedDeletesStream.prune(segmentInfos); } /** Expert: Return the total size of all index files currently cached in memory. 
diff --git a/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java index b00e85a97a9..9a1ffe4bc35 100644 --- a/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java +++ b/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java @@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.lucene.document.Document; +// nocommit jdoc +// nocommit -- can/should apps set this via IWC public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool { private Map threadBindings = new ConcurrentHashMap(); @@ -37,8 +39,12 @@ } } ThreadState minThreadState = null; - // find the state that has minimum amount of threads waiting + + // Find the state that has minimum number of threads waiting + // nocommit -- can't another thread lock the + // minThreadState we just got? minThreadState = minContendedThreadState(); + if (minThreadState == null || minThreadState.hasQueuedThreads()) { ThreadState newState = newThreadState(); if (newState != null) { @@ -59,6 +65,7 @@ return minThreadState; } + /* @Override public void clearThreadBindings(ThreadState perThread) { threadBindings.clear(); @@ -68,5 +75,5 @@ public void clearAllThreadBindings() { threadBindings.clear(); } - + */ }
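On the nocommit above ("can't another thread lock the minThreadState we just got?"): the race is real, but it only costs fairness, not correctness, as long as the eventual lock is a blocking one. A simplified sketch of the affinity pattern, with TState standing in for ThreadState:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

final class AffinityPoolSketch {
  static final class TState extends ReentrantLock {}

  private final ConcurrentHashMap<Thread, TState> bindings = new ConcurrentHashMap<>();
  private final TState[] states;

  AffinityPoolSketch(int numStates) {
    states = new TState[numStates];
    for (int i = 0; i < numStates; i++) {
      states[i] = new TState();
    }
  }

  TState getAndLock(Thread t) {
    TState bound = bindings.get(t);
    if (bound != null && bound.tryLock()) {
      return bound; // fast path: sticky binding, uncontended
    }
    // Fall back to the least-contended state. Another thread may lock it
    // between selection and lock(); the blocking lock() below keeps that
    // merely suboptimal rather than incorrect.
    TState min = states[0];
    for (TState candidate : states) {
      if (candidate.getQueueLength() < min.getQueueLength()) {
        min = candidate;
      }
    }
    min.lock();
    bindings.put(t, min);
    return min;
  }
}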