mirror of https://github.com/apache/lucene.git
LUCENE-3023: some polishing & removed all nocommit
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/realtime_search@1096731 13f79535-47bb-0310-9956-ffa450edef68
parent 2b51cd5d72
commit fa1ac87eaa
@@ -148,38 +148,26 @@ final class DocumentsWriter {
     flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
   }

-  synchronized boolean deleteQueries(final Query... queries) throws IOException {
+  synchronized void deleteQueries(final Query... queries) throws IOException {
     deleteQueue.addDelete(queries);
-    // nocommit -- shouldn't we check for doApplyAllDeletes
-    // here too?
-    // nocommit shouldn't this consult flush policy? or
-    // should this return void now?
-    return false;
+    flushControl.doOnDelete();
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
+    }
   }

-  boolean deleteQuery(final Query query) throws IOException {
-    return deleteQueries(query);
-  }
-
-  synchronized boolean deleteTerms(final Term... terms) throws IOException {
+  // TODO: we could check w/ FreqProxTermsWriter: if the
+  // term doesn't exist, don't bother buffering into the
+  // per-DWPT map (but still must go into the global map)
+  synchronized void deleteTerms(final Term... terms) throws IOException {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     deleteQueue.addDelete(terms);
     flushControl.doOnDelete();
     if (flushControl.doApplyAllDeletes()) {
       applyAllDeletes(deleteQueue);
     }
-    // nocommit shouldn't this consult flush policy? or
-    // should this return void now?
-    return false;
   }

-  // TODO: we could check w/ FreqProxTermsWriter: if the
-  // term doesn't exist, don't bother buffering into the
-  // per-DWPT map (but still must go into the global map)
-  boolean deleteTerm(final Term term) throws IOException {
-    return deleteTerms(term);
-  }
-
   DocumentsWriterDeleteQueue currentDeleteSession() {
     return deleteQueue;
   }
@@ -189,7 +177,7 @@ final class DocumentsWriter {
     synchronized (ticketQueue) {
       // Freeze and insert the delete flush ticket in the queue
       ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
-      applyFlushTickets(null, null);
+      applyFlushTickets();
     }
   }
   indexWriter.applyAllDeletes();
@@ -380,52 +368,48 @@ final class DocumentsWriter {
        * otherwise the deletes frozen by 'B' are not applied to 'A' and we
        * might miss to deletes documents in 'A'.
        */
-      synchronized (ticketQueue) {
-        // Each flush is assigned a ticket in the order they accquire the ticketQueue lock
-        ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
-        ticketQueue.add(ticket);
+      try {
+        synchronized (ticketQueue) {
+          // Each flush is assigned a ticket in the order they accquire the ticketQueue lock
+          ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
+          ticketQueue.add(ticket);
+        }
+
+        // flush concurrently without locking
+        final FlushedSegment newSegment = flushingDWPT.flush();
+        synchronized (ticketQueue) {
+          ticket.segment = newSegment;
+        }
+        // flush was successful once we reached this point - new seg. has been assigned to the ticket!
+        success = true;
+      } finally {
+        if (!success && ticket != null) {
+          synchronized (ticketQueue) {
+            // In the case of a failure make sure we are making progress and
+            // apply all the deletes since the segment flush failed since the flush
+            // ticket could hold global deletes see FlushTicket#canPublish()
+            ticket.isSegmentFlush = false;
+          }
+        }
       }
-
-      // flush concurrently without locking
-      final FlushedSegment newSegment = flushingDWPT.flush();
-
-      // nocommit -- should this success = true be moved
-      // under the applyFlushTickets?
-      success = true;
-
       /*
        * Now we are done and try to flush the ticket queue if the head of the
        * queue has already finished the flush.
        */
-      applyFlushTickets(ticket, newSegment);
+      applyFlushTickets();
     } finally {
       flushControl.doAfterFlush(flushingDWPT);
       flushingDWPT.checkAndResetHasAborted();
       indexWriter.flushCount.incrementAndGet();
-      if (!success && ticket != null) {
-        synchronized (ticketQueue) {
-          // nocommit -- shouldn't we drop the ticket in
-          // this case?
-          // In the case of a failure make sure we are making progress and
-          // apply all the deletes since the segment flush failed
-          ticket.isSegmentFlush = false;
-        }
-      }
     }

     flushingDWPT = flushControl.nextPendingFlush();
   }
   return maybeMerge;
 }

-  private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException {
+  private void applyFlushTickets() throws IOException {
     synchronized (ticketQueue) {
-      if (current != null) {
-        // nocommit -- can't caller set current.segment = segment?
-        // nocommit -- confused by this comment:
-        // This is a segment FlushTicket so assign the flushed segment so we can make progress.
-        assert segment != null;
-        current.segment = segment;
-      }
       while (true) {
         // Keep publishing eligible flushed segments:
         final FlushTicket head = ticketQueue.peek();
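The ticket queue is the crux of this hunk: DWPTs may finish flushing in any order, but segments and their frozen deletes must be published in the order their tickets were taken. A sketch of the publishing loop that is truncated above, grounded in the FlushTicket#canPublish() reference in the new comments (finishFlush is an assumed name for the publish step):

    while (true) {
      final FlushTicket head = ticketQueue.peek();
      if (head == null || !head.canPublish()) {
        break; // the earliest ticket is not finished yet; later flushes must wait
      }
      ticketQueue.poll(); // drop the head only once it can be published
      finishFlush(head.segment, head.frozenDeletes); // segment may be null for pure delete tickets
    }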
@@ -508,9 +492,7 @@ final class DocumentsWriter {
     /* Cutover to a new delete queue. This must be synced on the flush control
      * otherwise a new DWPT could sneak into the loop with an already flushing
      * delete queue */
-    // nocommit -- shouldn't we do this?:
-    // assert Thread.holdsLock(flushControl);
-    flushControl.markForFullFlush();
+    flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
     assert setFlushingDeleteQueue(flushingDeleteQueue);
   }
   assert currentFullFlushDelQueue != null;
@@ -531,7 +513,7 @@ final class DocumentsWriter {
       synchronized (ticketQueue) {
         ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
       }
-      applyFlushTickets(null, null);
+      applyFlushTickets();
     }
   } finally {
     assert flushingDeleteQueue == currentFullFlushDelQueue;
@@ -549,11 +531,9 @@ final class DocumentsWriter {
     }
   }

-  // nocommit -- can we add comment justifying that these
-  // fields are safely changed across threads because they
-  // are always accessed in sync(ticketQueue)?
   static final class FlushTicket {
     final FrozenBufferedDeletes frozenDeletes;
+    /* access to non-final members must be synchronized on DW#ticketQueue */
     FlushedSegment segment;
     boolean isSegmentFlush;
@@ -375,4 +375,8 @@ final class DocumentsWriterDeleteQueue {
       globalBufferLock.unlock();
     }
   }
+
+  public long bytesUsed() {
+    return globalBufferedDeletes.bytesUsed.get();
+  }
 }
@@ -279,10 +279,6 @@ public final class DocumentsWriterFlushControl {
     return perThreadPool.getActivePerThreadsIterator();
   }

-  long maxNetBytes() {
-    return flushPolicy.getMaxNetBytes();
-  }
-
   synchronized void doOnDelete() {
     // pass null this is a global delete no update
     flushPolicy.onDelete(this, null);
@@ -23,7 +23,23 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder;
 import org.apache.lucene.index.codecs.CodecProvider;
+import org.apache.lucene.util.SetOnce;

+/**
+ * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
+ * and their thread assignments during indexing. Each {@link ThreadState} holds
+ * a reference to a {@link DocumentsWriterPerThread} that is once a
+ * {@link ThreadState} is obtained from the pool exclusively used for indexing a
+ * single document by the obtaining thread. Each indexing thread must obtain
+ * such a {@link ThreadState} to make progress. Depending on the
+ * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
+ * assignments might differ from document to document.
+ * <p>
+ * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool
+ * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a
+ * new {@link DocumentsWriterPerThread} instance.
+ * </p>
+ */
 public abstract class DocumentsWriterPerThreadPool {

   /**
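A sketch of the checkout pattern this new javadoc describes: a ThreadState is a ReentrantLock that guards one DocumentsWriterPerThread while a single thread indexes a single document (getAndLock and updateDocument stand in for the pool lookup and the DWPT call; both names are assumptions here):

    ThreadState state = perThreadPool.getAndLock(Thread.currentThread(), documentsWriter, doc);
    try {
      // exclusive access: only the locking thread may index through this DWPT
      state.perThread.updateDocument(doc, analyzer, delTerm);
    } finally {
      state.unlock(); // hand the state back to the pool for other indexing threads
    }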
@@ -39,7 +55,7 @@ public abstract class DocumentsWriterPerThreadPool {
    */
   @SuppressWarnings("serial")
   public final static class ThreadState extends ReentrantLock {
-    // public for FlushPolicy
+    // package private for FlushPolicy
     DocumentsWriterPerThread perThread;
     // write access guarded by DocumentsWriterFlushControl
     volatile boolean flushPending = false;
@@ -111,6 +127,7 @@ public abstract class DocumentsWriterPerThreadPool {
   private volatile int numThreadStatesActive;
   private CodecProvider codecProvider;
   private FieldNumberBiMap globalFieldMap;
+  private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();

   public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
     maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
@@ -120,23 +137,40 @@ public abstract class DocumentsWriterPerThreadPool {
   }

   public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
-    codecProvider = config.getCodecProvider();
+    this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
+    final CodecProvider codecs = config.getCodecProvider();
+    this.codecProvider = codecs;
     this.globalFieldMap = globalFieldMap;
     for (int i = 0; i < perThreads.length; i++) {
-      final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
+      final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecs));
       perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
     }
   }

+  /**
+   * Returns the max number of {@link ThreadState} instances available in this
+   * {@link DocumentsWriterPerThreadPool}
+   */
   public int getMaxThreadStates() {
     return perThreads.length;
   }

-  public synchronized ThreadState newThreadState() {
+  /**
+   * Returns a new {@link ThreadState} iff any new state is available otherwise
+   * <code>null</code>.
+   *
+   * @param lock
+   *          <code>true</code> iff the new {@link ThreadState} should be locked
+   *          before published otherwise <code>false</code>.
+   * @return a new {@link ThreadState} iff any new state is available otherwise
+   *         <code>null</code>
+   */
+  public synchronized ThreadState newThreadState(boolean lock) {
     if (numThreadStatesActive < perThreads.length) {
       final ThreadState threadState = perThreads[numThreadStatesActive];
       threadState.lock();
       threadState.perThread.initialize();
-      numThreadStatesActive++;
+      numThreadStatesActive++; // increment will publish the ThreadState
       return threadState;
     }
     return null;
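The new lock parameter matters for safe publication: with lock=true the caller gets back an already-acquired ThreadState, and only then does the numThreadStatesActive increment publish the state to other threads. A usage sketch mirroring ThreadAffinityDocumentsWriterThreadPool further down:

    final ThreadState newState = newThreadState(true); // returned locked, or null if the pool is exhausted
    if (newState != null) {
      assert newState.isHeldByCurrentThread();
      try {
        // index one document through newState.perThread ...
      } finally {
        newState.unlock();
      }
    }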
@@ -164,7 +198,7 @@ public abstract class DocumentsWriterPerThreadPool {

   //public abstract void clearThreadBindings(ThreadState perThread);

-  // public abstract void clearAllThreadBindings();
+  //public abstract void clearAllThreadBindings();

   /**
    * Returns an iterator providing access to all {@link ThreadState}
@@ -20,18 +20,32 @@ package org.apache.lucene.index;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;

 /**
- * Default {@link FlushPolicy} implementation that flushes based on RAM
- * used, document count and number of buffered deletes depending on the
- * IndexWriter's {@link IndexWriterConfig}. This {@link FlushPolicy} will only
- * respect settings which are not disabled during initialization (
- * {@link #init(DocumentsWriter)}) (nocommit what does that mean?). All enabled {@link IndexWriterConfig}
- * settings are used to mark {@link DocumentsWriterPerThread} as flush pending
- * during indexing with respect to their live updates.
+ * Default {@link FlushPolicy} implementation that flushes based on RAM used,
+ * document count and number of buffered deletes depending on the IndexWriter's
+ * {@link IndexWriterConfig}.
+ *
+ * <ul>
+ * <li>{@link #onDelete(DocumentsWriterFlushControl, ThreadState)} - flushes
+ * based on the global number of buffered delete terms iff
+ * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()} is enabled</li>
+ * <li>{@link #onInsert(DocumentsWriterFlushControl, ThreadState)} - flushes
+ * either on the number of documents per {@link DocumentsWriterPerThread} (
+ * {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active
+ * memory consumption in the current indexing session iff
+ * {@link IndexWriterConfig#getMaxBufferedDocs()} or
+ * {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively</li>
+ * <li>{@link #onUpdate(DocumentsWriterFlushControl, ThreadState)} - calls
+ * {@link #onInsert(DocumentsWriterFlushControl, ThreadState)} and
+ * {@link #onDelete(DocumentsWriterFlushControl, ThreadState)} in order</li>
+ * </ul>
+ * All {@link IndexWriterConfig} settings are used to mark
+ * {@link DocumentsWriterPerThread} as flush pending during indexing with
+ * respect to their live updates.
  * <p>
  * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the
  * largest ram consuming {@link DocumentsWriterPerThread} will be marked as
- * pending iff the global active RAM consumption is >= the
- * configured max RAM buffer.
+ * pending iff the global active RAM consumption is >= the configured max RAM
+ * buffer.
  */
 public class FlushByRamOrCountsPolicy extends FlushPolicy {
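For orientation, a configuration sketch tying these settings together; the setters mirror the test removed at the end of this commit, while the Version constant and analyzer are assumptions:

    FlushByRamOrCountsPolicy policy = new FlushByRamOrCountsPolicy();
    IndexWriterConfig iwc = new IndexWriterConfig(Version.LUCENE_40, analyzer);
    iwc.setFlushPolicy(policy);
    iwc.setRAMBufferSizeMB(16.0);                                        // flush by RAM...
    iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);        // ...not by doc count
    iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH); // ...nor by delete terms
    IndexWriter writer = new IndexWriter(dir, iwc);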
@@ -45,6 +59,18 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
         control.setApplyAllDeletes();
       }
     }
+    final DocumentsWriter writer = this.writer.get();
+    // If deletes alone are consuming > 1/2 our RAM
+    // buffer, force them all to apply now. This is to
+    // prevent too-frequent flushing of a long tail of
+    // tiny segments:
+    if ((flushOnRAM() &&
+        writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2))) {
+      control.setApplyAllDeletes();
+      if (writer.infoStream != null) {
+        writer.message("force apply deletes bytesUsed=" + writer.deleteQueue.bytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
+      }
+    }
   }

   @Override
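Worked example of the threshold above: with getRAMBufferSizeMB() == 16, the cutoff is 1024*1024*16/2 = 8,388,608 bytes, so buffered deletes are force-applied once they alone exceed 8 MB, preventing too-frequent flushing of tiny segments. The same check used to live in IndexWriter.maybeApplyDeletes and is removed there later in this commit; as a standalone sketch:

    final long ramBudgetBytes = (long) (1024 * 1024 * indexWriterConfig.getRAMBufferSizeMB()); // 16 MB -> 16,777,216
    if (flushOnRAM() && writer.deleteQueue.bytesUsed() > ramBudgetBytes / 2) {                  // > 8,388,608 bytes
      control.setApplyAllDeletes(); // apply deletes without waiting for a segment flush
    }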
@@ -54,14 +80,49 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
         .getMaxBufferedDocs()) {
       // Flush this state by num docs
       control.setFlushPending(state);
-    } else {// flush by RAM
-      if (flushOnRAM()) {
-        final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
-        final long totalRam = control.activeBytes();
-        if (totalRam >= limit) {
-          markLargestWriterPending(control, state, totalRam);
-        }
-      }
+    } else if (flushOnRAM()) {// flush by RAM
+      final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
+      final long totalRam = control.activeBytes();
+      if (totalRam >= limit) {
+        markLargestWriterPending(control, state, totalRam);
+      }
     }
   }

+  /**
+   * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush
+   * pending
+   */
+  protected void markLargestWriterPending(DocumentsWriterFlushControl control,
+      ThreadState perThreadState, final long currentBytesPerThread) {
+    control
+        .setFlushPending(findLargestNonPendingWriter(control, perThreadState));
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDocCount() {
+    return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDeleteTerms() {
+    return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnRAM() {
+    return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
 }
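Pulling markLargestWriterPending and the flushOn* predicates down from the abstract FlushPolicy keeps the base contract minimal: a policy only decides when to mark a ThreadState flush pending. A hypothetical minimal subclass, assuming the callback signatures referenced in the javadoc above:

    public class FlushEveryHundredDocsPolicy extends FlushPolicy {
      @Override
      public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
        if (state.perThread.getNumDocsInRAM() >= 100) {
          control.setFlushPending(state); // mark exactly this DWPT for flushing
        }
      }
      @Override
      public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
        // no-op: rely on the explicit applyAllDeletes paths for deletes
      }
    }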
@@ -58,8 +58,7 @@ public abstract class FlushPolicy {
    * Called for each delete term. If this is a delete triggered due to an update
    * the given {@link ThreadState} is non-null.
    * <p>
-   * nocommit: what does this note mean...?
-   * Note: This method is synchronized by the given
+   * Note: This method is called synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
    * thread holds the lock on the given {@link ThreadState}
    */
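Sketch of the calling convention the reworded note pins down, modeled on DocumentsWriterFlushControl.doOnDelete above (the ThreadState-taking variant shown here is an assumption):

    synchronized void doOnDelete(ThreadState state) {          // runs synchronized on the flush control
      assert state == null || state.isHeldByCurrentThread();   // caller already holds the ThreadState lock
      flushPolicy.onDelete(this, state);
    }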
@@ -70,8 +69,7 @@ public abstract class FlushPolicy {
    * Called for each document update on the given {@link ThreadState}'s
    * {@link DocumentsWriterPerThread}.
    * <p>
-   * nocommit: what does this note mean...?
-   * Note: This method is synchronized by the given
+   * Note: This method is called synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
    * thread holds the lock on the given {@link ThreadState}
    */
@@ -101,17 +99,6 @@ public abstract class FlushPolicy {
     indexWriterConfig = docsWriter.indexWriter.getConfig();
   }

-  /**
-   * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush
-   * pending
-   */
-  // nocommit -- move to default policy?
-  protected void markLargestWriterPending(DocumentsWriterFlushControl control,
-      ThreadState perThreadState, final long currentBytesPerThread) {
-    control
-        .setFlushPending(findLargestNonPendingWriter(control, perThreadState));
-  }
-
   /**
    * Returns the current most RAM consuming non-pending {@link ThreadState} with
    * at least one indexed document.
@@ -141,63 +128,4 @@ public abstract class FlushPolicy {
     return maxRamUsingThreadState;
   }

-  // nocommit -- I thought we pause based on "too many flush
-  // states pending"?
-  /**
-   * Returns the max net memory which marks the upper watermark for the
-   * DocumentsWriter to be healthy. If all flushing and active
-   * {@link DocumentsWriterPerThread} consume more memory than the upper
-   * watermark all incoming threads should be stalled and blocked until the
-   * memory drops below this.
-   * <p>
-   * Note: the upper watermark is only taken into account if this
-   * {@link FlushPolicy} flushes by ram usage.
-   *
-   * <p>
-   * The default for the max net memory is set to 2 x
-   * {@link IndexWriterConfig#getRAMBufferSizeMB()}
-   *
-   */
-  public long getMaxNetBytes() {
-    if (!flushOnRAM()) {
-      // nocommit explain that returning -1 is allowed?
-      return -1;
-    }
-    final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
-    return (long) (ramBufferSizeMB * 1024.d * 1024.d * 2);
-  }
-
-  /**
-   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
-   * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
-   * <code>false</code>.
-   */
-  // nocommit who needs this? policy shouldn't have to impl
-  // this? our default policy should?
-  protected boolean flushOnDocCount() {
-    return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
-  }
-
-  /**
-   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
-   * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
-   * <code>false</code>.
-   */
-  // nocommit who needs this? policy shouldn't have to impl
-  // this? our default policy should?
-  protected boolean flushOnDeleteTerms() {
-    return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
-  }
-
-  /**
-   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
-   * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
-   * <code>false</code>.
-   */
-  // nocommit who needs this? policy shouldn't have to impl
-  // this? our default policy should?
-  protected boolean flushOnRAM() {
-    return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
-  }
-
 }
@@ -1239,9 +1239,7 @@ public class IndexWriter implements Closeable {
   public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerm(term)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(term);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term)");
     }
@@ -1263,9 +1261,7 @@ public class IndexWriter implements Closeable {
   public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerms(terms)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(terms);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term..)");
     }
@@ -1285,9 +1281,7 @@ public class IndexWriter implements Closeable {
   public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQuery(query)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(query);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query)");
     }
@@ -1309,9 +1303,7 @@ public class IndexWriter implements Closeable {
   public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQueries(queries)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(queries);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query..)");
     }
@@ -2646,22 +2638,6 @@ public class IndexWriter implements Closeable {
   }

   final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
-    if (!applyAllDeletes) {
-      // nocommit -- shouldn't this move into the default
-      // flush policy?
-      // If deletes alone are consuming > 1/2 our RAM
-      // buffer, force them all to apply now. This is to
-      // prevent too-frequent flushing of a long tail of
-      // tiny segments:
-      if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
-        applyAllDeletes = true;
-        if (infoStream != null) {
-          message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
-        }
-      }
-    }
-
     if (applyAllDeletes) {
       if (infoStream != null) {
         message("apply all deletes during flush");
@@ -20,8 +20,16 @@ import java.util.concurrent.ConcurrentHashMap;

 import org.apache.lucene.document.Document;

-// nocommit jdoc
-// nocommit -- can/should apps set this via IWC
+/**
+ * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
+ * indexing thread to the same {@link ThreadState} each time the thread tries to
+ * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
+ * associated with the creating thread. Subsequently, if the threads associated
+ * {@link ThreadState} is not in use it will be associated with the requesting
+ * thread. Otherwise, if the {@link ThreadState} is used by another thread
+ * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
+ * minimal contended {@link ThreadState}.
+ */
 public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
   private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
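The question in the removed nocommit (can apps set this via IWC?) is answered by IndexWriterConfig#setIndexerThreadPool, which the test removed at the end of this commit exercises. A usage sketch (the Version constant and analyzer are assumptions):

    IndexWriterConfig iwc = new IndexWriterConfig(Version.LUCENE_40, analyzer);
    iwc.setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(8)); // up to 8 DWPTs
    IndexWriter writer = new IndexWriter(dir, iwc);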
@@ -40,16 +48,17 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
     }
     ThreadState minThreadState = null;

+    /* TODO -- another thread could lock the minThreadState we just got while
+       we should somehow prevent this. */
     // Find the state that has minimum number of threads waiting
-    // noocommit -- can't another thread lock the
-    // minThreadState we just got?
     minThreadState = minContendedThreadState();

     if (minThreadState == null || minThreadState.hasQueuedThreads()) {
-      ThreadState newState = newThreadState();
+      final ThreadState newState = newThreadState(true);
       if (newState != null) {
-        minThreadState = newState;
+        assert newState.isHeldByCurrentThread();
+        threadBindings.put(requestingThread, newState);
+        return newState;
       } else if (minThreadState == null) {
         /*
          * no new threadState available we just take the minContented one
@@ -167,28 +167,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     }
   }

-  public void testFlushPolicySetup() throws IOException {
-    Directory dir = newDirectory();
-    FlushByRamOrCountsPolicy flushPolicy = new FlushByRamOrCountsPolicy();
-    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
-        new MockAnalyzer(random)).setFlushPolicy(flushPolicy);
-
-    final int numDWPT = 1 + random.nextInt(10);
-    DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
-        numDWPT);
-    iwc.setIndexerThreadPool(threadPool);
-    double maxMB = 1.0 + Math.ceil(random.nextDouble());
-    iwc.setRAMBufferSizeMB(maxMB);
-    iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
-
-    IndexWriter writer = new IndexWriter(dir, iwc);
-    assertEquals((long) (maxMB * 1024. * 1024. * 2.),
-        flushPolicy.getMaxNetBytes());
-
-    writer.close();
-    dir.close();
-  }
-
   public void testRandom() throws IOException, InterruptedException {
     final int numThreads = 1 + random.nextInt(8);
     final int numDocumentsToIndex = 100 + random.nextInt(300);