LUCENE-9304: Refactor DWPTPool to pool DWPT directly (#1397)

This change removes the ThreadState indirection from DWPTPool and pools DWPT directly. The tracking information and locking semantics are mostly moved to DWPT directly and the pool semantics have changed slightly such that DWPT need to be checked-out in the pool once they need to be flushed or aborted. This automatically grows and shrinks the number of DWPT in the system when number of threads grow or shrink. Access of pooled DWPTs is more straight forward and doesn't require ordinal. Instead consumers can just iterate over the elements in the pool.
This allowed for removal of indirections in DWPTFlushControl like BlockedFlush, the removal of DWPTPool setter and getter in IndexWriterConfig and the addition of stronger assertions in DWPT and DW.
This commit is contained in:
Simon Willnauer 2020-04-11 12:23:46 +02:00 committed by GitHub
parent d52c1021e5
commit 2602269f3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 606 additions and 692 deletions

View File

@ -120,6 +120,10 @@ API Changes
* LUCENE-9265: SimpleFSDirectory is deprecated in favor of NIOFSDirectory. (Yannick Welsch) * LUCENE-9265: SimpleFSDirectory is deprecated in favor of NIOFSDirectory. (Yannick Welsch)
* LUCENE-9304: Removed ability to set DocumentsWriterPerThreadPool on IndexWriterConfig.
The DocumentsWriterPerThreadPool is a packaged protected final class which made it impossible
to customize. (Simon Willnauer)
New Features New Features
--------------------- ---------------------
(No changes) (No changes)
@ -134,6 +138,9 @@ Improvements
* LUCENE-8050: PerFieldDocValuesFormat should not get the DocValuesFormat on a field that has no doc values. * LUCENE-8050: PerFieldDocValuesFormat should not get the DocValuesFormat on a field that has no doc values.
(David Smiley, Juan Rodriguez) (David Smiley, Juan Rodriguez)
* LUCENE-9304: Removed ThreadState abstraction from DocumentsWriter which allows pooling of DWPT directly and
improves the approachability of the IndexWriter code. (Simon Willnauer)
Optimizations Optimizations
--------------------- ---------------------

View File

@ -32,7 +32,6 @@ import java.util.function.ToLongFunction;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
@ -59,21 +58,25 @@ import org.apache.lucene.util.InfoStream;
* Threads: * Threads:
* *
* Multiple threads are allowed into addDocument at once. * Multiple threads are allowed into addDocument at once.
* There is an initial synchronized call to getThreadState * There is an initial synchronized call to
* which allocates a ThreadState for this thread. The same * {@link DocumentsWriterFlushControl#obtainAndLock()}
* thread will get the same ThreadState over time (thread * which allocates a DWPT for this indexing thread. The same
* affinity) so that if there are consistent patterns (for * thread will not necessarily get the same DWPT over time.
* example each thread is indexing a different content * Then updateDocuments is called on that DWPT without
* source) then we make better use of RAM. Then
* processDocument is called on that ThreadState without
* synchronization (most of the "heavy lifting" is in this * synchronization (most of the "heavy lifting" is in this
* call). Finally the synchronized "finishDocument" is * call). Once a DWPT fills up enough RAM or hold enough
* called to flush changes to the directory. * documents in memory the DWPT is checked out for flush
* and all changes are written to the directory. Each DWPT
* corresponds to one segment being written.
* *
* When flush is called by IndexWriter we forcefully idle * When flush is called by IndexWriter we check out all DWPTs
* all threads and flush only once they are all idle. This * that are associated with the current {@link DocumentsWriterDeleteQueue}
* means you can call flush with a given thread even while * out of the {@link DocumentsWriterPerThreadPool} and write
* other threads are actively adding/deleting documents. * them to disk. The flush process can piggy-back on incoming
* indexing threads or even block them from adding documents
* if flushing can't keep up with new documents being added.
* Unless the stall control kicks in to block indexing threads
* flushes are happening concurrently to actual index requests.
* *
* *
* Exceptions: * Exceptions:
@ -99,13 +102,8 @@ import org.apache.lucene.util.InfoStream;
*/ */
final class DocumentsWriter implements Closeable, Accountable { final class DocumentsWriter implements Closeable, Accountable {
private final Directory directoryOrig; // no wrapping, for infos
private final Directory directory;
private final FieldInfos.FieldNumbers globalFieldNumberMap;
private final int indexCreatedVersionMajor;
private final AtomicLong pendingNumDocs; private final AtomicLong pendingNumDocs;
private final boolean enableTestPoints;
private final Supplier<String> segmentNameSupplier;
private final FlushNotifications flushNotifications; private final FlushNotifications flushNotifications;
private volatile boolean closed; private volatile boolean closed;
@ -130,24 +128,23 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThreadPool perThreadPool; final DocumentsWriterPerThreadPool perThreadPool;
final FlushPolicy flushPolicy; final FlushPolicy flushPolicy;
final DocumentsWriterFlushControl flushControl; final DocumentsWriterFlushControl flushControl;
private long lastSeqNo;
DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints, DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints,
Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory, Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory,
FieldInfos.FieldNumbers globalFieldNumberMap) { FieldInfos.FieldNumbers globalFieldNumberMap) {
this.indexCreatedVersionMajor = indexCreatedVersionMajor;
this.directoryOrig = directoryOrig;
this.directory = directory;
this.config = config; this.config = config;
this.infoStream = config.getInfoStream(); this.infoStream = config.getInfoStream();
this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream); this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
this.perThreadPool = config.getIndexerThreadPool(); this.perThreadPool = new DocumentsWriterPerThreadPool(() -> {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
return new DocumentsWriterPerThread(indexCreatedVersionMajor,
segmentNameSupplier.get(), directoryOrig,
directory, config, infoStream, deleteQueue, infos,
pendingNumDocs, enableTestPoints);
});
flushPolicy = config.getFlushPolicy(); flushPolicy = config.getFlushPolicy();
this.globalFieldNumberMap = globalFieldNumberMap;
this.pendingNumDocs = pendingNumDocs; this.pendingNumDocs = pendingNumDocs;
flushControl = new DocumentsWriterFlushControl(this, config); flushControl = new DocumentsWriterFlushControl(this, config);
this.segmentNameSupplier = segmentNameSupplier;
this.enableTestPoints = enableTestPoints;
this.flushNotifications = flushNotifications; this.flushNotifications = flushNotifications;
} }
@ -155,9 +152,6 @@ final class DocumentsWriter implements Closeable, Accountable {
return applyDeleteOrUpdate(q -> q.addDelete(queries)); return applyDeleteOrUpdate(q -> q.addDelete(queries));
} }
void setLastSeqNo(long seqNo) {
lastSeqNo = seqNo;
}
long deleteTerms(final Term... terms) throws IOException { long deleteTerms(final Term... terms) throws IOException {
return applyDeleteOrUpdate(q -> q.addDelete(terms)); return applyDeleteOrUpdate(q -> q.addDelete(terms));
@ -173,7 +167,6 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
long seqNo = function.applyAsLong(deleteQueue); long seqNo = function.applyAsLong(deleteQueue);
flushControl.doOnDelete(); flushControl.doOnDelete();
lastSeqNo = Math.max(lastSeqNo, seqNo);
if (applyAllDeletes()) { if (applyAllDeletes()) {
seqNo = -seqNo; seqNo = -seqNo;
} }
@ -225,20 +218,23 @@ final class DocumentsWriter implements Closeable, Accountable {
if (infoStream.isEnabled("DW")) { if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "abort"); infoStream.message("DW", "abort");
} }
final int limit = perThreadPool.getActiveThreadStateCount(); for (final DocumentsWriterPerThread perThread : perThreadPool.filterAndLock(x -> true)) {
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
try { try {
abortThreadState(perThread); abortDocumentsWriterPerThread(perThread);
} finally { } finally {
perThread.unlock(); perThread.unlock();
} }
} }
flushControl.abortPendingFlushes(); flushControl.abortPendingFlushes();
flushControl.waitForFlush(); flushControl.waitForFlush();
assert perThreadPool.size() == 0
: "There are still active DWPT in the pool: " + perThreadPool.size();
success = true; success = true;
} finally { } finally {
if (success) {
assert flushControl.getFlushingBytes() == 0 : "flushingBytes has unexpected value 0 != " + flushControl.getFlushingBytes();
assert flushControl.netBytes() == 0 : "netBytes has unexpected value 0 != " + flushControl.netBytes();
}
if (infoStream.isEnabled("DW")) { if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "done abort success=" + success); infoStream.message("DW", "done abort success=" + success);
} }
@ -273,33 +269,34 @@ final class DocumentsWriter implements Closeable, Accountable {
pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc()); pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc());
} }
}); });
List<ThreadState> threadStates = new ArrayList<>(); List<DocumentsWriterPerThread> writers = new ArrayList<>();
AtomicBoolean released = new AtomicBoolean(false); AtomicBoolean released = new AtomicBoolean(false);
final Closeable release = () -> { final Closeable release = () -> {
// we return this closure to unlock all writers once done
// or if hit an exception below in the try block.
// we can't assign this later otherwise the ref can't be final
if (released.compareAndSet(false, true)) { // only once if (released.compareAndSet(false, true)) { // only once
if (infoStream.isEnabled("DW")) { if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAllAbortedThread"); infoStream.message("DW", "unlockAllAbortedThread");
} }
perThreadPool.unlockNewThreadStates(); perThreadPool.unlockNewWriters();
for (ThreadState state : threadStates) { for (DocumentsWriterPerThread writer : writers) {
state.unlock(); writer.unlock();
} }
} }
}; };
try { try {
deleteQueue.clear(); deleteQueue.clear();
perThreadPool.lockNewThreadStates(); perThreadPool.lockNewWriters();
final int limit = perThreadPool.getMaxThreadStates(); writers.addAll(perThreadPool.filterAndLock(x -> true));
for (int i = 0; i < limit; i++) { for (final DocumentsWriterPerThread perThread : writers) {
final ThreadState perThread = perThreadPool.getThreadState(i); assert perThread.isHeldByCurrentThread();
perThread.lock(); abortDocumentsWriterPerThread(perThread);
threadStates.add(perThread);
abortThreadState(perThread);
} }
deleteQueue.clear(); deleteQueue.clear();
// jump over any possible in flight ops: // jump over any possible in flight ops:
deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount() + 1); deleteQueue.skipSequenceNumbers(perThreadPool.size() + 1);
flushControl.abortPendingFlushes(); flushControl.abortPendingFlushes();
flushControl.waitForFlush(); flushControl.waitForFlush();
@ -322,35 +319,22 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
/** Returns how many documents were aborted. */ /** Returns how many documents were aborted. */
private int abortThreadState(final ThreadState perThread) throws IOException { private void abortDocumentsWriterPerThread(final DocumentsWriterPerThread perThread) throws IOException {
assert perThread.isHeldByCurrentThread(); assert perThread.isHeldByCurrentThread();
if (perThread.isInitialized()) {
try { try {
int abortedDocCount = perThread.dwpt.getNumDocsInRAM(); subtractFlushedNumDocs(perThread.getNumDocsInRAM());
subtractFlushedNumDocs(abortedDocCount); perThread.abort();
perThread.dwpt.abort();
return abortedDocCount;
} finally { } finally {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(perThread);
} }
} else {
flushControl.doOnAbort(perThread);
// This DWPT was never initialized so it has no indexed documents:
return 0;
}
} }
/** returns the maximum sequence number for all previously completed operations */ /** returns the maximum sequence number for all previously completed operations */
public long getMaxCompletedSequenceNumber() { long getMaxCompletedSequenceNumber() {
long value = lastSeqNo; return deleteQueue.getLastSequenceNumber();
int limit = perThreadPool.getMaxThreadStates();
for(int i = 0; i < limit; i++) {
ThreadState perThread = perThreadPool.getThreadState(i);
value = Math.max(value, perThread.lastSeqNo);
}
return value;
} }
boolean anyChanges() { boolean anyChanges() {
/* /*
* changes are either in a DWPT or in the deleteQueue. * changes are either in a DWPT or in the deleteQueue.
@ -369,23 +353,23 @@ final class DocumentsWriter implements Closeable, Accountable {
return anyChanges; return anyChanges;
} }
public int getBufferedDeleteTermsSize() { int getBufferedDeleteTermsSize() {
return deleteQueue.getBufferedUpdatesTermsSize(); return deleteQueue.getBufferedUpdatesTermsSize();
} }
//for testing //for testing
public int getNumBufferedDeleteTerms() { int getNumBufferedDeleteTerms() {
return deleteQueue.numGlobalTermDeletes(); return deleteQueue.numGlobalTermDeletes();
} }
public boolean anyDeletions() { boolean anyDeletions() {
return deleteQueue.anyChanges(); return deleteQueue.anyChanges();
} }
@Override @Override
public void close() { public void close() throws IOException {
closed = true; closed = true;
flushControl.setClosed(); IOUtils.close(flushControl, perThreadPool);
} }
private boolean preUpdate() throws IOException { private boolean preUpdate() throws IOException {
@ -422,36 +406,24 @@ final class DocumentsWriter implements Closeable, Accountable {
return hasEvents; return hasEvents;
} }
private void ensureInitialized(ThreadState state) throws IOException {
if (state.dwpt == null) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
directory, config, infoStream, deleteQueue, infos,
pendingNumDocs, enableTestPoints);
}
}
long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer, long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException { final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
boolean hasEvents = preUpdate(); boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
long seqNo; long seqNo;
try { try {
// This must happen after we've pulled the ThreadState because IW.close // This must happen after we've pulled the DWPT because IW.close
// waits for all ThreadStates to be released: // waits for all DWPT to be released:
ensureOpen(); ensureOpen();
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM(); final int dwptNumDocs = dwpt.getNumDocsInRAM();
try { try {
seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications); seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
} finally { } finally {
if (dwpt.isAborted()) { if (dwpt.isAborted()) {
flushControl.doOnAbort(perThread); flushControl.doOnAbort(dwpt);
} }
// We don't know how many documents were actually // We don't know how many documents were actually
// counted as indexed, so we must subtract here to // counted as indexed, so we must subtract here to
@ -459,13 +431,14 @@ final class DocumentsWriter implements Closeable, Accountable {
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
} }
final boolean isUpdate = delNode != null && delNode.isDelete(); final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
perThread.lastSeqNo = seqNo;
} finally { } finally {
perThreadPool.release(perThread); if (dwpt.isFlushPending() || dwpt.isAborted()) {
dwpt.unlock();
} else {
perThreadPool.marksAsFreeAndUnlock(dwpt);
}
assert dwpt.isHeldByCurrentThread() == false : "we didn't release the dwpt even on abort";
} }
if (postUpdate(flushingDWPT, hasEvents)) { if (postUpdate(flushingDWPT, hasEvents)) {
@ -477,6 +450,7 @@ final class DocumentsWriter implements Closeable, Accountable {
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false; boolean hasEvents = false;
while (flushingDWPT != null) { while (flushingDWPT != null) {
assert flushingDWPT.hasFlushed() == false;
hasEvents = true; hasEvents = true;
boolean success = false; boolean success = false;
DocumentsWriterFlushQueue.FlushTicket ticket = null; DocumentsWriterFlushQueue.FlushTicket ticket = null;
@ -536,7 +510,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* Now we are done and try to flush the ticket queue if the head of the * Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush. * queue has already finished the flush.
*/ */
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) { if (ticketQueue.getTicketCount() >= perThreadPool.size()) {
// This means there is a backlog: the one // This means there is a backlog: the one
// thread in innerPurge can't keep up with all // thread in innerPurge can't keep up with all
// other threads flushing segments. In this case // other threads flushing segments. In this case
@ -727,7 +701,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* *
* This is a subset of the value returned by {@link #ramBytesUsed()} * This is a subset of the value returned by {@link #ramBytesUsed()}
*/ */
public long getFlushingBytes() { long getFlushingBytes() {
return flushControl.getFlushingBytes(); return flushControl.getFlushingBytes();
} }
} }

View File

@ -17,8 +17,9 @@
package org.apache.lucene.index; package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -26,7 +27,7 @@ import java.util.Locale;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -43,7 +44,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
* {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
* space exhaustion. * space exhaustion.
*/ */
final class DocumentsWriterFlushControl implements Accountable { final class DocumentsWriterFlushControl implements Accountable, Closeable {
private final long hardMaxBytesPerDWPT; private final long hardMaxBytesPerDWPT;
private long activeBytes = 0; private long activeBytes = 0;
@ -52,11 +53,18 @@ final class DocumentsWriterFlushControl implements Accountable {
private int numDocsSinceStalled = 0; // only with assert private int numDocsSinceStalled = 0; // only with assert
final AtomicBoolean flushDeletes = new AtomicBoolean(false); final AtomicBoolean flushDeletes = new AtomicBoolean(false);
private boolean fullFlush = false; private boolean fullFlush = false;
private boolean fullFlushMarkDone = false; // only for assertion that we don't get stale DWPTs from the pool
// The flushQueue is used to concurrently distribute DWPTs that are ready to be flushed ie. when a full flush is in
// progress. This might be triggered by a commit or NRT refresh. The trigger will only walk all eligible DWPTs and
// mark them as flushable putting them in the flushQueue ready for other threads (ie. indexing threads) to help flushing
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<>(); private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<>();
// only for safety reasons if a DWPT is close to the RAM limit // only for safety reasons if a DWPT is close to the RAM limit
private final Queue<BlockedFlush> blockedFlushes = new LinkedList<>(); private final Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<>();
private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<>(); // flushingWriters holds all currently flushing writers. There might be writers in this list that
// are also in the flushQueue which means that writers in the flushingWriters list are not necessarily
// already actively flushing. They are only in the state of flushing and might be picked up in the future by
// polling the flushQueue
private final List<DocumentsWriterPerThread> flushingWriters = new ArrayList<>();
double maxConfiguredRamBuffer = 0; double maxConfiguredRamBuffer = 0;
long peakActiveBytes = 0;// only with assert long peakActiveBytes = 0;// only with assert
@ -86,11 +94,11 @@ final class DocumentsWriterFlushControl implements Accountable {
return activeBytes; return activeBytes;
} }
public long getFlushingBytes() { long getFlushingBytes() {
return flushBytes; return flushBytes;
} }
public synchronized long netBytes() { synchronized long netBytes() {
return flushBytes + activeBytes; return flushBytes + activeBytes;
} }
@ -139,15 +147,14 @@ final class DocumentsWriterFlushControl implements Accountable {
return true; return true;
} }
private void commitPerThreadBytes(ThreadState perThread) { private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) {
final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed; final long delta = perThread.commitLastBytesUsed();
perThread.bytesUsed += delta;
/* /*
* We need to differentiate here if we are pending since setFlushPending * We need to differentiate here if we are pending since setFlushPending
* moves the perThread memory to the flushBytes and we could be set to * moves the perThread memory to the flushBytes and we could be set to
* pending during a delete * pending during a delete
*/ */
if (perThread.flushPending) { if (perThread.isFlushPending()) {
flushBytes += delta; flushBytes += delta;
} else { } else {
activeBytes += delta; activeBytes += delta;
@ -165,16 +172,16 @@ final class DocumentsWriterFlushControl implements Accountable {
return true; return true;
} }
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) { synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) {
try { try {
commitPerThreadBytes(perThread); commitPerThreadBytes(perThread);
if (!perThread.flushPending) { if (!perThread.isFlushPending()) {
if (isUpdate) { if (isUpdate) {
flushPolicy.onUpdate(this, perThread); flushPolicy.onUpdate(this, perThread);
} else { } else {
flushPolicy.onInsert(this, perThread); flushPolicy.onInsert(this, perThread);
} }
if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) { if (!perThread.isFlushPending() && perThread.bytesUsed() > hardMaxBytesPerDWPT) {
// Safety check to prevent a single DWPT exceeding its RAM limit. This // Safety check to prevent a single DWPT exceeding its RAM limit. This
// is super important since we can not address more than 2048 MB per DWPT // is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread); setFlushPending(perThread);
@ -187,22 +194,25 @@ final class DocumentsWriterFlushControl implements Accountable {
} }
} }
private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) { private DocumentsWriterPerThread checkout(DocumentsWriterPerThread perThread, boolean markPending) {
assert Thread.holdsLock(this);
if (fullFlush) { if (fullFlush) {
if (perThread.flushPending) { if (perThread.isFlushPending()) {
checkoutAndBlock(perThread); checkoutAndBlock(perThread);
return nextPendingFlush(); return nextPendingFlush();
} else {
return null;
} }
} else { } else {
if (markPending) { if (markPending) {
assert perThread.isFlushPending() == false; assert perThread.isFlushPending() == false;
setFlushPending(perThread); setFlushPending(perThread);
} }
return tryCheckoutForFlush(perThread);
if (perThread.isFlushPending()) {
return checkOutForFlush(perThread);
} }
} }
return null;
}
private boolean assertNumDocsSinceStalled(boolean stalled) { private boolean assertNumDocsSinceStalled(boolean stalled) {
/* /*
@ -221,11 +231,10 @@ final class DocumentsWriterFlushControl implements Accountable {
} }
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
assert flushingWriters.containsKey(dwpt); assert flushingWriters.contains(dwpt);
try { try {
Long bytes = flushingWriters.remove(dwpt); flushingWriters.remove(dwpt);
flushBytes -= bytes.longValue(); flushBytes -= dwpt.getLastCommittedBytesUsed();
perThreadPool.recycle(dwpt);
assert assertMemory(); assert assertMemory();
} finally { } finally {
try { try {
@ -281,15 +290,15 @@ final class DocumentsWriterFlushControl implements Accountable {
} }
/** /**
* Sets flush pending state on the given {@link ThreadState}. The * Sets flush pending state on the given {@link DocumentsWriterPerThread}. The
* {@link ThreadState} must have indexed at least on Document and must not be * {@link DocumentsWriterPerThread} must have indexed at least on Document and must not be
* already pending. * already pending.
*/ */
public synchronized void setFlushPending(ThreadState perThread) { public synchronized void setFlushPending(DocumentsWriterPerThread perThread) {
assert !perThread.flushPending; assert !perThread.isFlushPending();
if (perThread.dwpt.getNumDocsInRAM() > 0) { if (perThread.getNumDocsInRAM() > 0) {
perThread.flushPending = true; // write access synced perThread.setFlushPending(); // write access synced
final long bytes = perThread.bytesUsed; final long bytes = perThread.getLastCommittedBytesUsed();
flushBytes += bytes; flushBytes += bytes;
activeBytes -= bytes; activeBytes -= bytes;
numPending++; // write access synced numPending++; // write access synced
@ -298,70 +307,57 @@ final class DocumentsWriterFlushControl implements Accountable {
} }
synchronized void doOnAbort(ThreadState state) { synchronized void doOnAbort(DocumentsWriterPerThread perThread) {
try { try {
if (state.flushPending) { assert perThreadPool.isRegistered(perThread);
flushBytes -= state.bytesUsed; assert perThread.isHeldByCurrentThread();
if (perThread.isFlushPending()) {
flushBytes -= perThread.getLastCommittedBytesUsed();
} else { } else {
activeBytes -= state.bytesUsed; activeBytes -= perThread.getLastCommittedBytesUsed();
} }
assert assertMemory(); assert assertMemory();
// Take it out of the loop this DWPT is stale // Take it out of the loop this DWPT is stale
perThreadPool.reset(state);
} finally { } finally {
updateStallState(); updateStallState();
boolean checkedOut = perThreadPool.checkout(perThread);
assert checkedOut;
} }
} }
synchronized DocumentsWriterPerThread tryCheckoutForFlush( private void checkoutAndBlock(DocumentsWriterPerThread perThread) {
ThreadState perThread) { assert perThreadPool.isRegistered(perThread);
return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
}
private void checkoutAndBlock(ThreadState perThread) {
perThread.lock();
try {
assert perThread.flushPending : "can not block non-pending threadstate";
assert fullFlush : "can not block if fullFlush == false";
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed;
dwpt = perThreadPool.reset(perThread);
numPending--;
blockedFlushes.add(new BlockedFlush(dwpt, bytes));
} finally {
perThread.unlock();
}
}
private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
assert Thread.holdsLock(this);
assert perThread.flushPending;
try {
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
if (perThread.isInitialized()) {
assert perThread.isHeldByCurrentThread(); assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt; assert perThread.isFlushPending() : "can not block non-pending threadstate";
final long bytes = perThread.bytesUsed; // do that before assert fullFlush : "can not block if fullFlush == false";
// replace! numPending--;
dwpt = perThreadPool.reset(perThread); blockedFlushes.add(perThread);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing"; boolean checkedOut = perThreadPool.checkout(perThread);
// Record the flushing DWPT to reduce flushBytes in doAfterFlush assert checkedOut;
flushingWriters.put(dwpt, Long.valueOf(bytes)); }
private synchronized DocumentsWriterPerThread checkOutForFlush(DocumentsWriterPerThread perThread) {
assert Thread.holdsLock(this);
assert perThread.isFlushPending();
assert perThread.isHeldByCurrentThread();
assert perThreadPool.isRegistered(perThread);
try {
addFlushingDWPT(perThread);
numPending--; // write access synced numPending--; // write access synced
return dwpt; boolean checkedOut = perThreadPool.checkout(perThread);
} assert checkedOut;
} finally { return perThread;
perThread.unlock();
}
}
return null;
} finally { } finally {
updateStallState(); updateStallState();
} }
} }
private void addFlushingDWPT(DocumentsWriterPerThread perThread) {
assert flushingWriters.contains(perThread) == false : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.add(perThread);
}
@Override @Override
public String toString() { public String toString() {
return "DocumentsWriterFlushControl [activeBytes=" + activeBytes return "DocumentsWriterFlushControl [activeBytes=" + activeBytes
@ -380,14 +376,17 @@ final class DocumentsWriterFlushControl implements Accountable {
fullFlush = this.fullFlush; fullFlush = this.fullFlush;
numPending = this.numPending; numPending = this.numPending;
} }
if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush if (numPending > 0 && fullFlush == false) { // don't check if we are doing a full flush
final int limit = perThreadPool.getActiveThreadStateCount(); for (final DocumentsWriterPerThread next : perThreadPool) {
for (int i = 0; i < limit && numPending > 0; i++) { if (next.isFlushPending()) {
final ThreadState next = perThreadPool.getThreadState(i); if (next.tryLock()) {
if (next.flushPending) { try {
final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next); if (perThreadPool.isRegistered(next)) {
if (dwpt != null) { return checkOutForFlush(next);
return dwpt; }
} finally {
next.unlock();
}
} }
} }
} }
@ -395,37 +394,17 @@ final class DocumentsWriterFlushControl implements Accountable {
return null; return null;
} }
synchronized void setClosed() { @Override
// set by DW to signal that we should not release new DWPT after close public synchronized void close() {
this.closed = true; // set by DW to signal that we are closing. in this case we try to not stall any threads anymore etc.
closed = true;
} }
/** /**
* Returns an iterator that provides access to all currently active {@link ThreadState}s * Returns an iterator that provides access to all currently active {@link DocumentsWriterPerThread}s
*/ */
public Iterator<ThreadState> allActiveThreadStates() { public Iterator<DocumentsWriterPerThread> allActiveWriters() {
return getPerThreadsIterator(perThreadPool.getActiveThreadStateCount()); return perThreadPool.iterator();
}
private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
return new Iterator<ThreadState>() {
int i = 0;
@Override
public boolean hasNext() {
return i < upto;
}
@Override
public ThreadState next() {
return perThreadPool.getThreadState(i++);
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove() not supported.");
}
};
} }
synchronized void doOnDelete() { synchronized void doOnDelete() {
@ -458,71 +437,83 @@ final class DocumentsWriterFlushControl implements Accountable {
flushDeletes.set(true); flushDeletes.set(true);
} }
ThreadState obtainAndLock() { DocumentsWriterPerThread obtainAndLock() throws IOException {
final ThreadState perThread = perThreadPool.getAndLock(); while (closed == false) {
boolean success = false; final DocumentsWriterPerThread perThread = perThreadPool.getAndLock();
try { if (perThread.deleteQueue == documentsWriter.deleteQueue) {
if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // simply return the DWPT even in a flush all case since we already hold the lock and the DWPT is not stale
// There is a flush-all in process and this DWPT is // since it has the current delete queue associated with it. This means we have established a happens-before
// now stale -- enroll it for flush and try for // relationship and all docs indexed into this DWPT are guaranteed to not be flushed with the currently
// another DWPT: // progress full flush.
addFlushableState(perThread);
}
success = true;
// simply return the ThreadState even in a flush all case sine we already hold the lock
return perThread; return perThread;
} else {
try {
// we must first assert otherwise the full flush might make progress once we unlock the dwpt
assert fullFlush && fullFlushMarkDone == false :
"found a stale DWPT but full flush mark phase is already done fullFlush: "
+ fullFlush + " markDone: " + fullFlushMarkDone;
} finally { } finally {
if (!success) { // make sure we unlock if this fails perThread.unlock();
perThreadPool.release(perThread); // There is a flush-all in process and this DWPT is
// now stale - try another one
} }
} }
} }
throw new AlreadyClosedException("flush control is closed");
}
long markForFullFlush() { long markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue; final DocumentsWriterDeleteQueue flushingQueue;
long seqNo; long seqNo;
synchronized (this) { synchronized (this) {
assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running"; assert fullFlush == false: "called DWFC#markForFullFlush() while full flush is still running";
assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer; assert fullFlushMarkDone == false : "full flush collection marker is still set to true";
fullFlush = true; fullFlush = true;
flushingQueue = documentsWriter.deleteQueue; flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until // Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush // we do another full flush
perThreadPool.lockNewWriters(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
perThreadPool.lockNewThreadStates(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
try { try {
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned: // if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2; seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.size() + 2;
flushingQueue.maxSeqNo = seqNo + 1; flushingQueue.maxSeqNo = seqNo + 1;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1); DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
documentsWriter.deleteQueue = newQueue; documentsWriter.deleteQueue = newQueue;
} finally { } finally {
perThreadPool.unlockNewThreadStates(); perThreadPool.unlockNewWriters();
} }
} }
final int limit = perThreadPool.getActiveThreadStateCount(); final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<>();
for (int i = 0; i < limit; i++) { for (final DocumentsWriterPerThread next : perThreadPool.filterAndLock(dwpt -> dwpt.deleteQueue == flushingQueue)) {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try { try {
if (!next.isInitialized()) { assert next.deleteQueue == flushingQueue
continue; || next.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
}
assert next.dwpt.deleteQueue == flushingQueue
|| next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
+ flushingQueue + flushingQueue
+ " currentqueue: " + " currentqueue: "
+ documentsWriter.deleteQueue + documentsWriter.deleteQueue
+ " perThread queue: " + " perThread queue: "
+ next.dwpt.deleteQueue + next.deleteQueue
+ " numDocsInRam: " + next.dwpt.getNumDocsInRAM(); + " numDocsInRam: " + next.getNumDocsInRAM();
if (next.dwpt.deleteQueue != flushingQueue) {
// this one is already a new DWPT if (next.getNumDocsInRAM() > 0) {
continue; final DocumentsWriterPerThread flushingDWPT;
synchronized(this) {
if (next.isFlushPending() == false) {
setFlushPending(next);
}
flushingDWPT = checkOutForFlush(next);
}
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert next == flushingDWPT : "flushControl returned different DWPT";
fullFlushBuffer.add(flushingDWPT);
} else {
// it's possible that we get a DWPT with 0 docs if we flush concurrently to
// threads getting DWPTs from the pool. In this case we simply remove it from
// the pool and drop it on the floor.
boolean checkout = perThreadPool.checkout(next);
assert checkout;
} }
addFlushableState(next);
} finally { } finally {
next.unlock(); next.unlock();
} }
@ -535,67 +526,33 @@ final class DocumentsWriterFlushControl implements Accountable {
pruneBlockedQueue(flushingQueue); pruneBlockedQueue(flushingQueue);
assert assertBlockedFlushes(documentsWriter.deleteQueue); assert assertBlockedFlushes(documentsWriter.deleteQueue);
flushQueue.addAll(fullFlushBuffer); flushQueue.addAll(fullFlushBuffer);
fullFlushBuffer.clear();
updateStallState(); updateStallState();
fullFlushMarkDone = true; // at this point we must have collected all DWPTs that belong to the old delete queue
} }
assert assertActiveDeleteQueue(documentsWriter.deleteQueue); assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
return seqNo; return seqNo;
} }
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) { private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
final int limit = perThreadPool.getActiveThreadStateCount(); for (final DocumentsWriterPerThread next : perThreadPool) {
for (int i = 0; i < limit; i++) { assert next.deleteQueue == queue : "numDocs: " + next.getNumDocsInRAM();
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ;
} finally {
next.unlock();
}
} }
return true; return true;
} }
private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<>();
void addFlushableState(ThreadState perThread) {
if (infoStream.isEnabled("DWFC")) {
infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
}
final DocumentsWriterPerThread dwpt = perThread.dwpt;
assert perThread.isHeldByCurrentThread();
assert perThread.isInitialized();
assert fullFlush;
assert dwpt.deleteQueue != documentsWriter.deleteQueue;
if (dwpt.getNumDocsInRAM() > 0) {
synchronized(this) {
if (!perThread.flushPending) {
setFlushPending(perThread);
}
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT";
fullFlushBuffer.add(flushingDWPT);
}
} else {
perThreadPool.reset(perThread); // make this state inactive
}
}
/** /**
* Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue. * Prunes the blockedQueue by removing all DWPTs that are associated with the given flush queue.
*/ */
private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) { private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) {
Iterator<BlockedFlush> iterator = blockedFlushes.iterator(); assert Thread.holdsLock(this);
Iterator<DocumentsWriterPerThread> iterator = blockedFlushes.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
BlockedFlush blockedFlush = iterator.next(); DocumentsWriterPerThread blockedFlush = iterator.next();
if (blockedFlush.dwpt.deleteQueue == flushingQueue) { if (blockedFlush.deleteQueue == flushingQueue) {
iterator.remove(); iterator.remove();
assert !flushingWriters.containsKey(blockedFlush.dwpt) : "DWPT is already flushing"; addFlushingDWPT(blockedFlush);
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
// don't decr pending here - it's already done when DWPT is blocked // don't decr pending here - it's already done when DWPT is blocked
flushQueue.add(blockedFlush.dwpt); flushQueue.add(blockedFlush);
} }
} }
} }
@ -611,14 +568,15 @@ final class DocumentsWriterFlushControl implements Accountable {
assert blockedFlushes.isEmpty(); assert blockedFlushes.isEmpty();
} }
} finally { } finally {
fullFlush = false; fullFlushMarkDone = fullFlush = false;
updateStallState(); updateStallState();
} }
} }
boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) { boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) {
for (BlockedFlush blockedFlush : blockedFlushes) { for (DocumentsWriterPerThread blockedFlush : blockedFlushes) {
assert blockedFlush.dwpt.deleteQueue == flushingQueue; assert blockedFlush.deleteQueue == flushingQueue;
} }
return true; return true;
} }
@ -627,7 +585,7 @@ final class DocumentsWriterFlushControl implements Accountable {
try { try {
abortPendingFlushes(); abortPendingFlushes();
} finally { } finally {
fullFlush = false; fullFlushMarkDone = fullFlush = false;
} }
} }
@ -643,15 +601,15 @@ final class DocumentsWriterFlushControl implements Accountable {
doAfterFlush(dwpt); doAfterFlush(dwpt);
} }
} }
for (BlockedFlush blockedFlush : blockedFlushes) { for (DocumentsWriterPerThread blockedFlush : blockedFlushes) {
try { try {
flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes)); addFlushingDWPT(blockedFlush); // add the blockedFlushes for correct accounting in doAfterFlush
documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM()); documentsWriter.subtractFlushedNumDocs(blockedFlush.getNumDocsInRAM());
blockedFlush.dwpt.abort(); blockedFlush.abort();
} catch (Exception ex) { } catch (Exception ex) {
// that's fine we just abort everything here this is best effort // that's fine we just abort everything here this is best effort
} finally { } finally {
doAfterFlush(blockedFlush.dwpt); doAfterFlush(blockedFlush);
} }
} }
} finally { } finally {
@ -685,16 +643,6 @@ final class DocumentsWriterFlushControl implements Accountable {
return blockedFlushes.size(); return blockedFlushes.size();
} }
private static class BlockedFlush {
final DocumentsWriterPerThread dwpt;
final long bytes;
BlockedFlush(DocumentsWriterPerThread dwpt, long bytes) {
super();
this.dwpt = dwpt;
this.bytes = bytes;
}
}
/** /**
* This method will block if too many DWPT are currently flushing and no * This method will block if too many DWPT are currently flushing and no
* checked out DWPT are available * checked out DWPT are available
@ -717,53 +665,47 @@ final class DocumentsWriterFlushControl implements Accountable {
return infoStream; return infoStream;
} }
synchronized ThreadState findLargestNonPendingWriter() { synchronized DocumentsWriterPerThread findLargestNonPendingWriter() {
ThreadState maxRamUsingThreadState = null; DocumentsWriterPerThread maxRamUsingWriter = null;
long maxRamSoFar = 0; long maxRamSoFar = 0;
Iterator<ThreadState> activePerThreadsIterator = allActiveThreadStates();
int count = 0; int count = 0;
while (activePerThreadsIterator.hasNext()) { for (DocumentsWriterPerThread next : perThreadPool) {
ThreadState next = activePerThreadsIterator.next(); if (next.isFlushPending() == false && next.getNumDocsInRAM() > 0) {
if (!next.flushPending) { final long nextRam = next.bytesUsed();
final long nextRam = next.bytesUsed;
if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
if (infoStream.isEnabled("FP")) { if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM()); infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.getNumDocsInRAM());
} }
count++; count++;
if (nextRam > maxRamSoFar) { if (nextRam > maxRamSoFar) {
maxRamSoFar = nextRam; maxRamSoFar = nextRam;
maxRamUsingThreadState = next; maxRamUsingWriter = next;
}
} }
} }
} }
if (infoStream.isEnabled("FP")) { if (infoStream.isEnabled("FP")) {
infoStream.message("FP", count + " in-use non-flushing threads states"); infoStream.message("FP", count + " in-use non-flushing threads states");
} }
return maxRamUsingThreadState; return maxRamUsingWriter;
} }
/** /**
* Returns the largest non-pending flushable DWPT or <code>null</code> if there is none. * Returns the largest non-pending flushable DWPT or <code>null</code> if there is none.
*/ */
final DocumentsWriterPerThread checkoutLargestNonPendingWriter() { final DocumentsWriterPerThread checkoutLargestNonPendingWriter() {
ThreadState largestNonPendingWriter = findLargestNonPendingWriter(); DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter();
if (largestNonPendingWriter != null) { if (largestNonPendingWriter != null) {
// we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue // we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue
largestNonPendingWriter.lock(); largestNonPendingWriter.lock();
try { try {
if (perThreadPool.isRegistered(largestNonPendingWriter)) {
synchronized (this) { synchronized (this) {
try { try {
if (largestNonPendingWriter.isInitialized() == false) {
return nextPendingFlush();
} else {
return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false); return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false);
}
} finally { } finally {
updateStallState(); updateStallState();
} }
} }
}
} finally { } finally {
largestNonPendingWriter.unlock(); largestNonPendingWriter.unlock();
} }

View File

@ -24,6 +24,7 @@ import java.util.Locale;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.Codec;
@ -41,6 +42,7 @@ import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.IntBlockPool; import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
@ -156,6 +158,9 @@ final class DocumentsWriterPerThread {
final BufferedUpdates pendingUpdates; final BufferedUpdates pendingUpdates;
final SegmentInfo segmentInfo; // Current segment we are working on final SegmentInfo segmentInfo; // Current segment we are working on
private boolean aborted = false; // True if we aborted private boolean aborted = false; // True if we aborted
private SetOnce<Boolean> flushPending = new SetOnce<>();
private volatile long lastCommittedBytesUsed;
private SetOnce<Boolean> hasFlushed = new SetOnce<>();
private final FieldInfos.Builder fieldInfos; private final FieldInfos.Builder fieldInfos;
private final InfoStream infoStream; private final InfoStream infoStream;
@ -169,6 +174,7 @@ final class DocumentsWriterPerThread {
private final LiveIndexWriterConfig indexWriterConfig; private final LiveIndexWriterConfig indexWriterConfig;
private final boolean enableTestPoints; private final boolean enableTestPoints;
private final int indexVersionCreated; private final int indexVersionCreated;
private final ReentrantLock lock = new ReentrantLock();
public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue, public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException { FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
@ -350,6 +356,7 @@ final class DocumentsWriterPerThread {
/** Flush all pending docs to a new segment */ /** Flush all pending docs to a new segment */
FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException { FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
assert flushPending.get() == Boolean.TRUE;
assert numDocsInRAM > 0; assert numDocsInRAM > 0;
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush"; assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setMaxDoc(numDocsInRAM); segmentInfo.setMaxDoc(numDocsInRAM);
@ -445,6 +452,7 @@ final class DocumentsWriterPerThread {
throw t; throw t;
} finally { } finally {
maybeAbort("flush", flushNotifications); maybeAbort("flush", flushNotifications);
hasFlushed.set(Boolean.TRUE);
} }
} }
@ -601,4 +609,80 @@ final class DocumentsWriterPerThread {
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + "]"; + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
} }
/**
* Returns true iff this DWPT is marked as flush pending
*/
boolean isFlushPending() {
return flushPending.get() == Boolean.TRUE;
}
/**
* Sets this DWPT as flush pending. This can only be set once.
*/
void setFlushPending() {
flushPending.set(Boolean.TRUE);
}
/**
* Returns the last committed bytes for this DWPT. This method can be called
* without acquiring the DWPTs lock.
*/
long getLastCommittedBytesUsed() {
return lastCommittedBytesUsed;
}
/**
* Commits the current {@link #bytesUsed()} and stores it's value for later reuse.
* The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}
* @return the delta between the current {@link #bytesUsed()} and the current {@link #getLastCommittedBytesUsed()}
*/
long commitLastBytesUsed() {
assert isHeldByCurrentThread();
long delta = bytesUsed() - lastCommittedBytesUsed;
lastCommittedBytesUsed += delta;
return delta;
}
/**
* Locks this DWPT for exclusive access.
* @see ReentrantLock#lock()
*/
void lock() {
lock.lock();
}
/**
* Acquires the DWPT's lock only if it is not held by another thread at the time
* of invocation.
* @return true if the lock was acquired.
* @see ReentrantLock#tryLock()
*/
boolean tryLock() {
return lock.tryLock();
}
/**
* Returns true if the DWPT's lock is held by the current thread
* @see ReentrantLock#isHeldByCurrentThread()
*/
boolean isHeldByCurrentThread() {
return lock.isHeldByCurrentThread();
}
/**
* Unlocks the DWPT's lock
* @see ReentrantLock#unlock()
*/
void unlock() {
lock.unlock();
}
/**
* Returns <code>true</code> iff this DWPT has been flushed
*/
boolean hasFlushed() {
return hasFlushed.get() == Boolean.TRUE;
}
} }

View File

@ -16,228 +16,176 @@
*/ */
package org.apache.lucene.index; package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.ReentrantLock; import java.util.Set;
import java.util.function.Predicate;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** /**
* {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances * {@link DocumentsWriterPerThreadPool} controls {@link DocumentsWriterPerThread} instances
* and their thread assignments during indexing. Each {@link ThreadState} holds * and their thread assignments during indexing. Each {@link DocumentsWriterPerThread} is once a
* a reference to a {@link DocumentsWriterPerThread} that is once a * obtained from the pool exclusively used for indexing a
* {@link ThreadState} is obtained from the pool exclusively used for indexing a * single document or list of documents by the obtaining thread. Each indexing thread must obtain
* single document by the obtaining thread. Each indexing thread must obtain * such a {@link DocumentsWriterPerThread} to make progress. Depending on the
* such a {@link ThreadState} to make progress. Depending on the * {@link DocumentsWriterPerThreadPool} implementation {@link DocumentsWriterPerThread}
* {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
* assignments might differ from document to document. * assignments might differ from document to document.
* <p> * <p>
* Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool * Once a {@link DocumentsWriterPerThread} is selected for flush the {@link DocumentsWriterPerThread} will
* is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a * be checked out of the thread pool and won't be reused for indexing. See {@link #checkout(DocumentsWriterPerThread)}.
* new {@link DocumentsWriterPerThread} instance.
* </p> * </p>
*/ */
final class DocumentsWriterPerThreadPool { final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
/** private final Set<DocumentsWriterPerThread> dwpts = Collections.newSetFromMap(new IdentityHashMap<>());
* {@link ThreadState} references and guards a private final Deque<DocumentsWriterPerThread> freeList = new ArrayDeque<>();
* {@link DocumentsWriterPerThread} instance that is used during indexing to private final IOSupplier<DocumentsWriterPerThread> dwptFactory;
* build a in-memory index segment. {@link ThreadState} also holds all flush private int takenWriterPermits = 0;
* related per-thread data controlled by {@link DocumentsWriterFlushControl}. private boolean closed;
* <p>
* A {@link ThreadState}, its methods and members should only accessed by one
* thread a time. Users must acquire the lock via {@link ThreadState#lock()}
* and release the lock in a finally block via {@link ThreadState#unlock()}
* before accessing the state.
*/
@SuppressWarnings("serial")
final static class ThreadState extends ReentrantLock {
DocumentsWriterPerThread dwpt;
// TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
volatile boolean flushPending = false;
// TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl
long bytesUsed = 0;
// set by DocumentsWriter after each indexing op finishes
volatile long lastSeqNo;
ThreadState(DocumentsWriterPerThread dpwt) { DocumentsWriterPerThreadPool(IOSupplier<DocumentsWriterPerThread> dwptFactory) {
this.dwpt = dpwt; this.dwptFactory = dwptFactory;
}
private void reset() {
assert this.isHeldByCurrentThread();
this.dwpt = null;
this.bytesUsed = 0;
this.flushPending = false;
}
boolean isInitialized() {
assert this.isHeldByCurrentThread();
return dwpt != null;
} }
/** /**
* Returns the number of currently active bytes in this ThreadState's * Returns the active number of {@link DocumentsWriterPerThread} instances.
* {@link DocumentsWriterPerThread}
*/ */
public long getBytesUsedPerThread() { synchronized int size() {
assert this.isHeldByCurrentThread(); return dwpts.size();
// public for FlushPolicy
return bytesUsed;
} }
/** synchronized void lockNewWriters() {
* Returns this {@link ThreadState}s {@link DocumentsWriterPerThread} // this is similar to a semaphore - we need to acquire all permits ie. takenWriterPermits must be == 0
*/ // any call to lockNewWriters() must be followed by unlockNewWriters() otherwise we will deadlock at some
public DocumentsWriterPerThread getDocumentsWriterPerThread() {
assert this.isHeldByCurrentThread();
// public for FlushPolicy
return dwpt;
}
/**
* Returns <code>true</code> iff this {@link ThreadState} is marked as flush
* pending otherwise <code>false</code>
*/
public boolean isFlushPending() {
return flushPending;
}
}
private final List<ThreadState> threadStates = new ArrayList<>();
private final List<ThreadState> freeList = new ArrayList<>();
private int takenThreadStatePermits = 0;
/**
* Returns the active number of {@link ThreadState} instances.
*/
synchronized int getActiveThreadStateCount() {
return threadStates.size();
}
synchronized void lockNewThreadStates() {
// this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0
// any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some
// point // point
assert takenThreadStatePermits >= 0; assert takenWriterPermits >= 0;
takenThreadStatePermits++; takenWriterPermits++;
} }
synchronized void unlockNewThreadStates() { synchronized void unlockNewWriters() {
assert takenThreadStatePermits > 0; assert takenWriterPermits > 0;
takenThreadStatePermits--; takenWriterPermits--;
if (takenThreadStatePermits == 0) { if (takenWriterPermits == 0) {
notifyAll(); notifyAll();
} }
} }
/** /**
* Returns a new {@link ThreadState} iff any new state is available otherwise * Returns a new already locked {@link DocumentsWriterPerThread}
* <code>null</code>.
* <p>
* NOTE: the returned {@link ThreadState} is already locked iff non-
* <code>null</code>.
* *
* @return a new {@link ThreadState} iff any new state is available otherwise * @return a new {@link DocumentsWriterPerThread}
* <code>null</code>
*/ */
private synchronized ThreadState newThreadState() { private synchronized DocumentsWriterPerThread newWriter() throws IOException {
assert takenThreadStatePermits >= 0; assert takenWriterPermits >= 0;
while (takenThreadStatePermits > 0) { while (takenWriterPermits > 0) {
// we can't create new thread-states while not all permits are available // we can't create new DWPTs while not all permits are available
try { try {
wait(); wait();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie); throw new ThreadInterruptedException(ie);
} }
} }
ThreadState threadState = new ThreadState(null); DocumentsWriterPerThread dwpt = dwptFactory.get();
threadState.lock(); // lock so nobody else will get this ThreadState dwpt.lock(); // lock so nobody else will get this DWPT
threadStates.add(threadState); dwpts.add(dwpt);
return threadState;
}
DocumentsWriterPerThread reset(ThreadState threadState) {
assert threadState.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt = threadState.dwpt;
threadState.reset();
return dwpt; return dwpt;
} }
void recycle(DocumentsWriterPerThread dwpt) {
// don't recycle DWPT by default
}
// TODO: maybe we should try to do load leveling here: we want roughly even numbers // TODO: maybe we should try to do load leveling here: we want roughly even numbers
// of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
/** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */ /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */
ThreadState getAndLock() { DocumentsWriterPerThread getAndLock() throws IOException {
ThreadState threadState = null;
synchronized (this) { synchronized (this) {
if (freeList.isEmpty()) { if (closed) {
// ThreadState is already locked before return by this method: throw new AlreadyClosedException("DWPTPool is already closed");
return newThreadState(); }
} else { // Important that we are LIFO here! This way if number of concurrent indexing threads was once high,
// Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a // but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we have suddenly
// limited number of thread states: // a single thread indexing
threadState = freeList.remove(freeList.size()-1); final Iterator<DocumentsWriterPerThread> descendingIterator = freeList.descendingIterator();
while (descendingIterator.hasNext()) {
if (threadState.dwpt == null) { DocumentsWriterPerThread perThread = descendingIterator.next();
// This thread-state is not initialized, e.g. it if (perThread.tryLock()) {
// was just flushed. See if we can instead find descendingIterator.remove();
// another free thread state that already has docs return perThread;
// indexed. This way if incoming thread concurrency
// has decreased, we don't leave docs
// indefinitely buffered, tying up RAM. This
// will instead get those thread states flushed,
// freeing up RAM for larger segment flushes:
for(int i=0;i<freeList.size();i++) {
ThreadState ts = freeList.get(i);
if (ts.dwpt != null) {
// Use this one instead, and swap it with
// the un-initialized one:
freeList.set(i, threadState);
threadState = ts;
break;
}
} }
} }
// DWPT is already locked before return by this method:
return newWriter();
} }
} }
// This could take time, e.g. if the threadState is [briefly] checked for flushing: void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
threadState.lock();
return threadState;
}
void release(ThreadState state) {
state.unlock();
synchronized (this) { synchronized (this) {
assert dwpts.contains(state) : "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
freeList.add(state); freeList.add(state);
} }
state.unlock();
}
@Override
public synchronized Iterator<DocumentsWriterPerThread> iterator() {
return List.copyOf(dwpts).iterator(); // copy on read - this is a quick op since num states is low
} }
/** /**
* Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the * Filters all DWPTs the given predicate applies to and that can be checked out of the pool via
* given ord. * {@link #checkout(DocumentsWriterPerThread)}. All DWPTs returned from this method are already locked
* * and {@link #isRegistered(DocumentsWriterPerThread)} will return <code>true</code> for all returned DWPTs
* @param ord
* the ordinal of the {@link ThreadState}
* @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
* given ord.
*/ */
synchronized ThreadState getThreadState(int ord) { List<DocumentsWriterPerThread> filterAndLock(Predicate<DocumentsWriterPerThread> predicate) {
return threadStates.get(ord); List<DocumentsWriterPerThread> list = new ArrayList<>();
for (DocumentsWriterPerThread perThread : this) {
if (predicate.test(perThread)) {
perThread.lock();
if (isRegistered(perThread)) {
list.add(perThread);
} else {
// somebody else has taken this DWPT out of the pool.
// unlock and let it go
perThread.unlock();
}
}
}
return Collections.unmodifiableList(list);
} }
// TODO: merge this with getActiveThreadStateCount: they are the same! /**
synchronized int getMaxThreadStates() { * Removes the given DWPT from the pool unless it's already been removed before.
return threadStates.size(); * @return <code>true</code> iff the given DWPT has been removed. Otherwise <code>false</code>
*/
synchronized boolean checkout(DocumentsWriterPerThread perThread) {
assert perThread.isHeldByCurrentThread();
if (dwpts.remove(perThread)) {
freeList.remove(perThread);
} else {
assert freeList.contains(perThread) == false;
return false;
}
return true;
}
/**
* Returns <code>true</code> if this DWPT is still part of the pool
*/
synchronized boolean isRegistered(DocumentsWriterPerThread perThread) {
return dwpts.contains(perThread);
}
@Override
public synchronized void close() {
this.closed = true;
} }
} }

