LUCENE-3023: some clean-up and nocommits

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/realtime_search@1094916 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2011-04-19 07:52:55 +00:00
parent f0b56fd92e
commit 01ffe0ba84
12 changed files with 223 additions and 186 deletions

View File

@ -208,11 +208,11 @@ class BufferedDeletesStream {
}
if (!packet.isSegmentPrivate) {
/*
* only update the coalescededDeletes if we are NOT on a segment private del packet.
* the segment private del packet must only applied to segments with the same delGen.
* Yet, if a segment is already deleted from the SI since it had no more documents remaining
* after some del packets younger than it segPrivate packet (hihger delGen) have been applied
* the segPrivate packet has not been removed.
* Only coalesce if we are NOT on a segment private del packet: the segment private del packet
* must only be applied to segments with the same delGen. Yet, if a segment was already dropped
* from the SI because it had no documents remaining after del packets younger than
* its segPrivate packet (higher delGen) were applied, the segPrivate packet has not been
* removed.
*/
coalescedDeletes.update(packet);
}
@ -259,7 +259,7 @@ class BufferedDeletesStream {
}
/*
* since we are on a segment private del packet we must not
* Since we are on a segment private del packet we must not
* update the coalescedDeletes here! We can simply advance to the
* next packet and seginfo.
*/
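For context, the rule above reduces to one branch: global packets fold into the coalesced deletes, while segment-private packets are applied only to the segment carrying the matching delGen. A minimal toy sketch of that decision (hypothetical DeletePacket/CoalescedDeletes types, not the actual BufferedDeletesStream API):

// Toy model of the coalescing rule above; these are hypothetical
// stand-ins, not Lucene classes.
class DeletePacket {
  final long delGen;                  // generation this packet belongs to
  final boolean isSegmentPrivate;     // true: applies to exactly one segment
  DeletePacket(long delGen, boolean isSegmentPrivate) {
    this.delGen = delGen;
    this.isSegmentPrivate = isSegmentPrivate;
  }
}

class CoalescedDeletes {
  void maybeCoalesce(DeletePacket packet) {
    if (!packet.isSegmentPrivate) {
      update(packet);                 // fold into the coalesced global deletes
    }                                 // segment-private packets are skipped here
  }
  void update(DeletePacket packet) { /* merge its terms/queries */ }
}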

View File

@ -50,12 +50,10 @@ final class DocFieldProcessor extends DocConsumer {
int hashMask = 1;
int totalFieldCount;
float docBoost;
int fieldGen;
final DocumentsWriterPerThread.DocState docState;
public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
this.docState = docWriter.docState;
this.consumer = consumer;
@ -254,7 +252,6 @@ final class DocFieldProcessor extends DocConsumer {
}
}
void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
if (lo >= hi)
return;

View File

@ -39,10 +39,7 @@ import org.apache.lucene.store.Directory;
/**
* This class accepts multiple added documents and directly
* writes a single segment file. It does this more
* efficiently than creating a single segment per document
* (with DocumentWriter) and doing standard merges on those
* segments.
* writes segment files.
*
* Each added document is passed to the {@link DocConsumer},
* which in turn processes the document and interacts with
@ -152,8 +149,11 @@ final class DocumentsWriter {
}
synchronized boolean deleteQueries(final Query... queries) throws IOException {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
// nocommit -- shouldn't we check for doApplyAllDeletes
// here too?
// nocommit shouldn't this consult flush policy? or
// should this return void now?
return false;
}
@ -165,9 +165,11 @@ final class DocumentsWriter {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
flushControl.doOnDelete();
if (flushControl.flushDeletes.getAndSet(false)) {
flushDeletes(deleteQueue);
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
// nocommit shouldn't this consult flush policy? or
// should this return void now?
return false;
}
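For illustration, what the nocommit above seems to suggest for deleteQueries is simply mirroring deleteTerms: consult the flush control after enqueuing. A sketch using the same names as the surrounding code (this exact change is not part of the patch):

synchronized boolean deleteQueries(final Query... queries) throws IOException {
  final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
  deleteQueue.addDelete(queries);
  flushControl.doOnDelete();               // let the flush policy react
  if (flushControl.doApplyAllDeletes()) {  // policy requested a full apply
    applyAllDeletes(deleteQueue);
  }
  return false;                            // or void, per the other nocommit
}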
@ -182,13 +184,13 @@ final class DocumentsWriter {
return deleteQueue;
}
private void flushDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null) {
synchronized (ticketQueue) {
// freeze and insert the delete flush ticket in the queue
// Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
applyFlushTickets(null, null);
}
}
}
indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet();
@ -196,14 +198,9 @@ final class DocumentsWriter {
synchronized void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
pushConfigChange();
}
private final void pushConfigChange() {
final Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
while (it.hasNext()) {
DocumentsWriterPerThread perThread = it.next().perThread;
perThread.docState.infoStream = this.infoStream;
it.next().perThread.docState.infoStream = infoStream;
}
}
@ -218,8 +215,9 @@ final class DocumentsWriter {
// returns boolean for asserts
boolean message(String message) {
if (infoStream != null)
if (infoStream != null) {
indexWriter.message("DW: " + message);
}
return true;
}
@ -297,45 +295,52 @@ final class DocumentsWriter {
ensureOpen();
boolean maybeMerge = false;
final boolean isUpdate = delTerm != null;
if (healthiness.isStalled()) {
/*
* if we are allowed to hijack threads for flushing we try to flush out
* as many pending DWPT to release memory and get back healthy status.
*/
if (healthiness.anyStalledThreads()) {
// Help out flushing any pending DWPTs so we can un-stall:
if (infoStream != null) {
message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment");
message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)");
}
// try pick up pending threads here if possile
// Try to pick up pending threads here if possible
DocumentsWriterPerThread flushingDWPT;
while ( (flushingDWPT = flushControl.nextPendingFlush()) != null){
// don't push the delete here since the update could fail!
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
maybeMerge = doFlush(flushingDWPT);
if (!healthiness.isStalled()) {
if (!healthiness.anyStalledThreads()) {
break;
}
}
if (infoStream != null && healthiness.isStalled()) {
message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore");
if (infoStream != null && healthiness.anyStalledThreads()) {
message("WARNING DocumentsWriter still has stalled threads; waiting");
}
healthiness.waitIfStalled(); // block if stalled
if (infoStream != null && healthiness.anyStalledThreads()) {
message("WARNING DocumentsWriter done waiting");
}
}
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
this, doc);
final DocumentsWriterPerThread flushingDWPT;
final DocumentsWriterPerThread dwpt;
try {
if (!perThread.isActive()) {
ensureOpen();
assert false: "perThread is not active but we are still open";
}
dwpt = perThread.perThread;
final DocumentsWriterPerThread dwpt = perThread.perThread;
try {
dwpt.updateDocument(doc, analyzer, delTerm);
numDocsInRAM.incrementAndGet();
} finally {
if(dwpt.checkAndResetHasAborted()) {
flushControl.doOnAbort(perThread);
if (dwpt.checkAndResetHasAborted()) {
flushControl.doOnAbort(perThread);
}
}
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
@ -376,46 +381,53 @@ final class DocumentsWriter {
* might miss deleting documents in 'A'.
*/
synchronized (ticketQueue) {
// each flush is assigned a ticket in the order they accquire the ticketQueue lock
ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
ticketQueue.add(ticket);
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
ticketQueue.add(ticket);
}
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
// nocommit -- should this success = true be moved
// under the applyFlushTickets?
success = true;
/*
* now we are done and try to flush the ticket queue if the head of the
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
applyFlushTickets(ticket, newSegment);
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
indexWriter.flushCount.incrementAndGet();
if (!success && ticket != null) {
synchronized (ticketQueue) {
// in the case of a failure make sure we are making progress and
// apply all the deletes since the segment flush failed
ticket.isSegmentFlush = false;
}
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
indexWriter.flushCount.incrementAndGet();
if (!success && ticket != null) {
synchronized (ticketQueue) {
// nocommit -- shouldn't we drop the ticket in
// this case?
// In the case of a failure make sure we are making progress and
// apply all the deletes since the segment flush failed
ticket.isSegmentFlush = false;
}
}
}
flushingDWPT = flushControl.nextPendingFlush() ;
flushingDWPT = flushControl.nextPendingFlush();
}
return maybeMerge;
}
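The ticket queue is the heart of this method: tickets are handed out in lock order, the actual flushes run concurrently, and segments are published strictly in ticket order once the head ticket completes. A self-contained toy version of that pattern (generic types, not the package-private FlushTicket):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Toy ticket queue: reserve() fixes the publish order under the lock;
// flushes finish out of order; publishUpTo() drains only completed
// tickets from the head, preserving the reserved order.
final class ToyTicketQueue<T> {
  static final class Ticket<T> {
    volatile T result;                          // set when the flush finishes
    boolean canPublish() { return result != null; }
  }

  private final Queue<Ticket<T>> queue = new ArrayDeque<>();

  synchronized Ticket<T> reserve() {
    Ticket<T> ticket = new Ticket<>();
    queue.add(ticket);
    return ticket;
  }

  synchronized void publishUpTo(Consumer<T> publisher) {
    Ticket<T> head;
    while ((head = queue.peek()) != null && head.canPublish()) {
      queue.poll();
      publisher.accept(head.result);            // publish in ticket order
    }
  }
}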
private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException {
synchronized (ticketQueue) {
if (current != null) {
// this is a segment FlushTicket so assign the flushed segment so we can make progress.
// nocommit -- can't caller set current.segment = segment?
// nocommit -- confused by this comment:
// This is a segment FlushTicket so assign the flushed segment so we can make progress.
assert segment != null;
current.segment = segment;
}
while (true) {
// while we can publish flushes keep on making the queue empty.
// Keep publishing eligible flushed segments:
final FlushTicket head = ticketQueue.peek();
if (head != null && head.canPublish()) {
ticketQueue.poll();
@ -426,11 +438,10 @@ final class DocumentsWriter {
}
}
}
private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// this is eventually finishing the flushed segment and publishing it to the IndexWriter
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
@ -442,9 +453,6 @@ final class DocumentsWriter {
} else {
publishFlushedSegment(newSegment, bufferedDeletes);
}
}
final void subtractFlushedNumDocs(int numFlushed) {
@ -455,10 +463,10 @@ final class DocumentsWriter {
}
/**
* publishes the flushed segment, segment private deletes if any and its
* associated global delete if present to the index writer. the actual
* publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}
* 's delete generation is always GlobalPacket_deleteGeneration + 1
* Publishes the flushed segment, segment private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
*/
private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
throws IOException {
@ -467,12 +475,13 @@ final class DocumentsWriter {
final BufferedDeletes deletes = newSegment.segmentDeletes;
FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) {
// segment private delete
// Segment private delete
packet = new FrozenBufferedDeletes(deletes, true);
if (infoStream != null) {
message("flush: push buffered seg private deletes: " + packet);
}
}
// now publish!
indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
}
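The generation invariant from the javadoc above is easiest to see in isolation: pushing the global packet and assigning the segment's delete generation happen under one lock, so the segment always ends up exactly one generation above the global packet. A toy sketch (hypothetical class; the real logic is in IndexWriter.publishFlushedSegment, shown later in this commit):

// Toy model of the delGen invariant: under the lock, the global packet
// takes generation g and the flushed segment takes g + 1.
final class ToyGenStream {
  private long nextGen = 1;

  synchronized long push(Object packet) { return nextGen++; }
  synchronized long getNextGen()        { return nextGen++; }

  synchronized long publish(Object globalPacket, Object segPrivatePacket) {
    if (globalPacket != null) {
      push(globalPacket);                 // global packet gets generation g
    }
    return segPrivatePacket != null
        ? push(segPrivatePacket)          // segment-private packet: g + 1
        : getNextGen();                   // no private packet: still g + 1
  }
}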
@ -486,10 +495,9 @@ final class DocumentsWriter {
}
/*
* flushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
* two stage operations, the caller must ensure that #finishFlush is called
* after this method to release the flush lock in DWFlushControl - use try /
* finally!
* FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
* two-stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
final boolean flushAllThreads(final boolean flushDeletes)
throws IOException {
@ -497,9 +505,11 @@ final class DocumentsWriter {
synchronized (this) {
flushingDeleteQueue = deleteQueue;
/* sets a new delete queue - this must be synced on the flush control
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
// nocommit -- shouldn't we do this?:
// assert Thread.holdsLock(flushControl);
flushControl.markForFullFlush();
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
@ -509,18 +519,18 @@ final class DocumentsWriter {
boolean anythingFlushed = false;
try {
DocumentsWriterPerThread flushingDWPT;
// now try help out with flushing
// Help out with flushing:
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
// if a concurrent flush is still in flight wait for it
while (!flushControl.allFlushesDue()) {
// If a concurrent flush is still in flight wait for it
while (flushControl.anyFlushing()) {
flushControl.waitForFlush();
}
if (!anythingFlushed && flushDeletes) {
synchronized (ticketQueue) {
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
}
applyFlushTickets(null, null);
}
} finally {
@ -532,13 +542,16 @@ final class DocumentsWriter {
final void finishFullFlush(boolean success) {
assert setFlushingDeleteQueue(null);
if (success) {
// release the flush lock
// Release the flush lock
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
}
}
// nocommit -- can we add comment justifying that these
// fields are safely changed across threads because they
// are always accessed in sync(ticketQueue)?
static final class FlushTicket {
final FrozenBufferedDeletes frozenDeletes;
FlushedSegment segment;

View File

@ -23,15 +23,15 @@ import org.apache.lucene.search.Query;
/**
* {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
* queue. In contrast to other queue implementation we only maintain only the
* queue. In contrast to other queue implementations we only maintain the
* tail of the queue. A delete queue is always used in a context of a set of
* DWPT and a global delete pool. Each of the DWPT and the global pool need to
* maintain their 'own' head of the queue. The difference between the DWPT and
* the global pool is that the DWPT starts maintaining a head once it has added
* its first document since for its segments private deletes only the deletes
* after that document are relevant. The global pool instead starts maintaining
* the head once this instance is created by taking the sentinel instance as its
* initial head.
* DWPTs and a global delete pool. Each of the DWPT and the global pool need to
* maintain their 'own' head of the queue (as a DeleteSlice instance per DWPT).
* The difference between the DWPT and the global pool is that the DWPT starts
* maintaining a head once it has added its first document since for its segment's
* private deletes only the deletes after that document are relevant. The global
* pool instead starts maintaining the head once this instance is created by
* taking the sentinel instance as its initial head.
* <p>
* Since each {@link DeleteSlice} maintains its own head and the list is only
* singly linked, the garbage collector takes care of pruning the list for us.
@ -41,12 +41,12 @@ import org.apache.lucene.search.Query;
* <p>
* Each DWPT as well as the global delete pool maintain their private
* DeleteSlice instance. In the DWPT case updating a slice is equivalent to
* atomically finishing the document. The slice update guarantees a happens
* before relationship to all other updates in the same indexing session. When a
* DWPT updates a document it
* atomically finishing the document. The slice update guarantees a "happens
* before" relationship to all other updates in the same indexing session. When a
* DWPT updates a document it:
*
* <ol>
* <li>consumes a document finishes its processing</li>
* <li>consumes a document and finishes its processing</li>
* <li>updates its private {@link DeleteSlice} either by calling
* {@link #updateSlice(DeleteSlice)} or {@link #add(Term, DeleteSlice)} (if the
* document has a delTerm)</li>
@ -56,7 +56,7 @@ import org.apache.lucene.search.Query;
* </ol>
*
* The DWPT also doesn't apply its current document's delete term until it has
* updated its delete slice which ensures the consistency of the update. if the
* updated its delete slice which ensures the consistency of the update. If the
* update fails before the DeleteSlice has been updated, the deleteTerm
* will not be added to its private deletes nor to the global deletes.
*
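Since the javadoc above is dense, a toy model of the slice mechanism may help: the queue is a singly linked list, each consumer (DWPT or global pool) keeps only a head and a tail reference, and applying a slice walks the nodes strictly between them. This is a hypothetical simplified class, not the lock-free CAS-based original:

import java.util.function.Consumer;

// Toy delete queue with slices. The real DocumentsWriterDeleteQueue
// appends with CAS instead of synchronized, but the slice idea is the
// same: everything in (sliceHead, sliceTail] belongs to this consumer.
final class ToyDeleteQueue {
  static final class Node {
    final Object item;                  // a Term or Query in the real queue
    volatile Node next;
    Node(Object item) { this.item = item; }
  }

  private volatile Node tail = new Node(null);   // sentinel

  synchronized void add(Object item) {
    Node node = new Node(item);
    tail.next = node;
    tail = node;
  }

  Slice newSlice() { return new Slice(tail); }

  final class Slice {
    Node sliceHead, sliceTail;
    Slice(Node start) { sliceHead = sliceTail = start; }

    boolean update() {                  // catch up with the queue's tail
      if (sliceTail != tail) { sliceTail = tail; return true; }
      return false;
    }

    void apply(Consumer<Object> deletes) {
      // Apply (sliceHead, sliceTail]; sliceHead was applied previously.
      for (Node n = sliceHead; n != sliceTail; ) {
        n = n.next;
        deletes.accept(n.item);
      }
      sliceHead = sliceTail;            // reset to a zero-length slice
    }
  }
}

Because every slice only moves forward and nothing holds references behind the oldest head, the GC prunes the applied prefix of the list automatically, as the javadoc notes.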
@ -167,7 +167,7 @@ final class DocumentsWriterDeleteQueue {
void tryApplyGlobalSlice() {
if (globalBufferLock.tryLock()) {
/*
* the global buffer must be locked but we don't need to upate them if
* The global buffer must be locked but we don't need to update it if
* there is an update going on right now. It is sufficient to apply the
* deletes that have been added after the current in-flight global slice's
* tail the next time we can get the lock!
@ -175,7 +175,6 @@ final class DocumentsWriterDeleteQueue {
try {
if (updateSlice(globalSlice)) {
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
} finally {
globalBufferLock.unlock();
@ -186,15 +185,15 @@ final class DocumentsWriterDeleteQueue {
FrozenBufferedDeletes freezeGlobalBuffer(DeleteSlice callerSlice) {
globalBufferLock.lock();
/*
* here we are freezing the global buffer so we need to lock it, apply all
* Here we freeze the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
final Node currentTail = tail; // Take the current tail and make it local.
// changes after this call are applied later
// Changes after this call are applied later
// and not relevant here
if (callerSlice != null) {
// update the callers slices so we are on the same page
// Update the callers slices so we are on the same page
callerSlice.sliceTail = currentTail;
}
try {
@ -217,7 +216,7 @@ final class DocumentsWriterDeleteQueue {
}
boolean updateSlice(DeleteSlice slice) {
if (slice.sliceTail != tail) { // if we are the same just
if (slice.sliceTail != tail) { // If the global tail moved, catch this slice up
slice.sliceTail = tail;
return true;
}
@ -225,7 +224,7 @@ final class DocumentsWriterDeleteQueue {
}
static class DeleteSlice {
// no need to be volatile, slices are only access by one thread!
// No need to be volatile, slices are thread captive (only accessed by one thread)!
Node sliceHead; // we don't apply this one
Node sliceTail;
@ -245,7 +244,7 @@ final class DocumentsWriterDeleteQueue {
return;
}
/*
* when we apply a slice we take the head and get its next as our first
* When we apply a slice we take the head and get its next as our first
* item to apply and continue until we applied the tail. If the head and
* tail in this slice are not equal then there will be at least one more
* non-null node in the slice!
@ -260,7 +259,7 @@ final class DocumentsWriterDeleteQueue {
}
void reset() {
// resetting to a 0 length slice
// Reset to a 0 length slice
sliceHead = sliceTail;
}
@ -322,7 +321,6 @@ final class DocumentsWriterDeleteQueue {
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
bufferedDeletes.addTerm((Term) item, docIDUpto);
}
}
private static final class QueryArrayNode extends Node {
@ -376,6 +374,5 @@ final class DocumentsWriterDeleteQueue {
} finally {
globalBufferLock.unlock();
}
}
}

View File

@ -40,7 +40,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
*/
public final class DocumentsWriterFlushControl {
private final long maxBytesPerDWPT;
private final long hardMaxBytesPerDWPT;
private long activeBytes = 0;
private long flushBytes = 0;
private volatile int numPending = 0;
@ -63,11 +63,11 @@ public final class DocumentsWriterFlushControl {
private final DocumentsWriter documentsWriter;
DocumentsWriterFlushControl(DocumentsWriter documentsWriter,
Healthiness healthiness, long maxBytesPerDWPT) {
Healthiness healthiness, long hardMaxBytesPerDWPT) {
this.healthiness = healthiness;
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
this.maxBytesPerDWPT = maxBytesPerDWPT;
this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT;
this.documentsWriter = documentsWriter;
}
@ -85,8 +85,8 @@ public final class DocumentsWriterFlushControl {
private void commitPerThreadBytes(ThreadState perThread) {
final long delta = perThread.perThread.bytesUsed()
- perThread.perThreadBytes;
perThread.perThreadBytes += delta;
- perThread.bytesUsed;
perThread.bytesUsed += delta;
/*
* We need to differentiate here if we are pending since setFlushPending
* moves the perThread memory to the flushBytes and we could be set to
@ -100,6 +100,7 @@ public final class DocumentsWriterFlushControl {
assert updatePeaks(delta);
}
// only for asserts
private boolean updatePeaks(long delta) {
peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
@ -116,10 +117,9 @@ public final class DocumentsWriterFlushControl {
} else {
flushPolicy.onInsert(this, perThread);
}
if (!perThread.flushPending && perThread.perThreadBytes > maxBytesPerDWPT) {
// safety check to prevent a single DWPT exceeding its RAM limit. This
// is super
// important since we can not address more than 2048 MB per DWPT
if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
// Safety check to prevent a single DWPT exceeding its RAM limit. This
// is super important since we cannot address more than 2048 MB per DWPT
setFlushPending(perThread);
if (fullFlush) {
DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread, false);
@ -146,8 +146,8 @@ public final class DocumentsWriterFlushControl {
}
}
public synchronized boolean allFlushesDue() {
return numFlushing == 0;
public synchronized boolean anyFlushing() {
return numFlushing != 0;
}
public synchronized void waitForFlush() {
@ -169,7 +169,7 @@ public final class DocumentsWriterFlushControl {
assert !perThread.flushPending;
if (perThread.perThread.getNumDocsInRAM() > 0) {
perThread.flushPending = true; // write access synced
final long bytes = perThread.perThreadBytes;
final long bytes = perThread.bytesUsed;
flushBytes += bytes;
activeBytes -= bytes;
numPending++; // write access synced
@ -179,19 +179,20 @@ public final class DocumentsWriterFlushControl {
synchronized void doOnAbort(ThreadState state) {
if (state.flushPending) {
flushBytes -= state.perThreadBytes;
flushBytes -= state.bytesUsed;
} else {
activeBytes -= state.perThreadBytes;
activeBytes -= state.bytesUsed;
}
// take it out of the loop this DWPT is stale
// Take it out of the loop; this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
healthiness.updateStalled(this);
}
synchronized DocumentsWriterPerThread tryCheckoutForFlush(
ThreadState perThread, boolean setPending) {
if (fullFlush)
if (fullFlush) {
return null;
}
return internalTryCheckOutForFlush(perThread, setPending);
}
@ -201,17 +202,17 @@ public final class DocumentsWriterFlushControl {
setFlushPending(perThread);
}
if (perThread.flushPending) {
// we are pending so all memory is already moved to flushBytes
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
if (perThread.isActive()) {
assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.perThreadBytes; // do that before
final long bytes = perThread.bytesUsed; // do that before
// replace!
dwpt = perThreadPool.replaceForFlush(perThread, closed);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
// record the flushing DWPT to reduce flushBytes in doAfterFlush
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
numPending--; // write access synced
numFlushing++;
@ -298,8 +299,12 @@ public final class DocumentsWriterFlushControl {
return numFlushing;
}
public void setFlushDeletes() {
flushDeletes.set(true);
public boolean doApplyAllDeletes() {
return flushDeletes.getAndSet(false);
}
public void setApplyAllDeletes() {
flushDeletes.set(true);
}
int numActiveDWPT() {
@ -312,7 +317,7 @@ public final class DocumentsWriterFlushControl {
assert !fullFlush;
fullFlush = true;
flushingQueue = documentsWriter.deleteQueue;
// set a new delete queue - all subsequent DWPT will use this queue until
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
documentsWriter.deleteQueue = new DocumentsWriterDeleteQueue(new BufferedDeletes(false));
}
@ -374,9 +379,9 @@ public final class DocumentsWriterFlushControl {
}
} finally {
fullFlush = false;
flushQueue.clear();
blockedFlushes.clear();
fullFlush = false;
}
}

View File

@ -44,7 +44,7 @@ public abstract class DocumentsWriterPerThreadPool {
// write access guarded by DocumentsWriterFlushControl
volatile boolean flushPending = false;
// write access guarded by DocumentsWriterFlushControl
long perThreadBytes = 0;
long bytesUsed = 0;
// guarded by Reentrant lock
private boolean isActive = true;
@ -65,7 +65,7 @@ public abstract class DocumentsWriterPerThreadPool {
isActive = false;
}
this.perThread = perThread;
this.perThreadBytes = 0;
this.bytesUsed = 0;
this.flushPending = false;
}
@ -86,7 +86,7 @@ public abstract class DocumentsWriterPerThreadPool {
public long getBytesUsedPerThread() {
assert this.isHeldByCurrentThread();
// public for FlushPolicy
return perThreadBytes;
return bytesUsed;
}
/**
@ -162,9 +162,9 @@ public abstract class DocumentsWriterPerThreadPool {
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
public abstract void clearThreadBindings(ThreadState perThread);
//public abstract void clearThreadBindings(ThreadState perThread);
public abstract void clearAllThreadBindings();
// public abstract void clearAllThreadBindings();
/**
* Returns an iterator providing access to all {@link ThreadState}

View File

@ -21,16 +21,16 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
/**
* Default {@link FlushPolicy} implementation that flushes based on RAM
* Consumption, document count and number of buffered deletes depending on the
* IndexWriters {@link IndexWriterConfig}. This {@link FlushPolicy} will only
* used, document count and number of buffered deletes depending on the
* IndexWriter's {@link IndexWriterConfig}. This {@link FlushPolicy} will only
* respect settings which are not disabled during initialization (
* {@link #init(DocumentsWriter)}). All enabled {@link IndexWriterConfig}
* {@link #init(DocumentsWriter)}) (nocommit what does that mean?). All enabled {@link IndexWriterConfig}
* settings are used to mark {@link DocumentsWriterPerThread} as flush pending
* during indexing with respect to thier live updates.
* during indexing with respect to their live updates.
* <p>
* If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled always the
* If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the
* largest ram consuming {@link DocumentsWriterPerThread} will be marked as
* pending iff the global active RAM consumption is equals or higher the
* pending iff the global active RAM consumption is >= the
* configured max RAM buffer.
*/
public class FlushByRamOrCountsPolicy extends FlushPolicy {
@ -38,10 +38,11 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
@Override
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
if (flushOnDeleteTerms()) {
// Trigger applying all deletes by num del terms
final int maxBufferedDeleteTerms = indexWriterConfig
.getMaxBufferedDeleteTerms();
if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) {
control.setFlushDeletes();
control.setApplyAllDeletes();
}
}
}
@ -51,12 +52,12 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
if (flushOnDocCount()
&& state.perThread.getNumDocsInRAM() >= indexWriterConfig
.getMaxBufferedDocs()) {
control.setFlushPending(state); // flush by num docs
// Flush this state by num docs
control.setFlushPending(state);
} else { // flush by RAM
if (flushOnRAM()) {
final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
final long totalRam = control.activeBytes();
final long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d);
if (totalRam >= limit) {
markLargestWriterPending(control, state, totalRam);
}
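Pulling the two triggers above together: a DWPT flushes on its own doc count, while the RAM trigger is global and picks the largest writer. A condensed, hypothetical sketch of that decision (plain Java, with -1 standing in for IndexWriterConfig.DISABLE_AUTO_FLUSH):

// Condensed toy version of the onInsert() triggers above; the Runnables
// stand in for setFlushPending / markLargestWriterPending.
final class ToyFlushTriggers {
  void onInsert(int docsInRAM, int maxBufferedDocs,
                long globalActiveBytes, double ramBufferMB,
                Runnable flushThisState, Runnable flushLargestState) {
    if (maxBufferedDocs != -1 && docsInRAM >= maxBufferedDocs) {
      flushThisState.run();               // flush by num docs
    } else if (ramBufferMB != -1) {
      final long limit = (long) (ramBufferMB * 1024.d * 1024.d);
      if (globalActiveBytes >= limit) {
        flushLargestState.run();          // flush by RAM: largest DWPT
      }
    }
  }
}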

View File

@ -32,16 +32,16 @@ import org.apache.lucene.util.SetOnce;
* {@link IndexWriterConfig#setRAMBufferSizeMB(double)}</li>
* <li>Number of RAM resident documents - configured via
* {@link IndexWriterConfig#setMaxBufferedDocs(int)}</li>
* <li>Number of buffered delete terms - configured via
* <li>Number of buffered delete terms/queries - configured via
* {@link IndexWriterConfig#setMaxBufferedDeleteTerms(int)}</li>
* </ul>
*
* The {@link IndexWriter} uses a provided {@link FlushPolicy} to control the
* flushing process during indexing. The policy is informed for each added or
* The {@link IndexWriter} consults a provided {@link FlushPolicy} to control the
* flushing process. The policy is informed for each added or
* updated document as well as for each delete term. Based on the
* {@link FlushPolicy} the information provided via {@link ThreadState} and
* {@link DocumentsWriterFlushControl} the {@link FlushPolicy} can decide if a
* {@link DocumentsWriterPerThread} needs flushing and can mark it as
* {@link FlushPolicy}, the information provided via {@link ThreadState} and
* {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a
* {@link DocumentsWriterPerThread} needs flushing and marks it as
* flush-pending via
* {@link DocumentsWriterFlushControl#setFlushPending(ThreadState)}.
*
@ -58,6 +58,7 @@ public abstract class FlushPolicy {
* Called for each delete term. If this is a delete triggered due to an update
* the given {@link ThreadState} is non-null.
* <p>
* nocommit: what does this note mean...?
* Note: This method is synchronized by the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState}
@ -66,9 +67,10 @@ public abstract class FlushPolicy {
ThreadState state);
/**
* Called for each document update on the given {@link ThreadState}s
* Called for each document update on the given {@link ThreadState}'s
* {@link DocumentsWriterPerThread}.
* <p>
* nocommit: what does this note mean...?
* Note: This method is synchronized by the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState}
@ -103,6 +105,7 @@ public abstract class FlushPolicy {
* Marks the most ram consuming active {@link DocumentsWriterPerThread} flush
* pending
*/
// nocommit -- move to default policy?
protected void markLargestWriterPending(DocumentsWriterFlushControl control,
ThreadState perThreadState, final long currentBytesPerThread) {
control
@ -117,7 +120,8 @@ public abstract class FlushPolicy {
*/
protected ThreadState findLargestNonPendingWriter(
DocumentsWriterFlushControl control, ThreadState perThreadState) {
long maxRamSoFar = perThreadState.perThreadBytes;
assert perThreadState.perThread.getNumDocsInRAM() > 0;
long maxRamSoFar = perThreadState.bytesUsed;
// the dwpt which needs to be flushed eventually
ThreadState maxRamUsingThreadState = perThreadState;
assert !perThreadState.flushPending : "DWPT should have flushed";
@ -125,7 +129,7 @@ public abstract class FlushPolicy {
while (activePerThreadsIterator.hasNext()) {
ThreadState next = activePerThreadsIterator.next();
if (!next.flushPending) {
final long nextRam = next.perThreadBytes;
final long nextRam = next.bytesUsed;
if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
maxRamSoFar = nextRam;
maxRamUsingThreadState = next;
@ -137,6 +141,8 @@ public abstract class FlushPolicy {
return maxRamUsingThreadState;
}
// nocommit -- I thought we pause based on "too many flush
// states pending"?
/**
* Returns the max net memory which marks the upper watermark for the
* DocumentsWriter to be healthy. If all flushing and active
@ -154,6 +160,7 @@ public abstract class FlushPolicy {
*/
public long getMaxNetBytes() {
if (!flushOnRAM()) {
// nocommit explain that returning -1 is allowed?
return -1;
}
final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
@ -165,6 +172,8 @@ public abstract class FlushPolicy {
* {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
* <code>false</code>.
*/
// nocommit who needs this? policy shouldn't have to impl
// this? our default policy should?
protected boolean flushOnDocCount() {
return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
}
@ -174,6 +183,8 @@ public abstract class FlushPolicy {
* {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
* <code>false</code>.
*/
// nocommit who needs this? policy shouldn't have to impl
// this? our default policy should?
protected boolean flushOnDeleteTerms() {
return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
}
@ -183,6 +194,8 @@ public abstract class FlushPolicy {
* {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
* <code>false</code>.
*/
// nocommit who needs this? policy shouldn't have to impl
// this? our default policy should?
protected boolean flushOnRAM() {
return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
}

View File

@ -52,7 +52,7 @@ final class FreqProxTermsWriter extends TermsHashConsumer {
final int numAllFields = allFields.size();
// sort by field name
// Sort by field name
CollectionUtil.quickSort(allFields);
final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);

View File

@ -83,10 +83,10 @@ final class Healthiness {
}
}
private final Healthiness.Sync sync = new Sync();
private final Sync sync = new Sync();
volatile boolean wasStalled = false; // only with asserts
boolean isStalled() {
boolean anyStalledThreads() {
return !sync.isHealthy();
}
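anyStalledThreads()/waitIfStalled() behave like a gate: indexing threads block while the flushers are too far behind, and are released once memory has been freed. A toy equivalent using plain monitors (the real Healthiness.Sync appears to be a custom synchronizer, but the contract is the same):

// Toy stall gate with wait/notify, modeling the Healthiness contract.
final class ToyStallGate {
  private boolean stalled;

  synchronized boolean anyStalledThreads() { return stalled; }

  synchronized void updateStalled(boolean value) {
    stalled = value;
    if (!stalled) {
      notifyAll();                        // release blocked indexing threads
    }
  }

  synchronized void waitIfStalled() {
    while (stalled) {
      try {
        wait();                           // block until un-stalled
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}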

View File

@ -383,7 +383,7 @@ public class IndexWriter implements Closeable {
if (!success && infoStream != null) {
message("hit exception during while NRT reader");
}
// now we are done - finish the full flush!
// Done: finish the full flush!
docWriter.finishFullFlush(success);
doAfterFlush();
}
@ -2073,7 +2073,7 @@ public class IndexWriter implements Closeable {
if (useCompoundFile(newSegment)) {
String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
message("creating compound file " + compoundFileName);
// Now build compound file
// Now build compound file
CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
for(String fileName : newSegment.files()) {
cfsWriter.addFile(fileName);
@ -2146,18 +2146,18 @@ public class IndexWriter implements Closeable {
*/
synchronized void publishFlushedSegment(SegmentInfo newSegment,
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
// lock order IW -> BDS
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
// publishing the segment must be synched on IW -> BDS to make the sure
// Publishing the segment must be synched on IW -> BDS to make sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedDeletesStream.push(packet);
} else {
// since we don't have a delete packet to apply we can get a new
// Since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
@ -2572,7 +2572,11 @@ public class IndexWriter implements Closeable {
message("commit: done");
}
}
// Ensures only one flush() is actually flushing segments
// at a time:
private final Object fullFlushLock = new Object();
/**
* Flush all in-memory buffered updates (adds and deletes)
* to the Directory.
@ -2595,9 +2599,7 @@ public class IndexWriter implements Closeable {
maybeMerge();
}
}
// TODO: this method should not have to be entirely
// synchronized, ie, merges should be allowed to commit
// even while a flush is happening
private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush");
@ -2645,6 +2647,8 @@ public class IndexWriter implements Closeable {
final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
if (!applyAllDeletes) {
// nocommit -- shouldn't this move into the default
// flush policy?
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
@ -2670,31 +2674,31 @@ public class IndexWriter implements Closeable {
}
final synchronized void applyAllDeletes() throws IOException {
flushDeletesCount.incrementAndGet();
final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
.applyDeletes(readerPool, segmentInfos);
if (result.anyDeletes) {
checkpoint();
flushDeletesCount.incrementAndGet();
final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
.applyDeletes(readerPool, segmentInfos);
if (result.anyDeletes) {
checkpoint();
}
if (!keepFullyDeletedSegments && result.allDeleted != null) {
if (infoStream != null) {
message("drop 100% deleted segments: " + result.allDeleted);
}
if (!keepFullyDeletedSegments && result.allDeleted != null) {
if (infoStream != null) {
message("drop 100% deleted segments: " + result.allDeleted);
}
for (SegmentInfo info : result.allDeleted) {
// If a merge has already registered for this
// segment, we leave it in the readerPool; the
// merge will skip merging it and will then drop
// it once it's done:
if (!mergingSegments.contains(info)) {
segmentInfos.remove(info);
if (readerPool != null) {
readerPool.drop(info);
}
for (SegmentInfo info : result.allDeleted) {
// If a merge has already registered for this
// segment, we leave it in the readerPool; the
// merge will skip merging it and will then drop
// it once it's done:
if (!mergingSegments.contains(info)) {
segmentInfos.remove(info);
if (readerPool != null) {
readerPool.drop(info);
}
}
checkpoint();
}
bufferedDeletesStream.prune(segmentInfos);
checkpoint();
}
bufferedDeletesStream.prune(segmentInfos);
}
/** Expert: Return the total size of all index files currently cached in memory.

View File

@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.document.Document;
// nocommit jdoc
// nocommit -- can/should apps set this via IWC
public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
@ -37,8 +39,12 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
}
}
ThreadState minThreadState = null;
// find the state that has minimum amount of threads waiting
// Find the state that has minimum number of threads waiting
// nocommit -- can't another thread lock the
// minThreadState we just got?
minThreadState = minContendedThreadState();
if (minThreadState == null || minThreadState.hasQueuedThreads()) {
ThreadState newState = newThreadState();
if (newState != null) {
@ -59,6 +65,7 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
return minThreadState;
}
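The affinity scheme itself is simple: bind each indexing thread to a ThreadState on first use so it keeps hitting the same DWPT (and its warm buffers), and fall back to the least-contended state for new threads. A toy model (hypothetical generic pool, deliberately preserving the race the nocommit above points out):

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Toy thread-affinity pool: one binding per thread, reused on later
// calls; the caller still has to lock the returned state before use.
final class ToyAffinityPool<S> {
  private final ConcurrentHashMap<Thread, S> bindings = new ConcurrentHashMap<>();

  S stateFor(Thread thread, Supplier<S> leastContended) {
    S state = bindings.get(thread);
    if (state == null) {
      // Note: as the nocommit above observes, another thread may lock
      // the least-contended state between selecting it and locking it.
      state = leastContended.get();
      bindings.put(thread, state);
    }
    return state;
  }
}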
/*
@Override
public void clearThreadBindings(ThreadState perThread) {
threadBindings.clear();
@ -68,5 +75,5 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
public void clearAllThreadBindings() {
threadBindings.clear();
}
*/
}