View File

@ -19,7 +19,6 @@ package org.apache.lucene.index;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Map; import java.util.Map;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** /**
@ -32,9 +31,9 @@ import org.apache.lucene.util.ThreadInterruptedException;
* <p> * <p>
* To prevent OOM Errors and ensure IndexWriter's stability this class blocks * To prevent OOM Errors and ensure IndexWriter's stability this class blocks
* incoming threads from indexing once 2 x number of available * incoming threads from indexing once 2 x number of available
* {@link ThreadState}s in {@link DocumentsWriterPerThreadPool} is exceeded. * {@link DocumentsWriterPerThread}s in {@link DocumentsWriterPerThreadPool} is exceeded.
* Once flushing catches up and the number of flushing DWPT is equal or lower * Once flushing catches up and the number of flushing DWPT is equal or lower
* than the number of active {@link ThreadState}s threads are released and can * than the number of active {@link DocumentsWriterPerThread}s threads are released and can
* continue indexing. * continue indexing.
*/ */
final class DocumentsWriterStallControl { final class DocumentsWriterStallControl {

View File

@ -17,8 +17,6 @@
package org.apache.lucene.index; package org.apache.lucene.index;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
/** /**
* Default {@link FlushPolicy} implementation that flushes new segments based on * Default {@link FlushPolicy} implementation that flushes new segments based on
* RAM used and document count depending on the IndexWriter's * RAM used and document count depending on the IndexWriter's
@ -27,11 +25,11 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
* *
* <ul> * <ul>
* <li> * <li>
* {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)} * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
* - applies pending delete operations based on the global number of buffered * - applies pending delete operations based on the global number of buffered
* delete terms if the consumed memory is greater than {@link IndexWriterConfig#getRAMBufferSizeMB()}</li>. * delete terms if the consumed memory is greater than {@link IndexWriterConfig#getRAMBufferSizeMB()}</li>.
* <li> * <li>
* {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)} * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
* - flushes either on the number of documents per * - flushes either on the number of documents per
* {@link DocumentsWriterPerThread} ( * {@link DocumentsWriterPerThread} (
* {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active * {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active
@ -39,11 +37,11 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
* {@link IndexWriterConfig#getMaxBufferedDocs()} or * {@link IndexWriterConfig#getMaxBufferedDocs()} or
* {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively</li> * {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively</li>
* <li> * <li>
* {@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)} * {@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
* - calls * - calls
* {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)} * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
* and * and
* {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)} * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
* in order</li> * in order</li>
* </ul> * </ul>
* All {@link IndexWriterConfig} settings are used to mark * All {@link IndexWriterConfig} settings are used to mark
@ -58,7 +56,7 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
class FlushByRamOrCountsPolicy extends FlushPolicy { class FlushByRamOrCountsPolicy extends FlushPolicy {
@Override @Override
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
if ((flushOnRAM() && control.getDeleteBytesUsed() > 1024*1024*indexWriterConfig.getRAMBufferSizeMB())) { if ((flushOnRAM() && control.getDeleteBytesUsed() > 1024*1024*indexWriterConfig.getRAMBufferSizeMB())) {
control.setApplyAllDeletes(); control.setApplyAllDeletes();
if (infoStream.isEnabled("FP")) { if (infoStream.isEnabled("FP")) {
@ -68,12 +66,12 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
} }
@Override @Override
public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
if (flushOnDocCount() if (flushOnDocCount()
&& state.dwpt.getNumDocsInRAM() >= indexWriterConfig && perThread.getNumDocsInRAM() >= indexWriterConfig
.getMaxBufferedDocs()) { .getMaxBufferedDocs()) {
// Flush this state by num docs // Flush this state by num docs
control.setFlushPending(state); control.setFlushPending(perThread);
} else if (flushOnRAM()) {// flush by RAM } else if (flushOnRAM()) {// flush by RAM
final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
final long totalRam = control.activeBytes() + control.getDeleteBytesUsed(); final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
@ -81,7 +79,7 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
if (infoStream.isEnabled("FP")) { if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
} }
markLargestWriterPending(control, state, totalRam); markLargestWriterPending(control, perThread);
} }
} }
} }
@ -91,8 +89,8 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
* pending * pending
*/ */
protected void markLargestWriterPending(DocumentsWriterFlushControl control, protected void markLargestWriterPending(DocumentsWriterFlushControl control,
ThreadState perThreadState, final long currentBytesPerThread) { DocumentsWriterPerThread perThread) {
ThreadState largestNonPendingWriter = findLargestNonPendingWriter(control, perThreadState); DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter(control, perThread);
if (largestNonPendingWriter != null) { if (largestNonPendingWriter != null) {
control.setFlushPending(largestNonPendingWriter); control.setFlushPending(largestNonPendingWriter);
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.lucene.index; package org.apache.lucene.index;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
@ -35,13 +34,12 @@ import org.apache.lucene.util.InfoStream;
* {@link IndexWriter} consults the provided {@link FlushPolicy} to control the * {@link IndexWriter} consults the provided {@link FlushPolicy} to control the
* flushing process. The policy is informed for each added or updated document * flushing process. The policy is informed for each added or updated document
* as well as for each delete term. Based on the {@link FlushPolicy}, the * as well as for each delete term. Based on the {@link FlushPolicy}, the
* information provided via {@link ThreadState} and * information provided via {@link DocumentsWriterPerThread} and
* {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a * {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a
* {@link DocumentsWriterPerThread} needs flushing and mark it as flush-pending * {@link DocumentsWriterPerThread} needs flushing and mark it as flush-pending
* via {@link DocumentsWriterFlushControl#setFlushPending}, or if deletes need * via {@link DocumentsWriterFlushControl#setFlushPending}, or if deletes need
* to be applied. * to be applied.
* *
* @see ThreadState
* @see DocumentsWriterFlushControl * @see DocumentsWriterFlushControl
* @see DocumentsWriterPerThread * @see DocumentsWriterPerThread
* @see IndexWriterConfig#setFlushPolicy(FlushPolicy) * @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
@ -52,38 +50,38 @@ abstract class FlushPolicy {
/** /**
* Called for each delete term. If this is a delete triggered due to an update * Called for each delete term. If this is a delete triggered due to an update
* the given {@link ThreadState} is non-null. * the given {@link DocumentsWriterPerThread} is non-null.
* <p> * <p>
* Note: This method is called synchronized on the given * Note: This method is called synchronized on the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState} * thread holds the lock on the given {@link DocumentsWriterPerThread}
*/ */
public abstract void onDelete(DocumentsWriterFlushControl control, public abstract void onDelete(DocumentsWriterFlushControl control,
ThreadState state); DocumentsWriterPerThread perThread);
/** /**
* Called for each document update on the given {@link ThreadState}'s * Called for each document update on the given {@link DocumentsWriterPerThread}'s
* {@link DocumentsWriterPerThread}. * {@link DocumentsWriterPerThread}.
* <p> * <p>
* Note: This method is called synchronized on the given * Note: This method is called synchronized on the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState} * thread holds the lock on the given {@link DocumentsWriterPerThread}
*/ */
public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) { public void onUpdate(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
onInsert(control, state); onInsert(control, perThread);
onDelete(control, state); onDelete(control, perThread);
} }
/** /**
* Called for each document addition on the given {@link ThreadState}s * Called for each document addition on the given {@link DocumentsWriterPerThread}s
* {@link DocumentsWriterPerThread}. * {@link DocumentsWriterPerThread}.
* <p> * <p>
* Note: This method is synchronized by the given * Note: This method is synchronized by the given
* {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
* thread holds the lock on the given {@link ThreadState} * thread holds the lock on the given {@link DocumentsWriterPerThread}
*/ */
public abstract void onInsert(DocumentsWriterFlushControl control, public abstract void onInsert(DocumentsWriterFlushControl control,
ThreadState state); DocumentsWriterPerThread perThread);
/** /**
* Called by DocumentsWriter to initialize the FlushPolicy * Called by DocumentsWriter to initialize the FlushPolicy
@ -94,18 +92,18 @@ abstract class FlushPolicy {
} }
/** /**
* Returns the current most RAM consuming non-pending {@link ThreadState} with * Returns the current most RAM consuming non-pending {@link DocumentsWriterPerThread} with
* at least one indexed document. * at least one indexed document.
* <p> * <p>
* This method will never return <code>null</code> * This method will never return <code>null</code>
*/ */
protected ThreadState findLargestNonPendingWriter( protected DocumentsWriterPerThread findLargestNonPendingWriter(
DocumentsWriterFlushControl control, ThreadState perThreadState) { DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
assert perThreadState.dwpt.getNumDocsInRAM() > 0; assert perThread.getNumDocsInRAM() > 0;
// the dwpt which needs to be flushed eventually // the dwpt which needs to be flushed eventually
ThreadState maxRamUsingThreadState = control.findLargestNonPendingWriter(); DocumentsWriterPerThread maxRamUsingWriter = control.findLargestNonPendingWriter();
assert assertMessage("set largest ram consuming thread pending on lower watermark"); assert assertMessage("set largest ram consuming thread pending on lower watermark");
return maxRamUsingThreadState; return maxRamUsingWriter;
} }
private boolean assertMessage(String s) { private boolean assertMessage(String s) {

View File

@ -2465,7 +2465,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
globalFieldNumberMap.clear(); globalFieldNumberMap.clear();
success = true; success = true;
long seqNo = docWriter.deleteQueue.getNextSequenceNumber(); long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
docWriter.setLastSeqNo(seqNo);
return seqNo; return seqNo;
} finally { } finally {
if (success == false) { if (success == false) {
@ -4947,7 +4946,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// finishStartCommit // finishStartCommit
// startCommitMergeDeletes // startCommitMergeDeletes
// startMergeInit // startMergeInit
// DocumentsWriter.ThreadState.init start // DocumentsWriterPerThread addDocuments start
private final void testPoint(String message) { private final void testPoint(String message) {
if (enableTestPoints) { if (enableTestPoints) {
assert infoStream.isEnabled("TP"); // don't enable unless you need them. assert infoStream.isEnabled("TP"); // don't enable unless you need them.

View File

@ -312,28 +312,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
return mergePolicy; return mergePolicy;
} }
/** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
* IndexWriter to assign thread-states to incoming indexing threads.
* <p>
* NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
* other {@link IndexWriter} instances once it has been initialized / associated with an
* {@link IndexWriter}.
* </p>
* <p>
* NOTE: This only takes effect when IndexWriter is first created.</p>*/
IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
if (threadPool == null) {
throw new IllegalArgumentException("threadPool must not be null");
}
this.indexerThreadPool = threadPool;
return this;
}
@Override
DocumentsWriterPerThreadPool getIndexerThreadPool() {
return indexerThreadPool;
}
/** By default, IndexWriter does not pool the /** By default, IndexWriter does not pool the
* SegmentReaders it must open for deletions and * SegmentReaders it must open for deletions and
* merging, unless a near-real-time reader has been * merging, unless a near-real-time reader has been

View File

@ -80,10 +80,6 @@ public class LiveIndexWriterConfig {
/** {@link MergePolicy} for selecting merges. */ /** {@link MergePolicy} for selecting merges. */
protected volatile MergePolicy mergePolicy; protected volatile MergePolicy mergePolicy;
/** {@code DocumentsWriterPerThreadPool} to control how
* threads are allocated to {@code DocumentsWriterPerThread}. */
protected volatile DocumentsWriterPerThreadPool indexerThreadPool;
/** True if readers should be pooled. */ /** True if readers should be pooled. */
protected volatile boolean readerPooling; protected volatile boolean readerPooling;
@ -135,7 +131,6 @@ public class LiveIndexWriterConfig {
mergePolicy = new TieredMergePolicy(); mergePolicy = new TieredMergePolicy();
flushPolicy = new FlushByRamOrCountsPolicy(); flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING; readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
indexerThreadPool = new DocumentsWriterPerThreadPool();
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
} }
@ -347,16 +342,6 @@ public class LiveIndexWriterConfig {
return mergePolicy; return mergePolicy;
} }
/**
* Returns the configured {@link DocumentsWriterPerThreadPool} instance.
*
* @see IndexWriterConfig#setIndexerThreadPool(DocumentsWriterPerThreadPool)
* @return the configured {@link DocumentsWriterPerThreadPool} instance.
*/
DocumentsWriterPerThreadPool getIndexerThreadPool() {
return indexerThreadPool;
}
/** /**
* Returns {@code true} if {@link IndexWriter} should pool readers even if * Returns {@code true} if {@link IndexWriter} should pool readers even if
* {@link DirectoryReader#open(IndexWriter)} has not been called. * {@link DirectoryReader#open(IndexWriter)} has not been called.
@ -492,7 +477,6 @@ public class LiveIndexWriterConfig {
sb.append("codec=").append(getCodec()).append("\n"); sb.append("codec=").append(getCodec()).append("\n");
sb.append("infoStream=").append(getInfoStream().getClass().getName()).append("\n"); sb.append("infoStream=").append(getInfoStream().getClass().getName()).append("\n");
sb.append("mergePolicy=").append(getMergePolicy()).append("\n"); sb.append("mergePolicy=").append(getMergePolicy()).append("\n");
sb.append("indexerThreadPool=").append(getIndexerThreadPool()).append("\n");
sb.append("readerPooling=").append(getReaderPooling()).append("\n"); sb.append("readerPooling=").append(getReaderPooling()).append("\n");
sb.append("perThreadHardLimitMB=").append(getRAMPerThreadHardLimitMB()).append("\n"); sb.append("perThreadHardLimitMB=").append(getRAMPerThreadHardLimitMB()).append("\n");
sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n"); sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n");

View File

@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LineFileDocs;
@ -70,8 +69,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig(analyzer) IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
.setFlushPolicy(flushPolicy); .setFlushPolicy(flushPolicy);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
iwc.setRAMBufferSizeMB(maxRamMB); iwc.setRAMBufferSizeMB(maxRamMB);
iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
IndexWriter writer = new IndexWriter(dir, iwc); IndexWriter writer = new IndexWriter(dir, iwc);
@ -125,8 +122,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig(analyzer) IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
.setFlushPolicy(flushPolicy); .setFlushPolicy(flushPolicy);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
iwc.setMaxBufferedDocs(2 + atLeast(10)); iwc.setMaxBufferedDocs(2 + atLeast(10));
iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
IndexWriter writer = new IndexWriter(dir, iwc); IndexWriter writer = new IndexWriter(dir, iwc);
@ -173,9 +168,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy(); MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
iwc.setFlushPolicy(flushPolicy); iwc.setFlushPolicy(flushPolicy);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
IndexWriter writer = new IndexWriter(dir, iwc); IndexWriter writer = new IndexWriter(dir, iwc);
flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy(); flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
DocumentsWriter docsWriter = writer.getDocsWriter(); DocumentsWriter docsWriter = writer.getDocsWriter();
@ -237,8 +229,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy(); FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
iwc.setFlushPolicy(flushPolicy); iwc.setFlushPolicy(flushPolicy);
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
iwc.setIndexerThreadPool(threadPool);
// with such a small ram buffer we should be stalled quite quickly // with such a small ram buffer we should be stalled quite quickly
iwc.setRAMBufferSizeMB(0.25); iwc.setRAMBufferSizeMB(0.25);
IndexWriter writer = new IndexWriter(dir, iwc); IndexWriter writer = new IndexWriter(dir, iwc);
@ -273,13 +263,11 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
} }
protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) { protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates(); Iterator<DocumentsWriterPerThread> allActiveWriter = flushControl.allActiveWriters();
long bytesUsed = 0; long bytesUsed = 0;
while (allActiveThreads.hasNext()) { while (allActiveWriter.hasNext()) {
ThreadState next = allActiveThreads.next(); DocumentsWriterPerThread next = allActiveWriter.next();
if (next.dwpt != null) { bytesUsed += next.bytesUsed();
bytesUsed += next.dwpt.bytesUsed();
}
} }
assertEquals(bytesUsed, flushControl.activeBytes()); assertEquals(bytesUsed, flushControl.activeBytes());
} }
@ -332,81 +320,81 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
boolean hasMarkedPending = false; boolean hasMarkedPending = false;
@Override @Override
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
final ArrayList<ThreadState> pending = new ArrayList<>(); final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
final ArrayList<ThreadState> notPending = new ArrayList<>(); final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
findPending(control, pending, notPending); findPending(control, pending, notPending);
final boolean flushCurrent = state.flushPending; final boolean flushCurrent = perThread.isFlushPending();
final ThreadState toFlush; final DocumentsWriterPerThread toFlush;
if (state.flushPending) { if (perThread.isFlushPending()) {
toFlush = state; toFlush = perThread;
} else { } else {
toFlush = null; toFlush = null;
} }
super.onDelete(control, state); super.onDelete(control, perThread);
if (toFlush != null) { if (toFlush != null) {
if (flushCurrent) { if (flushCurrent) {
assertTrue(pending.remove(toFlush)); assertTrue(pending.remove(toFlush));
} else { } else {
assertTrue(notPending.remove(toFlush)); assertTrue(notPending.remove(toFlush));
} }
assertTrue(toFlush.flushPending); assertTrue(toFlush.isFlushPending());
hasMarkedPending = true; hasMarkedPending = true;
} }
for (ThreadState threadState : notPending) { for (DocumentsWriterPerThread dwpt : notPending) {
assertFalse(threadState.flushPending); assertFalse(dwpt.isFlushPending());
} }
} }
@Override @Override
public void onInsert(DocumentsWriterFlushControl control, ThreadState state) { public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread dwpt) {
final ArrayList<ThreadState> pending = new ArrayList<>(); final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
final ArrayList<ThreadState> notPending = new ArrayList<>(); final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
findPending(control, pending, notPending); findPending(control, pending, notPending);
final boolean flushCurrent = state.flushPending; final boolean flushCurrent = dwpt.isFlushPending();
long activeBytes = control.activeBytes(); long activeBytes = control.activeBytes();
final ThreadState toFlush; final DocumentsWriterPerThread toFlush;
if (state.flushPending) { if (dwpt.isFlushPending()) {
toFlush = state; toFlush = dwpt;
} else if (flushOnDocCount() } else if (flushOnDocCount()
&& state.dwpt.getNumDocsInRAM() >= indexWriterConfig && dwpt.getNumDocsInRAM() >= indexWriterConfig
.getMaxBufferedDocs()) { .getMaxBufferedDocs()) {
toFlush = state; toFlush = dwpt;
} else if (flushOnRAM() } else if (flushOnRAM()
&& activeBytes >= (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024. * 1024.)) { && activeBytes >= (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024. * 1024.)) {
toFlush = findLargestNonPendingWriter(control, state); toFlush = findLargestNonPendingWriter(control, dwpt);
assertFalse(toFlush.flushPending); assertFalse(toFlush.isFlushPending());
} else { } else {
toFlush = null; toFlush = null;
} }
super.onInsert(control, state); super.onInsert(control, dwpt);
if (toFlush != null) { if (toFlush != null) {
if (flushCurrent) { if (flushCurrent) {
assertTrue(pending.remove(toFlush)); assertTrue(pending.remove(toFlush));
} else { } else {
assertTrue(notPending.remove(toFlush)); assertTrue(notPending.remove(toFlush));
} }
assertTrue(toFlush.flushPending); assertTrue(toFlush.isFlushPending());
hasMarkedPending = true; hasMarkedPending = true;
} else { } else {
peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush); peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(), peakDocCountWithoutFlush = Math.max(dwpt.getNumDocsInRAM(),
peakDocCountWithoutFlush); peakDocCountWithoutFlush);
} }
for (ThreadState threadState : notPending) { for (DocumentsWriterPerThread perThread : notPending) {
assertFalse(threadState.flushPending); assertFalse(perThread.isFlushPending());
} }
} }
} }
static void findPending(DocumentsWriterFlushControl flushControl, static void findPending(DocumentsWriterFlushControl flushControl,
ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) { ArrayList<DocumentsWriterPerThread> pending, ArrayList<DocumentsWriterPerThread> notPending) {
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates(); Iterator<DocumentsWriterPerThread> allActiveThreads = flushControl.allActiveWriters();
while (allActiveThreads.hasNext()) { while (allActiveThreads.hasNext()) {
ThreadState next = allActiveThreads.next(); DocumentsWriterPerThread next = allActiveThreads.next();
if (next.flushPending) { if (next.isFlushPending()) {
pending.add(next); pending.add(next);
} else { } else {
notPending.add(next); notPending.add(next);

View File

@ -22,7 +22,9 @@ import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader; import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI; import java.net.URI;
import java.nio.file.FileSystem; import java.nio.file.FileSystem;
import java.nio.file.Files; import java.nio.file.Files;
@ -2918,16 +2920,15 @@ public class TestIndexWriter extends LuceneTestCase {
public void testFlushLargestWriter() throws IOException, InterruptedException { public void testFlushLargestWriter() throws IOException, InterruptedException {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()); IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
int numDocs = indexDocsForMultipleThreadStates(w); int numDocs = indexDocsForMultipleDWPTs(w);
DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter DocumentsWriterPerThread largestNonPendingWriter
= w.docWriter.flushControl.findLargestNonPendingWriter(); = w.docWriter.flushControl.findLargestNonPendingWriter();
assertFalse(largestNonPendingWriter.flushPending); assertFalse(largestNonPendingWriter.isFlushPending());
assertNotNull(largestNonPendingWriter.dwpt);
int numRamDocs = w.numRamDocs(); int numRamDocs = w.numRamDocs();
int numDocsInDWPT = largestNonPendingWriter.dwpt.getNumDocsInRAM(); int numDocsInDWPT = largestNonPendingWriter.getNumDocsInRAM();
assertTrue(w.flushNextBuffer()); assertTrue(w.flushNextBuffer());
assertNull(largestNonPendingWriter.dwpt); assertTrue(largestNonPendingWriter.hasFlushed());
assertEquals(numRamDocs-numDocsInDWPT, w.numRamDocs()); assertEquals(numRamDocs-numDocsInDWPT, w.numRamDocs());
// make sure it's not locked // make sure it's not locked
@ -2943,7 +2944,7 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close(); dir.close();
} }
private int indexDocsForMultipleThreadStates(IndexWriter w) throws InterruptedException { private int indexDocsForMultipleDWPTs(IndexWriter w) throws InterruptedException {
Thread[] threads = new Thread[3]; Thread[] threads = new Thread[3];
CountDownLatch latch = new CountDownLatch(threads.length); CountDownLatch latch = new CountDownLatch(threads.length);
int numDocsPerThread = 10 + random().nextInt(30); int numDocsPerThread = 10 + random().nextInt(30);
@ -2973,16 +2974,16 @@ public class TestIndexWriter extends LuceneTestCase {
public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException { public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()); IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
indexDocsForMultipleThreadStates(w); indexDocsForMultipleDWPTs(w);
DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter DocumentsWriterPerThread largestNonPendingWriter
= w.docWriter.flushControl.findLargestNonPendingWriter(); = w.docWriter.flushControl.findLargestNonPendingWriter();
assertFalse(largestNonPendingWriter.flushPending); assertFalse(largestNonPendingWriter.isFlushPending());
assertNotNull(largestNonPendingWriter.dwpt); assertFalse(largestNonPendingWriter.hasFlushed());
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount(); int threadPoolSize = w.docWriter.perThreadPool.size();
w.docWriter.flushControl.markForFullFlush(); w.docWriter.flushControl.markForFullFlush();
DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter(); DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter();
assertNull(documentsWriterPerThread); assertNull(documentsWriterPerThread);
assertEquals(activeThreadStateCount, w.docWriter.flushControl.numQueuedFlushes()); assertEquals(threadPoolSize, w.docWriter.flushControl.numQueuedFlushes());
w.docWriter.flushControl.abortFullFlushes(); w.docWriter.flushControl.abortFullFlushes();
assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter()); assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter());
assertEquals(0, w.docWriter.flushControl.numQueuedFlushes()); assertEquals(0, w.docWriter.flushControl.numQueuedFlushes());
@ -2993,11 +2994,11 @@ public class TestIndexWriter extends LuceneTestCase {
public void testHoldLockOnLargestWriter() throws IOException, InterruptedException { public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()); IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
int numDocs = indexDocsForMultipleThreadStates(w); int numDocs = indexDocsForMultipleDWPTs(w);
DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter DocumentsWriterPerThread largestNonPendingWriter
= w.docWriter.flushControl.findLargestNonPendingWriter(); = w.docWriter.flushControl.findLargestNonPendingWriter();
assertFalse(largestNonPendingWriter.flushPending); assertFalse(largestNonPendingWriter.isFlushPending());
assertNotNull(largestNonPendingWriter.dwpt); assertFalse(largestNonPendingWriter.hasFlushed());
CountDownLatch wait = new CountDownLatch(1); CountDownLatch wait = new CountDownLatch(1);
CountDownLatch locked = new CountDownLatch(1); CountDownLatch locked = new CountDownLatch(1);
@ -3030,7 +3031,7 @@ public class TestIndexWriter extends LuceneTestCase {
lockThread.join(); lockThread.join();
flushThread.join(); flushThread.join();
assertNull("largest DWPT should be flushed", largestNonPendingWriter.dwpt); assertTrue("largest DWPT should be flushed", largestNonPendingWriter.hasFlushed());
// make sure it's not locked // make sure it's not locked
largestNonPendingWriter.lock(); largestNonPendingWriter.lock();
largestNonPendingWriter.unlock(); largestNonPendingWriter.unlock();
@ -3116,21 +3117,19 @@ public class TestIndexWriter extends LuceneTestCase {
} }
private static void waitForDocsInBuffers(IndexWriter w, int buffersWithDocs) { private static void waitForDocsInBuffers(IndexWriter w, int buffersWithDocs) {
// wait until at least N threadstates have a doc in order to observe // wait until at least N DWPTs have a doc in order to observe
// who flushes the segments. // who flushes the segments.
while(true) { while(true) {
int numStatesWithDocs = 0; int numStatesWithDocs = 0;
DocumentsWriterPerThreadPool perThreadPool = w.docWriter.perThreadPool; DocumentsWriterPerThreadPool perThreadPool = w.docWriter.perThreadPool;
for (int i = 0; i < perThreadPool.getActiveThreadStateCount(); i++) { for (DocumentsWriterPerThread dwpt : perThreadPool) {
DocumentsWriterPerThreadPool.ThreadState threadState = perThreadPool.getThreadState(i); dwpt.lock();
threadState.lock();
try { try {
DocumentsWriterPerThread dwpt = threadState.dwpt; if (dwpt.getNumDocsInRAM() > 1) {
if (dwpt != null && dwpt.getNumDocsInRAM() > 1) {
numStatesWithDocs++; numStatesWithDocs++;
} }
} finally { } finally {
threadState.unlock(); dwpt.unlock();
} }
} }
if (numStatesWithDocs >= buffersWithDocs) { if (numStatesWithDocs >= buffersWithDocs) {
@ -3702,22 +3701,19 @@ public class TestIndexWriter extends LuceneTestCase {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()); IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
w.addDocument(new Document()); w.addDocument(new Document());
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount(); assertEquals(1, w.docWriter.perThreadPool.size());
assertEquals(1, activeThreadStateCount);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
latch.countDown(); latch.countDown();
List<Closeable> states = new ArrayList<>(); List<Closeable> states = new ArrayList<>();
try { try {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
DocumentsWriterPerThreadPool.ThreadState state = w.docWriter.perThreadPool.getAndLock(); DocumentsWriterPerThread state = w.docWriter.perThreadPool.getAndLock();
states.add(state::unlock); states.add(state::unlock);
if (state.isInitialized()) { state.deleteQueue.getNextSequenceNumber();
state.dwpt.deleteQueue.getNextSequenceNumber();
} else {
w.docWriter.deleteQueue.getNextSequenceNumber();
}
} }
} catch (IOException e) {
throw new AssertionError(e);
} finally { } finally {
IOUtils.closeWhileHandlingException(states); IOUtils.closeWhileHandlingException(states);
} }
@ -3774,7 +3770,19 @@ public class TestIndexWriter extends LuceneTestCase {
stopped.set(true); stopped.set(true);
indexer.join(); indexer.join();
refresher.join(); refresher.join();
assertNull("should not consider ACE a tragedy on a closed IW", w.getTragicException()); Throwable e = w.getTragicException();
IOSupplier<String> supplier = () -> {
if (e != null) {
StringWriter writer = new StringWriter();
try (PrintWriter printWriter = new PrintWriter(writer)) {
e.printStackTrace(printWriter);
}
return writer.toString();
} else {
return "";
}
};
assertNull("should not consider ACE a tragedy on a closed IW: " + supplier.get(), w.getTragicException());
IOUtils.close(sm, dir); IOUtils.close(sm, dir);
} }
} }

View File

@ -68,7 +68,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain()); assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
assertNull(conf.getMergedSegmentWarmer()); assertNull(conf.getMergedSegmentWarmer());
assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass()); assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass());
assertEquals(DocumentsWriterPerThreadPool.class, conf.getIndexerThreadPool().getClass());
assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass()); assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass());
assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB()); assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB());
assertEquals(Codec.getDefault(), conf.getCodec()); assertEquals(Codec.getDefault(), conf.getCodec());

View File

@ -1285,7 +1285,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
} }
// First one triggers, but does not reflect, the merge: // First one triggers, but does not reflect, the merge:
if (VERBOSE) {
System.out.println("TEST: now get reader"); System.out.println("TEST: now get reader");
}
DirectoryReader.open(w).close(); DirectoryReader.open(w).close();
IndexReader r = DirectoryReader.open(w); IndexReader r = DirectoryReader.open(w);
assertEquals(1, r.leaves().size()); assertEquals(1, r.leaves().size());

View File

@ -533,8 +533,8 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
dir.close(); dir.close();
} }
// LUCENE-1130: make sure immeidate disk full on creating // LUCENE-1130: make sure immediate disk full on creating
// an IndexWriter (hit during DW.ThreadState.init()) is // an IndexWriter (hit during DWPT#updateDocuments()) is
// OK: // OK:
public void testImmediateDiskFull() throws IOException { public void testImmediateDiskFull() throws IOException {
MockDirectoryWrapper dir = newMockDirectory(); MockDirectoryWrapper dir = newMockDirectory();

View File

@ -136,7 +136,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
} }
// LUCENE-1130: make sure immediate disk full on creating // LUCENE-1130: make sure immediate disk full on creating
// an IndexWriter (hit during DW.ThreadState.init()), with // an IndexWriter (hit during DWPT#updateDocuments()), with
// multiple threads, is OK: // multiple threads, is OK:
public void testImmediateDiskFullWithThreads() throws Exception { public void testImmediateDiskFullWithThreads() throws Exception {

View File

@ -224,8 +224,8 @@ public class RandomIndexWriter implements Closeable {
if (LuceneTestCase.VERBOSE) { if (LuceneTestCase.VERBOSE) {
System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount); System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
} }
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount(); int threadPoolSize = w.docWriter.perThreadPool.size();
int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1)); int numFlushes = Math.min(1, r.nextInt(threadPoolSize+1));
for (int i = 0; i < numFlushes; i++) { for (int i = 0; i < numFlushes; i++) {
if (w.flushNextBuffer() == false) { if (w.flushNextBuffer() == false) {
break; // stop once we didn't flush anything break; // stop once we didn't flush anything

View File

@ -267,17 +267,22 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
} }
public synchronized void corruptUnknownFiles() throws IOException { public synchronized void corruptUnknownFiles() throws IOException {
if (LuceneTestCase.VERBOSE) {
System.out.println("MDW: corrupt unknown files"); System.out.println("MDW: corrupt unknown files");
}
Set<String> knownFiles = new HashSet<>(); Set<String> knownFiles = new HashSet<>();
for(String fileName : listAll()) { for(String fileName : listAll()) {
if (fileName.startsWith(IndexFileNames.SEGMENTS)) { if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MDW: read " + fileName + " to gather files it references"); System.out.println("MDW: read " + fileName + " to gather files it references");
}
SegmentInfos infos; SegmentInfos infos;
try { try {
infos = SegmentInfos.readCommit(this, fileName); infos = SegmentInfos.readCommit(this, fileName);
} catch (IOException ioe) { } catch (IOException ioe) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MDW: exception reading segment infos " + fileName + "; files: " + Arrays.toString(listAll())); System.out.println("MDW: exception reading segment infos " + fileName + "; files: " + Arrays.toString(listAll()));
}
throw ioe; throw ioe;
} }
knownFiles.addAll(infos.files(true)); knownFiles.addAll(infos.files(true));
@ -833,8 +838,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
// TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
if (assertNoUnreferencedFilesOnClose) { if (assertNoUnreferencedFilesOnClose) {
if (LuceneTestCase.VERBOSE) {
System.out.println("MDW: now assert no unref'd files at close"); System.out.println("MDW: now assert no unref'd files at close");
}
// now look for unreferenced files: discount ones that we tried to delete but could not // now look for unreferenced files: discount ones that we tried to delete but could not
Set<String> allFiles = new HashSet<>(Arrays.asList(listAll())); Set<String> allFiles = new HashSet<>(Arrays.asList(listAll()));
String[] startFiles = allFiles.toArray(new String[0]); String[] startFiles = allFiles.toArray(new String[0]);