diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 870f4ac00d0..0f18d238cc5 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -120,6 +120,10 @@ API Changes
 * LUCENE-9265: SimpleFSDirectory is deprecated in favor of NIOFSDirectory.
   (Yannick Welsch)
 
+* LUCENE-9304: Removed ability to set DocumentsWriterPerThreadPool on IndexWriterConfig.
+  The DocumentsWriterPerThreadPool is a package-protected final class which made it impossible
+  to customize. (Simon Willnauer)
+
 New Features
 ---------------------
 (No changes)
@@ -134,6 +138,9 @@ Improvements
 * LUCENE-8050: PerFieldDocValuesFormat should not get the DocValuesFormat on a field that has
   no doc values. (David Smiley, Juan Rodriguez)
 
+* LUCENE-9304: Removed the ThreadState abstraction from DocumentsWriter, which allows pooling of DWPTs directly and
+  improves the approachability of the IndexWriter code. (Simon Willnauer)
+
 Optimizations
 ---------------------
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index aa4016c985c..965c10d87d0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -32,7 +32,6 @@ import java.util.function.ToLongFunction;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
@@ -59,21 +58,25 @@ import org.apache.lucene.util.InfoStream;
  * Threads:
  *
  * Multiple threads are allowed into addDocument at once.
- * There is an initial synchronized call to getThreadState
- * which allocates a ThreadState for this thread. The same
- * thread will get the same ThreadState over time (thread
- * affinity) so that if there are consistent patterns (for
- * example each thread is indexing a different content
- * source) then we make better use of RAM. Then
- * processDocument is called on that ThreadState without
+ * There is an initial synchronized call to
+ * {@link DocumentsWriterFlushControl#obtainAndLock()}
+ * which allocates a DWPT for this indexing thread. The same
+ * thread will not necessarily get the same DWPT over time.
+ * Then updateDocuments is called on that DWPT without
  * synchronization (most of the "heavy lifting" is in this
- * call). Finally the synchronized "finishDocument" is
- * called to flush changes to the directory.
+ * call). Once a DWPT fills up enough RAM or holds enough
+ * documents in memory the DWPT is checked out for flush
+ * and all changes are written to the directory. Each DWPT
+ * corresponds to one segment being written.
  *
- * When flush is called by IndexWriter we forcefully idle
- * all threads and flush only once they are all idle. This
- * means you can call flush with a given thread even while
- * other threads are actively adding/deleting documents.
+ * When flush is called by IndexWriter we check out all DWPTs
+ * that are associated with the current {@link DocumentsWriterDeleteQueue}
+ * out of the {@link DocumentsWriterPerThreadPool} and write
+ * them to disk. The flush process can piggy-back on incoming
+ * indexing threads or even block them from adding documents
+ * if flushing can't keep up with new documents being added.
+ * Unless the stall control kicks in to block indexing threads,
+ * flushes happen concurrently with actual indexing requests.
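The rewritten javadoc above is the heart of this change; here it is condensed into one illustrative flow. This is a sketch of `DocumentsWriter.updateDocuments` as it appears further down in this patch, not verbatim code: `docs`, `analyzer`, `delNode` and `isUpdate` stand in for the real parameters and locals.

```java
final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(); // synchronized hand-out, no thread affinity
final DocumentsWriterPerThread flushingDWPT;
long seqNo;
try {
  // unsynchronized heavy lifting: analysis and in-memory indexing, private to this DWPT
  seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
  // RAM/doc-count accounting; may mark this DWPT (or the largest one) as flush pending
  flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);
} finally {
  if (dwpt.isFlushPending() || dwpt.isAborted()) {
    dwpt.unlock();                            // checked out: it never returns to the pool
  } else {
    perThreadPool.marksAsFreeAndUnlock(dwpt); // back on the free list for any thread
  }
}
if (flushingDWPT != null) {
  doFlush(flushingDWPT); // the indexing thread piggy-backs the flush; one DWPT == one new segment
}
```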
  *
  *
  * Exceptions:
@@ -99,13 +102,8 @@ import org.apache.lucene.util.InfoStream;
  */
 final class DocumentsWriter implements Closeable, Accountable {
-  private final Directory directoryOrig; // no wrapping, for infos
-  private final Directory directory;
-  private final FieldInfos.FieldNumbers globalFieldNumberMap;
-  private final int indexCreatedVersionMajor;
   private final AtomicLong pendingNumDocs;
-  private final boolean enableTestPoints;
-  private final Supplier<String> segmentNameSupplier;
+  private final FlushNotifications flushNotifications;
 
   private volatile boolean closed;
@@ -130,24 +128,23 @@ final class DocumentsWriter implements Closeable, Accountable {
   final DocumentsWriterPerThreadPool perThreadPool;
   final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
-  private long lastSeqNo;
-
+
   DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs,
                   boolean enableTestPoints, Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config,
                   Directory directoryOrig, Directory directory, FieldInfos.FieldNumbers globalFieldNumberMap) {
-    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
-    this.directoryOrig = directoryOrig;
-    this.directory = directory;
     this.config = config;
     this.infoStream = config.getInfoStream();
     this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
-    this.perThreadPool = config.getIndexerThreadPool();
+    this.perThreadPool = new DocumentsWriterPerThreadPool(() -> {
+      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
+      return new DocumentsWriterPerThread(indexCreatedVersionMajor,
+          segmentNameSupplier.get(), directoryOrig,
+          directory, config, infoStream, deleteQueue, infos,
+          pendingNumDocs, enableTestPoints);
+    });
     flushPolicy = config.getFlushPolicy();
-    this.globalFieldNumberMap = globalFieldNumberMap;
     this.pendingNumDocs = pendingNumDocs;
     flushControl = new DocumentsWriterFlushControl(this, config);
-    this.segmentNameSupplier = segmentNameSupplier;
-    this.enableTestPoints = enableTestPoints;
     this.flushNotifications = flushNotifications;
   }
@@ -155,9 +152,6 @@ final class DocumentsWriter implements Closeable, Accountable {
     return applyDeleteOrUpdate(q -> q.addDelete(queries));
   }
 
-  void setLastSeqNo(long seqNo) {
-    lastSeqNo = seqNo;
-  }
 
   long deleteTerms(final Term... terms) throws IOException {
     return applyDeleteOrUpdate(q -> q.addDelete(terms));
@@ -173,13 +167,12 @@ final class DocumentsWriter implements Closeable, Accountable {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     long seqNo = function.applyAsLong(deleteQueue);
     flushControl.doOnDelete();
-    lastSeqNo = Math.max(lastSeqNo, seqNo);
     if (applyAllDeletes()) {
       seqNo = -seqNo;
     }
     return seqNo;
   }
-
+  /** If buffered deletes are using too much heap, resolve them and write to disk and return true.
*/ private boolean applyAllDeletes() throws IOException { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; @@ -225,20 +218,23 @@ final class DocumentsWriter implements Closeable, Accountable { if (infoStream.isEnabled("DW")) { infoStream.message("DW", "abort"); } - final int limit = perThreadPool.getActiveThreadStateCount(); - for (int i = 0; i < limit; i++) { - final ThreadState perThread = perThreadPool.getThreadState(i); - perThread.lock(); + for (final DocumentsWriterPerThread perThread : perThreadPool.filterAndLock(x -> true)) { try { - abortThreadState(perThread); + abortDocumentsWriterPerThread(perThread); } finally { perThread.unlock(); } } flushControl.abortPendingFlushes(); flushControl.waitForFlush(); + assert perThreadPool.size() == 0 + : "There are still active DWPT in the pool: " + perThreadPool.size(); success = true; } finally { + if (success) { + assert flushControl.getFlushingBytes() == 0 : "flushingBytes has unexpected value 0 != " + flushControl.getFlushingBytes(); + assert flushControl.netBytes() == 0 : "netBytes has unexpected value 0 != " + flushControl.netBytes(); + } if (infoStream.isEnabled("DW")) { infoStream.message("DW", "done abort success=" + success); } @@ -273,33 +269,34 @@ final class DocumentsWriter implements Closeable, Accountable { pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc()); } }); - List threadStates = new ArrayList<>(); + List writers = new ArrayList<>(); AtomicBoolean released = new AtomicBoolean(false); final Closeable release = () -> { + // we return this closure to unlock all writers once done + // or if hit an exception below in the try block. + // we can't assign this later otherwise the ref can't be final if (released.compareAndSet(false, true)) { // only once if (infoStream.isEnabled("DW")) { infoStream.message("DW", "unlockAllAbortedThread"); } - perThreadPool.unlockNewThreadStates(); - for (ThreadState state : threadStates) { - state.unlock(); + perThreadPool.unlockNewWriters(); + for (DocumentsWriterPerThread writer : writers) { + writer.unlock(); } } }; try { deleteQueue.clear(); - perThreadPool.lockNewThreadStates(); - final int limit = perThreadPool.getMaxThreadStates(); - for (int i = 0; i < limit; i++) { - final ThreadState perThread = perThreadPool.getThreadState(i); - perThread.lock(); - threadStates.add(perThread); - abortThreadState(perThread); + perThreadPool.lockNewWriters(); + writers.addAll(perThreadPool.filterAndLock(x -> true)); + for (final DocumentsWriterPerThread perThread : writers) { + assert perThread.isHeldByCurrentThread(); + abortDocumentsWriterPerThread(perThread); } deleteQueue.clear(); // jump over any possible in flight ops: - deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount() + 1); + deleteQueue.skipSequenceNumbers(perThreadPool.size() + 1); flushControl.abortPendingFlushes(); flushControl.waitForFlush(); @@ -322,35 +319,22 @@ final class DocumentsWriter implements Closeable, Accountable { } /** Returns how many documents were aborted. 
*/ - private int abortThreadState(final ThreadState perThread) throws IOException { + private void abortDocumentsWriterPerThread(final DocumentsWriterPerThread perThread) throws IOException { assert perThread.isHeldByCurrentThread(); - if (perThread.isInitialized()) { - try { - int abortedDocCount = perThread.dwpt.getNumDocsInRAM(); - subtractFlushedNumDocs(abortedDocCount); - perThread.dwpt.abort(); - return abortedDocCount; - } finally { - flushControl.doOnAbort(perThread); - } - } else { + try { + subtractFlushedNumDocs(perThread.getNumDocsInRAM()); + perThread.abort(); + } finally { flushControl.doOnAbort(perThread); - // This DWPT was never initialized so it has no indexed documents: - return 0; } } /** returns the maximum sequence number for all previously completed operations */ - public long getMaxCompletedSequenceNumber() { - long value = lastSeqNo; - int limit = perThreadPool.getMaxThreadStates(); - for(int i = 0; i < limit; i++) { - ThreadState perThread = perThreadPool.getThreadState(i); - value = Math.max(value, perThread.lastSeqNo); - } - return value; + long getMaxCompletedSequenceNumber() { + return deleteQueue.getLastSequenceNumber(); } + boolean anyChanges() { /* * changes are either in a DWPT or in the deleteQueue. @@ -369,23 +353,23 @@ final class DocumentsWriter implements Closeable, Accountable { return anyChanges; } - public int getBufferedDeleteTermsSize() { + int getBufferedDeleteTermsSize() { return deleteQueue.getBufferedUpdatesTermsSize(); } //for testing - public int getNumBufferedDeleteTerms() { + int getNumBufferedDeleteTerms() { return deleteQueue.numGlobalTermDeletes(); } - public boolean anyDeletions() { + boolean anyDeletions() { return deleteQueue.anyChanges(); } @Override - public void close() { + public void close() throws IOException { closed = true; - flushControl.setClosed(); + IOUtils.close(flushControl, perThreadPool); } private boolean preUpdate() throws IOException { @@ -421,37 +405,25 @@ final class DocumentsWriter implements Closeable, Accountable { return hasEvents; } - - private void ensureInitialized(ThreadState state) throws IOException { - if (state.dwpt == null) { - final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap); - state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig, - directory, config, infoStream, deleteQueue, infos, - pendingNumDocs, enableTestPoints); - } - } long updateDocuments(final Iterable> docs, final Analyzer analyzer, final DocumentsWriterDeleteQueue.Node delNode) throws IOException { boolean hasEvents = preUpdate(); - final ThreadState perThread = flushControl.obtainAndLock(); + final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; long seqNo; try { - // This must happen after we've pulled the ThreadState because IW.close - // waits for all ThreadStates to be released: + // This must happen after we've pulled the DWPT because IW.close + // waits for all DWPT to be released: ensureOpen(); - ensureInitialized(perThread); - assert perThread.isInitialized(); - final DocumentsWriterPerThread dwpt = perThread.dwpt; final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications); } finally { if (dwpt.isAborted()) { - flushControl.doOnAbort(perThread); + flushControl.doOnAbort(dwpt); } // We don't know how many documents were actually // counted as indexed, so we must subtract here to @@ -459,13 +431,14 @@ final class 
DocumentsWriter implements Closeable, Accountable { numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs); } final boolean isUpdate = delNode != null && delNode.isDelete(); - flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); - - assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; - perThread.lastSeqNo = seqNo; - + flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate); } finally { - perThreadPool.release(perThread); + if (dwpt.isFlushPending() || dwpt.isAborted()) { + dwpt.unlock(); + } else { + perThreadPool.marksAsFreeAndUnlock(dwpt); + } + assert dwpt.isHeldByCurrentThread() == false : "we didn't release the dwpt even on abort"; } if (postUpdate(flushingDWPT, hasEvents)) { @@ -477,6 +450,7 @@ final class DocumentsWriter implements Closeable, Accountable { private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { boolean hasEvents = false; while (flushingDWPT != null) { + assert flushingDWPT.hasFlushed() == false; hasEvents = true; boolean success = false; DocumentsWriterFlushQueue.FlushTicket ticket = null; @@ -536,7 +510,7 @@ final class DocumentsWriter implements Closeable, Accountable { * Now we are done and try to flush the ticket queue if the head of the * queue has already finished the flush. */ - if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) { + if (ticketQueue.getTicketCount() >= perThreadPool.size()) { // This means there is a backlog: the one // thread in innerPurge can't keep up with all // other threads flushing segments. In this case @@ -727,7 +701,7 @@ final class DocumentsWriter implements Closeable, Accountable { * * This is a subset of the value returned by {@link #ramBytesUsed()} */ - public long getFlushingBytes() { + long getFlushingBytes() { return flushControl.getFlushingBytes(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index dd339f2cab7..2c1a9358187 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -17,8 +17,9 @@ package org.apache.lucene.index; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; -import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -26,7 +27,7 @@ import java.util.Locale; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.ThreadInterruptedException; @@ -43,7 +44,7 @@ import org.apache.lucene.util.ThreadInterruptedException; * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address * space exhaustion. 
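A quick sketch of the byte accounting the class javadoc above alludes to, condensed from `setFlushPending` and `doAfterFlush` below. The method names `onFlushPendingSketch` and `onAfterFlushSketch` are illustrative stand-ins, not part of the patch.

```java
// Every DWPT's last committed bytes live in exactly one of two buckets,
// so netBytes() == activeBytes + flushBytes holds at all times.
synchronized void onFlushPendingSketch(DocumentsWriterPerThread perThread) {
  long bytes = perThread.getLastCommittedBytesUsed();
  activeBytes -= bytes; // no longer counted as "actively indexing"
  flushBytes  += bytes; // now counted as "checked out / being flushed"
}

synchronized void onAfterFlushSketch(DocumentsWriterPerThread dwpt) {
  flushBytes -= dwpt.getLastCommittedBytesUsed(); // segment hit disk, RAM released
}
```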
*/ -final class DocumentsWriterFlushControl implements Accountable { +final class DocumentsWriterFlushControl implements Accountable, Closeable { private final long hardMaxBytesPerDWPT; private long activeBytes = 0; @@ -52,11 +53,18 @@ final class DocumentsWriterFlushControl implements Accountable { private int numDocsSinceStalled = 0; // only with assert final AtomicBoolean flushDeletes = new AtomicBoolean(false); private boolean fullFlush = false; + private boolean fullFlushMarkDone = false; // only for assertion that we don't get stale DWPTs from the pool + // The flushQueue is used to concurrently distribute DWPTs that are ready to be flushed ie. when a full flush is in + // progress. This might be triggered by a commit or NRT refresh. The trigger will only walk all eligible DWPTs and + // mark them as flushable putting them in the flushQueue ready for other threads (ie. indexing threads) to help flushing private final Queue flushQueue = new LinkedList<>(); // only for safety reasons if a DWPT is close to the RAM limit - private final Queue blockedFlushes = new LinkedList<>(); - private final IdentityHashMap flushingWriters = new IdentityHashMap<>(); - + private final Queue blockedFlushes = new LinkedList<>(); + // flushingWriters holds all currently flushing writers. There might be writers in this list that + // are also in the flushQueue which means that writers in the flushingWriters list are not necessarily + // already actively flushing. They are only in the state of flushing and might be picked up in the future by + // polling the flushQueue + private final List flushingWriters = new ArrayList<>(); double maxConfiguredRamBuffer = 0; long peakActiveBytes = 0;// only with assert @@ -86,11 +94,11 @@ final class DocumentsWriterFlushControl implements Accountable { return activeBytes; } - public long getFlushingBytes() { + long getFlushingBytes() { return flushBytes; } - public synchronized long netBytes() { + synchronized long netBytes() { return flushBytes + activeBytes; } @@ -139,15 +147,14 @@ final class DocumentsWriterFlushControl implements Accountable { return true; } - private void commitPerThreadBytes(ThreadState perThread) { - final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed; - perThread.bytesUsed += delta; + private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) { + final long delta = perThread.commitLastBytesUsed(); /* * We need to differentiate here if we are pending since setFlushPending * moves the perThread memory to the flushBytes and we could be set to * pending during a delete */ - if (perThread.flushPending) { + if (perThread.isFlushPending()) { flushBytes += delta; } else { activeBytes += delta; @@ -165,16 +172,16 @@ final class DocumentsWriterFlushControl implements Accountable { return true; } - synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) { + synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) { try { commitPerThreadBytes(perThread); - if (!perThread.flushPending) { + if (!perThread.isFlushPending()) { if (isUpdate) { flushPolicy.onUpdate(this, perThread); } else { flushPolicy.onInsert(this, perThread); } - if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) { + if (!perThread.isFlushPending() && perThread.bytesUsed() > hardMaxBytesPerDWPT) { // Safety check to prevent a single DWPT exceeding its RAM limit. 
This // is super important since we can not address more than 2048 MB per DWPT setFlushPending(perThread); @@ -187,21 +194,24 @@ final class DocumentsWriterFlushControl implements Accountable { } } - private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) { + private DocumentsWriterPerThread checkout(DocumentsWriterPerThread perThread, boolean markPending) { + assert Thread.holdsLock(this); if (fullFlush) { - if (perThread.flushPending) { + if (perThread.isFlushPending()) { checkoutAndBlock(perThread); return nextPendingFlush(); - } else { - return null; } } else { if (markPending) { assert perThread.isFlushPending() == false; setFlushPending(perThread); } - return tryCheckoutForFlush(perThread); + + if (perThread.isFlushPending()) { + return checkOutForFlush(perThread); + } } + return null; } private boolean assertNumDocsSinceStalled(boolean stalled) { @@ -221,11 +231,10 @@ final class DocumentsWriterFlushControl implements Accountable { } synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { - assert flushingWriters.containsKey(dwpt); + assert flushingWriters.contains(dwpt); try { - Long bytes = flushingWriters.remove(dwpt); - flushBytes -= bytes.longValue(); - perThreadPool.recycle(dwpt); + flushingWriters.remove(dwpt); + flushBytes -= dwpt.getLastCommittedBytesUsed(); assert assertMemory(); } finally { try { @@ -281,15 +290,15 @@ final class DocumentsWriterFlushControl implements Accountable { } /** - * Sets flush pending state on the given {@link ThreadState}. The - * {@link ThreadState} must have indexed at least on Document and must not be + * Sets flush pending state on the given {@link DocumentsWriterPerThread}. The + * {@link DocumentsWriterPerThread} must have indexed at least on Document and must not be * already pending. */ - public synchronized void setFlushPending(ThreadState perThread) { - assert !perThread.flushPending; - if (perThread.dwpt.getNumDocsInRAM() > 0) { - perThread.flushPending = true; // write access synced - final long bytes = perThread.bytesUsed; + public synchronized void setFlushPending(DocumentsWriterPerThread perThread) { + assert !perThread.isFlushPending(); + if (perThread.getNumDocsInRAM() > 0) { + perThread.setFlushPending(); // write access synced + final long bytes = perThread.getLastCommittedBytesUsed(); flushBytes += bytes; activeBytes -= bytes; numPending++; // write access synced @@ -298,70 +307,57 @@ final class DocumentsWriterFlushControl implements Accountable { } - synchronized void doOnAbort(ThreadState state) { + synchronized void doOnAbort(DocumentsWriterPerThread perThread) { try { - if (state.flushPending) { - flushBytes -= state.bytesUsed; + assert perThreadPool.isRegistered(perThread); + assert perThread.isHeldByCurrentThread(); + if (perThread.isFlushPending()) { + flushBytes -= perThread.getLastCommittedBytesUsed(); } else { - activeBytes -= state.bytesUsed; + activeBytes -= perThread.getLastCommittedBytesUsed(); } assert assertMemory(); // Take it out of the loop this DWPT is stale - perThreadPool.reset(state); } finally { updateStallState(); + boolean checkedOut = perThreadPool.checkout(perThread); + assert checkedOut; } } - synchronized DocumentsWriterPerThread tryCheckoutForFlush( - ThreadState perThread) { - return perThread.flushPending ? 
internalTryCheckOutForFlush(perThread) : null; - } - - private void checkoutAndBlock(ThreadState perThread) { - perThread.lock(); - try { - assert perThread.flushPending : "can not block non-pending threadstate"; - assert fullFlush : "can not block if fullFlush == false"; - final DocumentsWriterPerThread dwpt; - final long bytes = perThread.bytesUsed; - dwpt = perThreadPool.reset(perThread); - numPending--; - blockedFlushes.add(new BlockedFlush(dwpt, bytes)); - } finally { - perThread.unlock(); - } + private void checkoutAndBlock(DocumentsWriterPerThread perThread) { + assert perThreadPool.isRegistered(perThread); + assert perThread.isHeldByCurrentThread(); + assert perThread.isFlushPending() : "can not block non-pending threadstate"; + assert fullFlush : "can not block if fullFlush == false"; + numPending--; + blockedFlushes.add(perThread); + boolean checkedOut = perThreadPool.checkout(perThread); + assert checkedOut; } - private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) { + private synchronized DocumentsWriterPerThread checkOutForFlush(DocumentsWriterPerThread perThread) { assert Thread.holdsLock(this); - assert perThread.flushPending; + assert perThread.isFlushPending(); + assert perThread.isHeldByCurrentThread(); + assert perThreadPool.isRegistered(perThread); try { - // We are pending so all memory is already moved to flushBytes - if (perThread.tryLock()) { - try { - if (perThread.isInitialized()) { - assert perThread.isHeldByCurrentThread(); - final DocumentsWriterPerThread dwpt; - final long bytes = perThread.bytesUsed; // do that before - // replace! - dwpt = perThreadPool.reset(perThread); - assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing"; - // Record the flushing DWPT to reduce flushBytes in doAfterFlush - flushingWriters.put(dwpt, Long.valueOf(bytes)); - numPending--; // write access synced - return dwpt; - } - } finally { - perThread.unlock(); - } - } - return null; + addFlushingDWPT(perThread); + numPending--; // write access synced + boolean checkedOut = perThreadPool.checkout(perThread); + assert checkedOut; + return perThread; } finally { updateStallState(); } } + private void addFlushingDWPT(DocumentsWriterPerThread perThread) { + assert flushingWriters.contains(perThread) == false : "DWPT is already flushing"; + // Record the flushing DWPT to reduce flushBytes in doAfterFlush + flushingWriters.add(perThread); + } + @Override public String toString() { return "DocumentsWriterFlushControl [activeBytes=" + activeBytes @@ -380,14 +376,17 @@ final class DocumentsWriterFlushControl implements Accountable { fullFlush = this.fullFlush; numPending = this.numPending; } - if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush - final int limit = perThreadPool.getActiveThreadStateCount(); - for (int i = 0; i < limit && numPending > 0; i++) { - final ThreadState next = perThreadPool.getThreadState(i); - if (next.flushPending) { - final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next); - if (dwpt != null) { - return dwpt; + if (numPending > 0 && fullFlush == false) { // don't check if we are doing a full flush + for (final DocumentsWriterPerThread next : perThreadPool) { + if (next.isFlushPending()) { + if (next.tryLock()) { + try { + if (perThreadPool.isRegistered(next)) { + return checkOutForFlush(next); + } + } finally { + next.unlock(); + } } } } @@ -395,37 +394,17 @@ final class DocumentsWriterFlushControl implements Accountable { return null; } - synchronized void setClosed() { - // set by 
DW to signal that we should not release new DWPT after close - this.closed = true; + @Override + public synchronized void close() { + // set by DW to signal that we are closing. in this case we try to not stall any threads anymore etc. + closed = true; } /** - * Returns an iterator that provides access to all currently active {@link ThreadState}s + * Returns an iterator that provides access to all currently active {@link DocumentsWriterPerThread}s */ - public Iterator allActiveThreadStates() { - return getPerThreadsIterator(perThreadPool.getActiveThreadStateCount()); - } - - private Iterator getPerThreadsIterator(final int upto) { - return new Iterator() { - int i = 0; - - @Override - public boolean hasNext() { - return i < upto; - } - - @Override - public ThreadState next() { - return perThreadPool.getThreadState(i++); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove() not supported."); - } - }; + public Iterator allActiveWriters() { + return perThreadPool.iterator(); } synchronized void doOnDelete() { @@ -458,74 +437,86 @@ final class DocumentsWriterFlushControl implements Accountable { flushDeletes.set(true); } - ThreadState obtainAndLock() { - final ThreadState perThread = perThreadPool.getAndLock(); - boolean success = false; - try { - if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { - // There is a flush-all in process and this DWPT is - // now stale -- enroll it for flush and try for - // another DWPT: - addFlushableState(perThread); - } - success = true; - // simply return the ThreadState even in a flush all case sine we already hold the lock - return perThread; - } finally { - if (!success) { // make sure we unlock if this fails - perThreadPool.release(perThread); + DocumentsWriterPerThread obtainAndLock() throws IOException { + while (closed == false) { + final DocumentsWriterPerThread perThread = perThreadPool.getAndLock(); + if (perThread.deleteQueue == documentsWriter.deleteQueue) { + // simply return the DWPT even in a flush all case since we already hold the lock and the DWPT is not stale + // since it has the current delete queue associated with it. This means we have established a happens-before + // relationship and all docs indexed into this DWPT are guaranteed to not be flushed with the currently + // progress full flush. 
+ return perThread; + } else { + try { + // we must first assert otherwise the full flush might make progress once we unlock the dwpt + assert fullFlush && fullFlushMarkDone == false : + "found a stale DWPT but full flush mark phase is already done fullFlush: " + + fullFlush + " markDone: " + fullFlushMarkDone; + } finally { + perThread.unlock(); + // There is a flush-all in process and this DWPT is + // now stale - try another one + } } } + throw new AlreadyClosedException("flush control is closed"); } long markForFullFlush() { final DocumentsWriterDeleteQueue flushingQueue; long seqNo; synchronized (this) { - assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running"; - assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer; + assert fullFlush == false: "called DWFC#markForFullFlush() while full flush is still running"; + assert fullFlushMarkDone == false : "full flush collection marker is still set to true"; fullFlush = true; flushingQueue = documentsWriter.deleteQueue; // Set a new delete queue - all subsequent DWPT will use this queue until // we do another full flush - - perThreadPool.lockNewThreadStates(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off + perThreadPool.lockNewWriters(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off try { // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine // if we have some sequence numbers that were never assigned: - seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2; + seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.size() + 2; flushingQueue.maxSeqNo = seqNo + 1; DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1); documentsWriter.deleteQueue = newQueue; - } finally { - perThreadPool.unlockNewThreadStates(); + perThreadPool.unlockNewWriters(); } } - final int limit = perThreadPool.getActiveThreadStateCount(); - for (int i = 0; i < limit; i++) { - final ThreadState next = perThreadPool.getThreadState(i); - next.lock(); - try { - if (!next.isInitialized()) { - continue; + final List fullFlushBuffer = new ArrayList<>(); + for (final DocumentsWriterPerThread next : perThreadPool.filterAndLock(dwpt -> dwpt.deleteQueue == flushingQueue)) { + try { + assert next.deleteQueue == flushingQueue + || next.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " + + flushingQueue + + " currentqueue: " + + documentsWriter.deleteQueue + + " perThread queue: " + + next.deleteQueue + + " numDocsInRam: " + next.getNumDocsInRAM(); + + if (next.getNumDocsInRAM() > 0) { + final DocumentsWriterPerThread flushingDWPT; + synchronized(this) { + if (next.isFlushPending() == false) { + setFlushPending(next); + } + flushingDWPT = checkOutForFlush(next); + } + assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; + assert next == flushingDWPT : "flushControl returned different DWPT"; + fullFlushBuffer.add(flushingDWPT); + } else { + // it's possible that we get a DWPT with 0 docs if we flush concurrently to + // threads getting DWPTs from the pool. In this case we simply remove it from + // the pool and drop it on the floor. 
+ boolean checkout = perThreadPool.checkout(next); + assert checkout; + } + } finally { + next.unlock(); } - assert next.dwpt.deleteQueue == flushingQueue - || next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: " - + flushingQueue - + " currentqueue: " - + documentsWriter.deleteQueue - + " perThread queue: " - + next.dwpt.deleteQueue - + " numDocsInRam: " + next.dwpt.getNumDocsInRAM(); - if (next.dwpt.deleteQueue != flushingQueue) { - // this one is already a new DWPT - continue; - } - addFlushableState(next); - } finally { - next.unlock(); - } } synchronized (this) { /* make sure we move all DWPT that are where concurrently marked as @@ -535,67 +526,33 @@ final class DocumentsWriterFlushControl implements Accountable { pruneBlockedQueue(flushingQueue); assert assertBlockedFlushes(documentsWriter.deleteQueue); flushQueue.addAll(fullFlushBuffer); - fullFlushBuffer.clear(); updateStallState(); + fullFlushMarkDone = true; // at this point we must have collected all DWPTs that belong to the old delete queue } assert assertActiveDeleteQueue(documentsWriter.deleteQueue); return seqNo; } private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) { - final int limit = perThreadPool.getActiveThreadStateCount(); - for (int i = 0; i < limit; i++) { - final ThreadState next = perThreadPool.getThreadState(i); - next.lock(); - try { - assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ; - } finally { - next.unlock(); - } + for (final DocumentsWriterPerThread next : perThreadPool) { + assert next.deleteQueue == queue : "numDocs: " + next.getNumDocsInRAM(); } return true; } - private final List fullFlushBuffer = new ArrayList<>(); - - void addFlushableState(ThreadState perThread) { - if (infoStream.isEnabled("DWFC")) { - infoStream.message("DWFC", "addFlushableState " + perThread.dwpt); - } - final DocumentsWriterPerThread dwpt = perThread.dwpt; - assert perThread.isHeldByCurrentThread(); - assert perThread.isInitialized(); - assert fullFlush; - assert dwpt.deleteQueue != documentsWriter.deleteQueue; - if (dwpt.getNumDocsInRAM() > 0) { - synchronized(this) { - if (!perThread.flushPending) { - setFlushPending(perThread); - } - final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread); - assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents"; - assert dwpt == flushingDWPT : "flushControl returned different DWPT"; - fullFlushBuffer.add(flushingDWPT); - } - } else { - perThreadPool.reset(perThread); // make this state inactive - } - } - /** - * Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue. + * Prunes the blockedQueue by removing all DWPTs that are associated with the given flush queue. 
*/ private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) { - Iterator iterator = blockedFlushes.iterator(); + assert Thread.holdsLock(this); + Iterator iterator = blockedFlushes.iterator(); while (iterator.hasNext()) { - BlockedFlush blockedFlush = iterator.next(); - if (blockedFlush.dwpt.deleteQueue == flushingQueue) { + DocumentsWriterPerThread blockedFlush = iterator.next(); + if (blockedFlush.deleteQueue == flushingQueue) { iterator.remove(); - assert !flushingWriters.containsKey(blockedFlush.dwpt) : "DWPT is already flushing"; - // Record the flushing DWPT to reduce flushBytes in doAfterFlush - flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes)); + addFlushingDWPT(blockedFlush); // don't decr pending here - it's already done when DWPT is blocked - flushQueue.add(blockedFlush.dwpt); + flushQueue.add(blockedFlush); } } } @@ -611,14 +568,15 @@ final class DocumentsWriterFlushControl implements Accountable { assert blockedFlushes.isEmpty(); } } finally { - fullFlush = false; + fullFlushMarkDone = fullFlush = false; + updateStallState(); } } boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) { - for (BlockedFlush blockedFlush : blockedFlushes) { - assert blockedFlush.dwpt.deleteQueue == flushingQueue; + for (DocumentsWriterPerThread blockedFlush : blockedFlushes) { + assert blockedFlush.deleteQueue == flushingQueue; } return true; } @@ -627,7 +585,7 @@ final class DocumentsWriterFlushControl implements Accountable { try { abortPendingFlushes(); } finally { - fullFlush = false; + fullFlushMarkDone = fullFlush = false; } } @@ -643,15 +601,15 @@ final class DocumentsWriterFlushControl implements Accountable { doAfterFlush(dwpt); } } - for (BlockedFlush blockedFlush : blockedFlushes) { + for (DocumentsWriterPerThread blockedFlush : blockedFlushes) { try { - flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes)); - documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM()); - blockedFlush.dwpt.abort(); + addFlushingDWPT(blockedFlush); // add the blockedFlushes for correct accounting in doAfterFlush + documentsWriter.subtractFlushedNumDocs(blockedFlush.getNumDocsInRAM()); + blockedFlush.abort(); } catch (Exception ex) { // that's fine we just abort everything here this is best effort } finally { - doAfterFlush(blockedFlush.dwpt); + doAfterFlush(blockedFlush); } } } finally { @@ -685,16 +643,6 @@ final class DocumentsWriterFlushControl implements Accountable { return blockedFlushes.size(); } - private static class BlockedFlush { - final DocumentsWriterPerThread dwpt; - final long bytes; - BlockedFlush(DocumentsWriterPerThread dwpt, long bytes) { - super(); - this.dwpt = dwpt; - this.bytes = bytes; - } - } - /** * This method will block if too many DWPT are currently flushing and no * checked out DWPT are available @@ -717,51 +665,45 @@ final class DocumentsWriterFlushControl implements Accountable { return infoStream; } - synchronized ThreadState findLargestNonPendingWriter() { - ThreadState maxRamUsingThreadState = null; + synchronized DocumentsWriterPerThread findLargestNonPendingWriter() { + DocumentsWriterPerThread maxRamUsingWriter = null; long maxRamSoFar = 0; - Iterator activePerThreadsIterator = allActiveThreadStates(); int count = 0; - while (activePerThreadsIterator.hasNext()) { - ThreadState next = activePerThreadsIterator.next(); - if (!next.flushPending) { - final long nextRam = next.bytesUsed; - if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) { - if (infoStream.isEnabled("FP")) { 
- infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM()); - } - count++; - if (nextRam > maxRamSoFar) { - maxRamSoFar = nextRam; - maxRamUsingThreadState = next; - } + for (DocumentsWriterPerThread next : perThreadPool) { + if (next.isFlushPending() == false && next.getNumDocsInRAM() > 0) { + final long nextRam = next.bytesUsed(); + if (infoStream.isEnabled("FP")) { + infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.getNumDocsInRAM()); + } + count++; + if (nextRam > maxRamSoFar) { + maxRamSoFar = nextRam; + maxRamUsingWriter = next; } } } if (infoStream.isEnabled("FP")) { infoStream.message("FP", count + " in-use non-flushing threads states"); } - return maxRamUsingThreadState; + return maxRamUsingWriter; } /** * Returns the largest non-pending flushable DWPT or null if there is none. */ final DocumentsWriterPerThread checkoutLargestNonPendingWriter() { - ThreadState largestNonPendingWriter = findLargestNonPendingWriter(); + DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter(); if (largestNonPendingWriter != null) { // we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue largestNonPendingWriter.lock(); try { - synchronized (this) { - try { - if (largestNonPendingWriter.isInitialized() == false) { - return nextPendingFlush(); - } else { + if (perThreadPool.isRegistered(largestNonPendingWriter)) { + synchronized (this) { + try { return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false); + } finally { + updateStallState(); } - } finally { - updateStallState(); } } } finally { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index a5f79af3308..73b64f05f7a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -24,6 +24,7 @@ import java.util.Locale; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; @@ -41,6 +42,7 @@ import org.apache.lucene.util.Counter; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.IntBlockPool; +import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; @@ -156,6 +158,9 @@ final class DocumentsWriterPerThread { final BufferedUpdates pendingUpdates; final SegmentInfo segmentInfo; // Current segment we are working on private boolean aborted = false; // True if we aborted + private SetOnce flushPending = new SetOnce<>(); + private volatile long lastCommittedBytesUsed; + private SetOnce hasFlushed = new SetOnce<>(); private final FieldInfos.Builder fieldInfos; private final InfoStream infoStream; @@ -169,6 +174,7 @@ final class DocumentsWriterPerThread { private final LiveIndexWriterConfig indexWriterConfig; private final boolean enableTestPoints; private final int indexVersionCreated; + private final ReentrantLock lock = new ReentrantLock(); public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, 
                                  DocumentsWriterDeleteQueue deleteQueue, FieldInfos.Builder fieldInfos,
                                  AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
@@ -350,6 +356,7 @@ final class DocumentsWriterPerThread {
 
   /** Flush all pending docs to a new segment */
   FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
+    assert flushPending.get() == Boolean.TRUE;
     assert numDocsInRAM > 0;
     assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
     segmentInfo.setMaxDoc(numDocsInRAM);
@@ -445,6 +452,7 @@ final class DocumentsWriterPerThread {
       throw t;
     } finally {
       maybeAbort("flush", flushNotifications);
+      hasFlushed.set(Boolean.TRUE);
     }
   }
@@ -600,5 +608,81 @@ final class DocumentsWriterPerThread {
       + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborted=" + aborted + ", numDocsInRAM=" + numDocsInRAM
       + ", deleteQueue=" + deleteQueue + "]";
   }
-
+
+
+  /**
+   * Returns true iff this DWPT is marked as flush pending
+   */
+  boolean isFlushPending() {
+    return flushPending.get() == Boolean.TRUE;
+  }
+
+  /**
+   * Sets this DWPT as flush pending. This can only be set once.
+   */
+  void setFlushPending() {
+    flushPending.set(Boolean.TRUE);
+  }
+
+
+  /**
+   * Returns the last committed bytes for this DWPT. This method can be called
+   * without acquiring the DWPT's lock.
+   */
+  long getLastCommittedBytesUsed() {
+    return lastCommittedBytesUsed;
+  }
+
+  /**
+   * Commits the current {@link #bytesUsed()} and stores its value for later reuse.
+   * The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}.
+   * @return the delta between the current {@link #bytesUsed()} and the current {@link #getLastCommittedBytesUsed()}
+   */
+  long commitLastBytesUsed() {
+    assert isHeldByCurrentThread();
+    long delta = bytesUsed() - lastCommittedBytesUsed;
+    lastCommittedBytesUsed += delta;
+    return delta;
+  }
+
+  /**
+   * Locks this DWPT for exclusive access.
+   * @see ReentrantLock#lock()
+   */
+  void lock() {
+    lock.lock();
+  }
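These locking methods (`lock` above, `tryLock` and `unlock` below) are what replaces ThreadState extending `ReentrantLock`. The caller-side discipline, condensed from `nextPendingFlush`/`filterAndLock` elsewhere in this patch (illustrative, not verbatim):

```java
for (DocumentsWriterPerThread next : perThreadPool) {
  if (next.isFlushPending() && next.tryLock()) { // never block while scanning the pool
    try {
      if (perThreadPool.isRegistered(next)) {    // still pooled, so safe to check out
        return checkOutForFlush(next);
      }
    } finally {
      next.unlock();                             // always release in a finally block
    }
  }
}
return null;
```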
+
+  /**
+   * Acquires the DWPT's lock only if it is not held by another thread at the time
+   * of invocation.
+   * @return true if the lock was acquired.
+   * @see ReentrantLock#tryLock()
+   */
+  boolean tryLock() {
+    return lock.tryLock();
+  }
+
+  /**
+   * Returns true if the DWPT's lock is held by the current thread.
+   * @see ReentrantLock#isHeldByCurrentThread()
+   */
+  boolean isHeldByCurrentThread() {
+    return lock.isHeldByCurrentThread();
+  }
+
+  /**
+   * Unlocks the DWPT's lock.
+   * @see ReentrantLock#unlock()
+   */
+  void unlock() {
+    lock.unlock();
+  }
+
+  /**
+   * Returns true iff this DWPT has been flushed.
+   */
+  boolean hasFlushed() {
+    return hasFlushed.get() == Boolean.TRUE;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 7fafd418783..ce6956c55fb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -16,228 +16,176 @@
  */
 package org.apache.lucene.index;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Set;
+import java.util.function.Predicate;
 
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
- * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
- * and their thread assignments during indexing. Each {@link ThreadState} holds
- * a reference to a {@link DocumentsWriterPerThread} that is once a
- * {@link ThreadState} is obtained from the pool exclusively used for indexing a
- * single document by the obtaining thread. Each indexing thread must obtain
- * such a {@link ThreadState} to make progress. Depending on the
- * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
+ * {@link DocumentsWriterPerThreadPool} controls {@link DocumentsWriterPerThread} instances
+ * and their thread assignments during indexing. Each {@link DocumentsWriterPerThread} is,
+ * once obtained from the pool, exclusively used for indexing a single document
+ * or a list of documents by the obtaining thread. Each indexing thread must obtain
+ * such a {@link DocumentsWriterPerThread} to make progress. Depending on the
+ * {@link DocumentsWriterPerThreadPool} implementation {@link DocumentsWriterPerThread}
 * assignments might differ from document to document.
 *

- * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool
- * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a
- * new {@link DocumentsWriterPerThread} instance.
+ * Once a {@link DocumentsWriterPerThread} is selected for flush, the {@link DocumentsWriterPerThread} will
+ * be checked out of the thread pool and won't be reused for indexing.
+ * See {@link #checkout(DocumentsWriterPerThread)}.
 *
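In other words, checkout is now the one-way door out of the pool. A sketch of how a flushing thread uses it together with `isRegistered` (the helper method shown is hypothetical, condensed from `DocumentsWriterFlushControl` in this patch):

```java
void checkoutForFlushSketch(DocumentsWriterPerThreadPool pool, DocumentsWriterPerThread dwpt) {
  dwpt.lock(); // checkout requires holding the DWPT's lock
  try {
    if (pool.isRegistered(dwpt)) {           // nobody else took it out in the meantime
      boolean removed = pool.checkout(dwpt); // leaves both the dwpts set and the free list
      assert removed;
      // from here on, getAndLock()/filterAndLock() can never hand this DWPT out again;
      // once flushed it is discarded, never recycled back into the pool
    }
  } finally {
    dwpt.unlock();
  }
}
```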

*/ -final class DocumentsWriterPerThreadPool { - - /** - * {@link ThreadState} references and guards a - * {@link DocumentsWriterPerThread} instance that is used during indexing to - * build a in-memory index segment. {@link ThreadState} also holds all flush - * related per-thread data controlled by {@link DocumentsWriterFlushControl}. - *

- * A {@link ThreadState}, its methods and members should only accessed by one - * thread a time. Users must acquire the lock via {@link ThreadState#lock()} - * and release the lock in a finally block via {@link ThreadState#unlock()} - * before accessing the state. - */ - @SuppressWarnings("serial") - final static class ThreadState extends ReentrantLock { - DocumentsWriterPerThread dwpt; - // TODO this should really be part of DocumentsWriterFlushControl - // write access guarded by DocumentsWriterFlushControl - volatile boolean flushPending = false; - // TODO this should really be part of DocumentsWriterFlushControl - // write access guarded by DocumentsWriterFlushControl - long bytesUsed = 0; +final class DocumentsWriterPerThreadPool implements Iterable, Closeable { - // set by DocumentsWriter after each indexing op finishes - volatile long lastSeqNo; + private final Set dwpts = Collections.newSetFromMap(new IdentityHashMap<>()); + private final Deque freeList = new ArrayDeque<>(); + private final IOSupplier dwptFactory; + private int takenWriterPermits = 0; + private boolean closed; - ThreadState(DocumentsWriterPerThread dpwt) { - this.dwpt = dpwt; - } - - private void reset() { - assert this.isHeldByCurrentThread(); - this.dwpt = null; - this.bytesUsed = 0; - this.flushPending = false; - } - - boolean isInitialized() { - assert this.isHeldByCurrentThread(); - return dwpt != null; - } - - /** - * Returns the number of currently active bytes in this ThreadState's - * {@link DocumentsWriterPerThread} - */ - public long getBytesUsedPerThread() { - assert this.isHeldByCurrentThread(); - // public for FlushPolicy - return bytesUsed; - } - - /** - * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread} - */ - public DocumentsWriterPerThread getDocumentsWriterPerThread() { - assert this.isHeldByCurrentThread(); - // public for FlushPolicy - return dwpt; - } - - /** - * Returns true iff this {@link ThreadState} is marked as flush - * pending otherwise false - */ - public boolean isFlushPending() { - return flushPending; - } + + DocumentsWriterPerThreadPool(IOSupplier dwptFactory) { + this.dwptFactory = dwptFactory; } - private final List threadStates = new ArrayList<>(); - - private final List freeList = new ArrayList<>(); - - private int takenThreadStatePermits = 0; - /** - * Returns the active number of {@link ThreadState} instances. + * Returns the active number of {@link DocumentsWriterPerThread} instances. */ - synchronized int getActiveThreadStateCount() { - return threadStates.size(); + synchronized int size() { + return dwpts.size(); } - synchronized void lockNewThreadStates() { - // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0 - // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some + synchronized void lockNewWriters() { + // this is similar to a semaphore - we need to acquire all permits ie. 
takenWriterPermits must be == 0 + // any call to lockNewWriters() must be followed by unlockNewWriters() otherwise we will deadlock at some // point - assert takenThreadStatePermits >= 0; - takenThreadStatePermits++; + assert takenWriterPermits >= 0; + takenWriterPermits++; } - synchronized void unlockNewThreadStates() { - assert takenThreadStatePermits > 0; - takenThreadStatePermits--; - if (takenThreadStatePermits == 0) { + synchronized void unlockNewWriters() { + assert takenWriterPermits > 0; + takenWriterPermits--; + if (takenWriterPermits == 0) { notifyAll(); } } + /** - * Returns a new {@link ThreadState} iff any new state is available otherwise - * null. - *

- * NOTE: the returned {@link ThreadState} is already locked iff non- - * null. - * - * @return a new {@link ThreadState} iff any new state is available otherwise - * null + * Returns a new already locked {@link DocumentsWriterPerThread} + * + * @return a new {@link DocumentsWriterPerThread} */ - private synchronized ThreadState newThreadState() { - assert takenThreadStatePermits >= 0; - while (takenThreadStatePermits > 0) { - // we can't create new thread-states while not all permits are available + private synchronized DocumentsWriterPerThread newWriter() throws IOException { + assert takenWriterPermits >= 0; + while (takenWriterPermits > 0) { + // we can't create new DWPTs while not all permits are available try { wait(); } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } } - ThreadState threadState = new ThreadState(null); - threadState.lock(); // lock so nobody else will get this ThreadState - threadStates.add(threadState); - return threadState; -} - - DocumentsWriterPerThread reset(ThreadState threadState) { - assert threadState.isHeldByCurrentThread(); - final DocumentsWriterPerThread dwpt = threadState.dwpt; - threadState.reset(); + DocumentsWriterPerThread dwpt = dwptFactory.get(); + dwpt.lock(); // lock so nobody else will get this DWPT + dwpts.add(dwpt); return dwpt; } - - void recycle(DocumentsWriterPerThread dwpt) { - // don't recycle DWPT by default - } // TODO: maybe we should try to do load leveling here: we want roughly even numbers // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing - /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */ - ThreadState getAndLock() { - ThreadState threadState = null; + /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */ + DocumentsWriterPerThread getAndLock() throws IOException { synchronized (this) { - if (freeList.isEmpty()) { - // ThreadState is already locked before return by this method: - return newThreadState(); - } else { - // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a - // limited number of thread states: - threadState = freeList.remove(freeList.size()-1); + if (closed) { + throw new AlreadyClosedException("DWPTPool is already closed"); + } + // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, + // but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we have suddenly + // a single thread indexing + final Iterator descendingIterator = freeList.descendingIterator(); + while (descendingIterator.hasNext()) { + DocumentsWriterPerThread perThread = descendingIterator.next(); + if (perThread.tryLock()) { + descendingIterator.remove(); + return perThread; + } + } + // DWPT is already locked before return by this method: + return newWriter(); + } + } - if (threadState.dwpt == null) { - // This thread-state is not initialized, e.g. it - // was just flushed. See if we can instead find - // another free thread state that already has docs - // indexed. This way if incoming thread concurrency - // has decreased, we don't leave docs - // indefinitely buffered, tying up RAM. 
This - // will instead get those thread states flushed, - // freeing up RAM for larger segment flushes: - for(int i=0;i iterator() { + return List.copyOf(dwpts).iterator(); // copy on read - this is a quick op since num states is low + } + + /** + * Filters all DWPTs the given predicate applies to and that can be checked out of the pool via + * {@link #checkout(DocumentsWriterPerThread)}. All DWPTs returned from this method are already locked + * and {@link #isRegistered(DocumentsWriterPerThread)} will return true for all returned DWPTs + */ + List filterAndLock(Predicate predicate) { + List list = new ArrayList<>(); + for (DocumentsWriterPerThread perThread : this) { + if (predicate.test(perThread)) { + perThread.lock(); + if (isRegistered(perThread)) { + list.add(perThread); + } else { + // somebody else has taken this DWPT out of the pool. + // unlock and let it go + perThread.unlock(); } } } - - // This could take time, e.g. if the threadState is [briefly] checked for flushing: - threadState.lock(); - - return threadState; + return Collections.unmodifiableList(list); } - void release(ThreadState state) { - state.unlock(); - synchronized (this) { - freeList.add(state); - } - } - /** - * Returns the ith active {@link ThreadState} where i is the - * given ord. - * - * @param ord - * the ordinal of the {@link ThreadState} - * @return the ith active {@link ThreadState} where i is the - * given ord. + * Removes the given DWPT from the pool unless it's already been removed before. + * @return true iff the given DWPT has been removed. Otherwise false */ - synchronized ThreadState getThreadState(int ord) { - return threadStates.get(ord); + synchronized boolean checkout(DocumentsWriterPerThread perThread) { + assert perThread.isHeldByCurrentThread(); + if (dwpts.remove(perThread)) { + freeList.remove(perThread); + } else { + assert freeList.contains(perThread) == false; + return false; + } + return true; } - // TODO: merge this with getActiveThreadStateCount: they are the same! - synchronized int getMaxThreadStates() { - return threadStates.size(); + /** + * Returns true if this DWPT is still part of the pool + */ + synchronized boolean isRegistered(DocumentsWriterPerThread perThread) { + return dwpts.contains(perThread); + } + + @Override + public synchronized void close() { + this.closed = true; } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java index c46e3d27c23..30a9277d16d 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java @@ -19,7 +19,6 @@ package org.apache.lucene.index; import java.util.IdentityHashMap; import java.util.Map; -import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; import org.apache.lucene.util.ThreadInterruptedException; /** @@ -32,9 +31,9 @@ import org.apache.lucene.util.ThreadInterruptedException; *
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
index c46e3d27c23..30a9277d16d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
@@ -19,7 +19,6 @@ package org.apache.lucene.index;
 import java.util.IdentityHashMap;
 import java.util.Map;

-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.util.ThreadInterruptedException;

 /**
@@ -32,9 +31,9 @@ import org.apache.lucene.util.ThreadInterruptedException;
  * <p>
  * To prevent OOM Errors and ensure IndexWriter's stability this class blocks
  * incoming threads from indexing once 2 x number of available
- * {@link ThreadState}s in {@link DocumentsWriterPerThreadPool} is exceeded.
+ * {@link DocumentsWriterPerThread}s in {@link DocumentsWriterPerThreadPool} is exceeded.
  * Once flushing catches up and the number of flushing DWPT is equal or lower
- * than the number of active {@link ThreadState}s threads are released and can
+ * than the number of active {@link DocumentsWriterPerThread}s, threads are released and can
  * continue indexing.
  */
 final class DocumentsWriterStallControl {
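The stall rule in this javadoc reduces to a small predicate. A minimal sketch, assuming plain counters; the real DocumentsWriterFlushControl does byte-based accounting and also parks and wakes the blocked threads:

    // Sketch of the stall rule described above, using counts for clarity.
    // Field names are illustrative, not the actual Lucene implementation.
    final class StallGate {
      private volatile int numActiveWriters;   // DWPTs accepting new documents
      private volatile int numFlushingWriters; // DWPTs checked out for flushing

      boolean shouldStallIndexingThreads() {
        // block indexing once flushing falls behind by a factor of two
        return numFlushingWriters > 2 * numActiveWriters;
      }
    }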
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
index 6620aefa7e2..ed540a494da 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
@@ -17,8 +17,6 @@
 package org.apache.lucene.index;

-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-
 /**
  * Default {@link FlushPolicy} implementation that flushes new segments based on
  * RAM used and document count depending on the IndexWriter's
  * {@link IndexWriterConfig}. It uses a maximum of both on each flush trigger:
  * <ul>
  * <li>
- * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * - applies pending delete operations based on the global number of buffered
  * delete terms if the consumed memory is greater than {@link IndexWriterConfig#getRAMBufferSizeMB()}</li>.
  * <li>
- * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * - flushes either on the number of documents per
  * {@link DocumentsWriterPerThread} (
  * {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active
  * memory consumption in the current indexing session iff
  * {@link IndexWriterConfig#getMaxBufferedDocs()} or
  * {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively</li>
  * <li>
- * {@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * - calls
- * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * and
- * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * in order</li>
  * </ul>
 * All {@link IndexWriterConfig} settings are used to mark
 * {@link DocumentsWriterPerThread} as flush pending during indexing with
 * respect to their live updates.
@@ -58,7 +56,7 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 class FlushByRamOrCountsPolicy extends FlushPolicy {

   @Override
-  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
+  public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
     if ((flushOnRAM() && control.getDeleteBytesUsed() > 1024*1024*indexWriterConfig.getRAMBufferSizeMB())) {
       control.setApplyAllDeletes();
       if (infoStream.isEnabled("FP")) {
@@ -68,12 +66,12 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
   }

   @Override
-  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
+  public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
     if (flushOnDocCount()
-        && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
+        && perThread.getNumDocsInRAM() >= indexWriterConfig
             .getMaxBufferedDocs()) {
       // Flush this state by num docs
-      control.setFlushPending(state);
+      control.setFlushPending(perThread);
     } else if (flushOnRAM()) {// flush by RAM
       final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
       final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
@@ -81,7 +79,7 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
         if (infoStream.isEnabled("FP")) {
           infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
         }
-        markLargestWriterPending(control, state, totalRam);
+        markLargestWriterPending(control, perThread);
       }
     }
   }
@@ -91,8 +89,8 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
    * pending
    */
   protected void markLargestWriterPending(DocumentsWriterFlushControl control,
-      ThreadState perThreadState, final long currentBytesPerThread) {
-    ThreadState largestNonPendingWriter = findLargestNonPendingWriter(control, perThreadState);
+      DocumentsWriterPerThread perThread) {
+    DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter(control, perThread);
     if (largestNonPendingWriter != null) {
       control.setFlushPending(largestNonPendingWriter);
     }
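The RAM trigger exercised by onInsert above is plain arithmetic. A worked example, assuming the default 16 MB buffer (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB) and made-up byte counts:

    // Worked example of the RAM trigger computed in onInsert; the byte
    // counts are invented for illustration.
    public class RamTriggerExample {
      public static void main(String[] args) {
        double ramBufferSizeMB = 16.0; // IndexWriterConfig default
        long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d); // 16_777_216 bytes
        long activeBytes = 12L * 1024 * 1024; // RAM held by all active DWPTs
        long deleteBytes = 5L * 1024 * 1024;  // RAM held by buffered deletes
        // 17 MB >= 16 MB, so the policy marks the largest non-pending DWPT for flush
        System.out.println("trigger flush: " + (activeBytes + deleteBytes >= limit));
      }
    }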
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index 3f641006d54..71402490afd 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -16,7 +16,6 @@
  */
 package org.apache.lucene.index;

-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.InfoStream;

@@ -35,13 +34,12 @@ import org.apache.lucene.util.InfoStream;
  * {@link IndexWriter} consults the provided {@link FlushPolicy} to control the
  * flushing process. The policy is informed for each added or updated document
  * as well as for each delete term. Based on the {@link FlushPolicy}, the
- * information provided via {@link ThreadState} and
+ * information provided via {@link DocumentsWriterPerThread} and
  * {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a
  * {@link DocumentsWriterPerThread} needs flushing and mark it as flush-pending
  * via {@link DocumentsWriterFlushControl#setFlushPending}, or if deletes need
  * to be applied.
  *
- * @see ThreadState
  * @see DocumentsWriterFlushControl
  * @see DocumentsWriterPerThread
  * @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
@@ -52,38 +50,38 @@ abstract class FlushPolicy {

   /**
    * Called for each delete term. If this is a delete triggered due to an update
-   * the given {@link ThreadState} is non-null.
+   * the given {@link DocumentsWriterPerThread} is non-null.
    * <p>
    * Note: This method is called synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
-   * thread holds the lock on the given {@link ThreadState}
+   * thread holds the lock on the given {@link DocumentsWriterPerThread}
    */
   public abstract void onDelete(DocumentsWriterFlushControl control,
-      ThreadState state);
+      DocumentsWriterPerThread perThread);

   /**
-   * Called for each document update on the given {@link ThreadState}'s
+   * Called for each document update on the given {@link DocumentsWriterPerThread}'s
    * {@link DocumentsWriterPerThread}.
    * <p>
    * Note: This method is called synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
-   * thread holds the lock on the given {@link ThreadState}
+   * thread holds the lock on the given {@link DocumentsWriterPerThread}
    */
-  public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
-    onInsert(control, state);
-    onDelete(control, state);
+  public void onUpdate(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+    onInsert(control, perThread);
+    onDelete(control, perThread);
   }

   /**
-   * Called for each document addition on the given {@link ThreadState}s
+   * Called for each document addition on the given {@link DocumentsWriterPerThread}'s
    * {@link DocumentsWriterPerThread}.
    * <p>
    * Note: This method is synchronized by the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
-   * thread holds the lock on the given {@link ThreadState}
+   * thread holds the lock on the given {@link DocumentsWriterPerThread}
    */
   public abstract void onInsert(DocumentsWriterFlushControl control,
-      ThreadState state);
+      DocumentsWriterPerThread perThread);

   /**
    * Called by DocumentsWriter to initialize the FlushPolicy
    */
@@ -94,18 +92,18 @@ abstract class FlushPolicy {
   }

   /**
-   * Returns the current most RAM consuming non-pending {@link ThreadState} with
+   * Returns the current most RAM consuming non-pending {@link DocumentsWriterPerThread} with
    * at least one indexed document.
    * <p>
    * This method will never return null
    */
-  protected ThreadState findLargestNonPendingWriter(
-      DocumentsWriterFlushControl control, ThreadState perThreadState) {
-    assert perThreadState.dwpt.getNumDocsInRAM() > 0;
+  protected DocumentsWriterPerThread findLargestNonPendingWriter(
+      DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+    assert perThread.getNumDocsInRAM() > 0;
     // the dwpt which needs to be flushed eventually
-    ThreadState maxRamUsingThreadState = control.findLargestNonPendingWriter();
+    DocumentsWriterPerThread maxRamUsingWriter = control.findLargestNonPendingWriter();
     assert assertMessage("set largest ram consuming thread pending on lower watermark");
-    return maxRamUsingThreadState;
+    return maxRamUsingWriter;
   }

   private boolean assertMessage(String s) {
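For readers following the signature change, here is the callback contract on a hypothetical policy. FlushPolicy is package-private, so this only illustrates the shape of an implementation; the class name and the doc-count-only behavior are invented for the example:

    // Illustration of the renamed callbacks; not a supported extension point.
    class DocCountOnlyFlushPolicy extends FlushPolicy {

      @Override
      public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
        // ignore deletes here; they are applied on the next full flush
      }

      @Override
      public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
        // caller guarantees: synchronized on control, perThread lock held
        if (perThread.getNumDocsInRAM() >= indexWriterConfig.getMaxBufferedDocs()) {
          control.setFlushPending(perThread); // checked out and flushed later
        }
      }
    }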
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 4323c323e68..95592cadfa6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2465,7 +2465,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       globalFieldNumberMap.clear();
       success = true;
       long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
-      docWriter.setLastSeqNo(seqNo);
       return seqNo;
     } finally {
       if (success == false) {
@@ -4947,7 +4946,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   //   finishStartCommit
   //   startCommitMergeDeletes
   //   startMergeInit
-  //   DocumentsWriter.ThreadState.init start
+  //   DocumentsWriterPerThread addDocuments start
   private final void testPoint(String message) {
     if (enableTestPoints) {
       assert infoStream.isEnabled("TP"); // don't enable unless you need them.
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 3edc9fca631..4cdc9c02529 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -312,28 +312,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
     return mergePolicy;
   }

-  /** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
-   * IndexWriter to assign thread-states to incoming indexing threads.
-   * <p>
-   * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
-   * other {@link IndexWriter} instances once it has been initialized / associated with an
-   * {@link IndexWriter}.
-   * <p>
-   * <p>
-   * NOTE: This only takes effect when IndexWriter is first created.</p>
-   */
-  IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
-    if (threadPool == null) {
-      throw new IllegalArgumentException("threadPool must not be null");
-    }
-    this.indexerThreadPool = threadPool;
-    return this;
-  }
-
-  @Override
-  DocumentsWriterPerThreadPool getIndexerThreadPool() {
-    return indexerThreadPool;
-  }
-
   /** By default, IndexWriter does not pool the
    *  SegmentReaders it must open for deletions and
    *  merging, unless a near-real-time reader has been
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index fb4bba7cf6f..1f48acc8d5f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -80,10 +80,6 @@ public class LiveIndexWriterConfig {
   /** {@link MergePolicy} for selecting merges. */
   protected volatile MergePolicy mergePolicy;

-  /** {@code DocumentsWriterPerThreadPool} to control how
-   *  threads are allocated to {@code DocumentsWriterPerThread}. */
-  protected volatile DocumentsWriterPerThreadPool indexerThreadPool;
-
   /** True if readers should be pooled. */
   protected volatile boolean readerPooling;

@@ -135,7 +131,6 @@ public class LiveIndexWriterConfig {
     mergePolicy = new TieredMergePolicy();
     flushPolicy = new FlushByRamOrCountsPolicy();
     readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
-    indexerThreadPool = new DocumentsWriterPerThreadPool();
     perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
   }

@@ -347,16 +342,6 @@ public class LiveIndexWriterConfig {
     return mergePolicy;
   }

-  /**
-   * Returns the configured {@link DocumentsWriterPerThreadPool} instance.
-   *
-   * @see IndexWriterConfig#setIndexerThreadPool(DocumentsWriterPerThreadPool)
-   * @return the configured {@link DocumentsWriterPerThreadPool} instance.
-   */
-  DocumentsWriterPerThreadPool getIndexerThreadPool() {
-    return indexerThreadPool;
-  }
-
   /**
    * Returns {@code true} if {@link IndexWriter} should pool readers even if
    * {@link DirectoryReader#open(IndexWriter)} has not been called.
@@ -492,7 +477,6 @@ public class LiveIndexWriterConfig {
     sb.append("codec=").append(getCodec()).append("\n");
     sb.append("infoStream=").append(getInfoStream().getClass().getName()).append("\n");
     sb.append("mergePolicy=").append(getMergePolicy()).append("\n");
-    sb.append("indexerThreadPool=").append(getIndexerThreadPool()).append("\n");
     sb.append("readerPooling=").append(getReaderPooling()).append("\n");
     sb.append("perThreadHardLimitMB=").append(getRAMPerThreadHardLimitMB()).append("\n");
     sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n");
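With the setter removed there is nothing left to configure about indexing concurrency; flush behavior is tuned through the remaining knobs instead. A minimal usage sketch (the index path is hypothetical):

    import java.nio.file.Paths;

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.NIOFSDirectory;

    public class ConfigAfterLucene9304 {
      public static void main(String[] args) throws Exception {
        Directory dir = new NIOFSDirectory(Paths.get("/tmp/example-index")); // hypothetical path
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setRAMBufferSizeMB(64.0); // flush by RAM ...
        iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); // ... not by doc count
        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
          writer.addDocument(new Document()); // DWPTs are created and pooled automatically
        }
      }
    }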
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
index 1184a887023..0a8f3a88ab2 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.LineFileDocs;
@@ -70,8 +69,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
       IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
           .setFlushPolicy(flushPolicy);
-      DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-      iwc.setIndexerThreadPool(threadPool);
       iwc.setRAMBufferSizeMB(maxRamMB);
       iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
       IndexWriter writer = new IndexWriter(dir, iwc);
@@ -125,8 +122,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
       IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
           .setFlushPolicy(flushPolicy);
-      DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-      iwc.setIndexerThreadPool(threadPool);
       iwc.setMaxBufferedDocs(2 + atLeast(10));
       iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
       IndexWriter writer = new IndexWriter(dir, iwc);
@@ -173,9 +168,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
     iwc.setFlushPolicy(flushPolicy);
-    DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-    iwc.setIndexerThreadPool(threadPool);
-
     IndexWriter writer = new IndexWriter(dir, iwc);
     flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
     DocumentsWriter docsWriter = writer.getDocsWriter();
@@ -237,8 +229,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
     iwc.setFlushPolicy(flushPolicy);
-    DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-    iwc.setIndexerThreadPool(threadPool);
     // with such a small ram buffer we should be stalled quite quickly
     iwc.setRAMBufferSizeMB(0.25);
     IndexWriter writer = new IndexWriter(dir, iwc);
@@ -273,13 +263,11 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
   }

   protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
-    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
+    Iterator<DocumentsWriterPerThread> allActiveWriter = flushControl.allActiveWriters();
     long bytesUsed = 0;
-    while (allActiveThreads.hasNext()) {
-      ThreadState next = allActiveThreads.next();
-      if (next.dwpt != null) {
-        bytesUsed += next.dwpt.bytesUsed();
-      }
+    while (allActiveWriter.hasNext()) {
+      DocumentsWriterPerThread next = allActiveWriter.next();
+      bytesUsed += next.bytesUsed();
     }
     assertEquals(bytesUsed, flushControl.activeBytes());
   }
@@ -332,81 +320,81 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     boolean hasMarkedPending = false;

     @Override
-    public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
-      final ArrayList<ThreadState> pending = new ArrayList<>();
-      final ArrayList<ThreadState> notPending = new ArrayList<>();
+    public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
+      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
       findPending(control, pending, notPending);
-      final boolean flushCurrent = state.flushPending;
-      final ThreadState toFlush;
-      if (state.flushPending) {
-        toFlush = state;
+      final boolean flushCurrent = perThread.isFlushPending();
+      final DocumentsWriterPerThread toFlush;
+      if (perThread.isFlushPending()) {
+        toFlush = perThread;
       } else {
         toFlush = null;
       }
-      super.onDelete(control, state);
+      super.onDelete(control, perThread);
       if (toFlush != null) {
         if (flushCurrent) {
           assertTrue(pending.remove(toFlush));
         } else {
           assertTrue(notPending.remove(toFlush));
         }
-        assertTrue(toFlush.flushPending);
+        assertTrue(toFlush.isFlushPending());
         hasMarkedPending = true;
       }

-      for (ThreadState threadState : notPending) {
-        assertFalse(threadState.flushPending);
+      for (DocumentsWriterPerThread dwpt : notPending) {
+        assertFalse(dwpt.isFlushPending());
       }
     }

     @Override
-    public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
-      final ArrayList<ThreadState> pending = new ArrayList<>();
-      final ArrayList<ThreadState> notPending = new ArrayList<>();
+    public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread dwpt) {
+      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
+      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
       findPending(control, pending, notPending);
-      final boolean flushCurrent = state.flushPending;
+      final boolean flushCurrent = dwpt.isFlushPending();
       long activeBytes = control.activeBytes();
-      final ThreadState toFlush;
-      if (state.flushPending) {
-        toFlush = state;
+      final DocumentsWriterPerThread toFlush;
+      if (dwpt.isFlushPending()) {
+        toFlush = dwpt;
       } else if (flushOnDocCount()
-          && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
+          && dwpt.getNumDocsInRAM() >= indexWriterConfig
               .getMaxBufferedDocs()) {
-        toFlush = state;
+        toFlush = dwpt;
       } else if (flushOnRAM()
           && activeBytes >= (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024. * 1024.)) {
-        toFlush = findLargestNonPendingWriter(control, state);
-        assertFalse(toFlush.flushPending);
+        toFlush = findLargestNonPendingWriter(control, dwpt);
+        assertFalse(toFlush.isFlushPending());
       } else {
         toFlush = null;
       }
-      super.onInsert(control, state);
+      super.onInsert(control, dwpt);
       if (toFlush != null) {
         if (flushCurrent) {
           assertTrue(pending.remove(toFlush));
         } else {
           assertTrue(notPending.remove(toFlush));
         }
-        assertTrue(toFlush.flushPending);
+        assertTrue(toFlush.isFlushPending());
         hasMarkedPending = true;
       } else {
         peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
-        peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(),
+        peakDocCountWithoutFlush = Math.max(dwpt.getNumDocsInRAM(),
             peakDocCountWithoutFlush);
       }

-      for (ThreadState threadState : notPending) {
-        assertFalse(threadState.flushPending);
+      for (DocumentsWriterPerThread perThread : notPending) {
+        assertFalse(perThread.isFlushPending());
       }
     }
   }

   static void findPending(DocumentsWriterFlushControl flushControl,
-      ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) {
-    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
+      ArrayList<DocumentsWriterPerThread> pending, ArrayList<DocumentsWriterPerThread> notPending) {
+    Iterator<DocumentsWriterPerThread> allActiveThreads = flushControl.allActiveWriters();
     while (allActiveThreads.hasNext()) {
-      ThreadState next = allActiveThreads.next();
-      if (next.flushPending) {
+      DocumentsWriterPerThread next = allActiveThreads.next();
+      if (next.isFlushPending()) {
         pending.add(next);
       } else {
         notPending.add(next);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index e3e6bbec13d..0225f1eed9a 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -22,7 +22,9 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.io.StringReader;
+import java.io.StringWriter;
 import java.net.URI;
 import java.nio.file.FileSystem;
 import java.nio.file.Files;
@@ -2918,16 +2920,15 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testFlushLargestWriter() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
-    int numDocs = indexDocsForMultipleThreadStates(w);
-    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+    int numDocs = indexDocsForMultipleDWPTs(w);
+    DocumentsWriterPerThread largestNonPendingWriter
         = w.docWriter.flushControl.findLargestNonPendingWriter();
-    assertFalse(largestNonPendingWriter.flushPending);
-    assertNotNull(largestNonPendingWriter.dwpt);
+    assertFalse(largestNonPendingWriter.isFlushPending());

     int numRamDocs = w.numRamDocs();
-    int numDocsInDWPT = largestNonPendingWriter.dwpt.getNumDocsInRAM();
+    int numDocsInDWPT = largestNonPendingWriter.getNumDocsInRAM();
     assertTrue(w.flushNextBuffer());
-    assertNull(largestNonPendingWriter.dwpt);
+    assertTrue(largestNonPendingWriter.hasFlushed());
     assertEquals(numRamDocs-numDocsInDWPT, w.numRamDocs());

     // make sure it's not locked
@@ -2943,7 +2944,7 @@ public class TestIndexWriter extends LuceneTestCase {
     dir.close();
   }

-  private int indexDocsForMultipleThreadStates(IndexWriter w) throws InterruptedException {
+  private int indexDocsForMultipleDWPTs(IndexWriter w) throws InterruptedException {
     Thread[] threads = new Thread[3];
     CountDownLatch latch = new CountDownLatch(threads.length);
     int numDocsPerThread = 10 + random().nextInt(30);
@@ -2973,16 +2974,16 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
-    indexDocsForMultipleThreadStates(w);
-    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+    indexDocsForMultipleDWPTs(w);
+    DocumentsWriterPerThread largestNonPendingWriter
         = w.docWriter.flushControl.findLargestNonPendingWriter();
-    assertFalse(largestNonPendingWriter.flushPending);
-    assertNotNull(largestNonPendingWriter.dwpt);
-    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+    assertFalse(largestNonPendingWriter.isFlushPending());
+    assertFalse(largestNonPendingWriter.hasFlushed());
+    int threadPoolSize = w.docWriter.perThreadPool.size();
     w.docWriter.flushControl.markForFullFlush();
     DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter();
     assertNull(documentsWriterPerThread);
-    assertEquals(activeThreadStateCount, w.docWriter.flushControl.numQueuedFlushes());
+    assertEquals(threadPoolSize, w.docWriter.flushControl.numQueuedFlushes());
     w.docWriter.flushControl.abortFullFlushes();
     assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter());
     assertEquals(0, w.docWriter.flushControl.numQueuedFlushes());
@@ -2993,11 +2994,11 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
-    int numDocs = indexDocsForMultipleThreadStates(w);
-    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+    int numDocs = indexDocsForMultipleDWPTs(w);
+    DocumentsWriterPerThread largestNonPendingWriter
         = w.docWriter.flushControl.findLargestNonPendingWriter();
-    assertFalse(largestNonPendingWriter.flushPending);
-    assertNotNull(largestNonPendingWriter.dwpt);
+    assertFalse(largestNonPendingWriter.isFlushPending());
+    assertFalse(largestNonPendingWriter.hasFlushed());

     CountDownLatch wait = new CountDownLatch(1);
     CountDownLatch locked = new CountDownLatch(1);
@@ -3030,7 +3031,7 @@ public class TestIndexWriter extends LuceneTestCase {
     lockThread.join();
     flushThread.join();

-    assertNull("largest DWPT should be flushed", largestNonPendingWriter.dwpt);
+    assertTrue("largest DWPT should be flushed", largestNonPendingWriter.hasFlushed());
     // make sure it's not locked
     largestNonPendingWriter.lock();
     largestNonPendingWriter.unlock();
@@ -3116,21 +3117,19 @@ public class TestIndexWriter extends LuceneTestCase {
   }

   private static void waitForDocsInBuffers(IndexWriter w, int buffersWithDocs) {
-    // wait until at least N threadstates have a doc in order to observe
+    // wait until at least N DWPTs have a doc in order to observe
     // who flushes the segments.
     while(true) {
       int numStatesWithDocs = 0;
       DocumentsWriterPerThreadPool perThreadPool = w.docWriter.perThreadPool;
-      for (int i = 0; i < perThreadPool.getActiveThreadStateCount(); i++) {
-        DocumentsWriterPerThreadPool.ThreadState threadState = perThreadPool.getThreadState(i);
-        threadState.lock();
+      for (DocumentsWriterPerThread dwpt : perThreadPool) {
+        dwpt.lock();
         try {
-          DocumentsWriterPerThread dwpt = threadState.dwpt;
-          if (dwpt != null && dwpt.getNumDocsInRAM() > 1) {
+          if (dwpt.getNumDocsInRAM() > 1) {
             numStatesWithDocs++;
           }
         } finally {
-          threadState.unlock();
+          dwpt.unlock();
         }
       }
       if (numStatesWithDocs >= buffersWithDocs) {
@@ -3702,22 +3701,19 @@ public class TestIndexWriter extends LuceneTestCase {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
     w.addDocument(new Document());
-    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
-    assertEquals(1, activeThreadStateCount);
+    assertEquals(1, w.docWriter.perThreadPool.size());
     CountDownLatch latch = new CountDownLatch(1);
     Thread thread = new Thread(() -> {
       latch.countDown();
       List<Closeable> states = new ArrayList<>();
       try {
         for (int i = 0; i < 100; i++) {
-          DocumentsWriterPerThreadPool.ThreadState state = w.docWriter.perThreadPool.getAndLock();
+          DocumentsWriterPerThread state = w.docWriter.perThreadPool.getAndLock();
           states.add(state::unlock);
-          if (state.isInitialized()) {
-            state.dwpt.deleteQueue.getNextSequenceNumber();
-          } else {
-            w.docWriter.deleteQueue.getNextSequenceNumber();
-          }
+          state.deleteQueue.getNextSequenceNumber();
         }
+      } catch (IOException e) {
+        throw new AssertionError(e);
       } finally {
         IOUtils.closeWhileHandlingException(states);
       }
@@ -3774,7 +3770,19 @@ public class TestIndexWriter extends LuceneTestCase {
     stopped.set(true);
     indexer.join();
     refresher.join();
-    assertNull("should not consider ACE a tragedy on a closed IW", w.getTragicException());
+    Throwable e = w.getTragicException();
+    IOSupplier<String> supplier = () -> {
+      if (e != null) {
+        StringWriter writer = new StringWriter();
+        try (PrintWriter printWriter = new PrintWriter(writer)) {
+          e.printStackTrace(printWriter);
+        }
+        return writer.toString();
+      } else {
+        return "";
+      }
+    };
+    assertNull("should not consider ACE a tragedy on a closed IW: " + supplier.get(), w.getTragicException());
     IOUtils.close(sm, dir);
   }
 }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 7238869328c..dffb1576c87 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -68,7 +68,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
     assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
     assertNull(conf.getMergedSegmentWarmer());
     assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass());
-    assertEquals(DocumentsWriterPerThreadPool.class, conf.getIndexerThreadPool().getClass());
     assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass());
     assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB());
     assertEquals(Codec.getDefault(), conf.getCodec());
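The tests above drive IndexWriter#flushNextBuffer, which is public API and also useful outside tests to cap buffered RAM without forcing a full flush. A small sketch; the helper class and the 32 MB threshold are invented for illustration:

    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;

    // Hypothetical helper; the threshold is illustrative, not a Lucene default.
    public final class FlushHelper {
      private FlushHelper() {}

      static void maybeFlushLargestBuffer(IndexWriter writer) throws IOException {
        if (writer.ramBytesUsed() > 32L * 1024 * 1024) {
          // checks out the largest pending DWPT and writes it as a new segment,
          // freeing RAM without a full flush or commit
          writer.flushNextBuffer();
        }
      }
    }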
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index b32ea30840f..e879019897b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -1285,7 +1285,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
     }

     // First one triggers, but does not reflect, the merge:
-    System.out.println("TEST: now get reader");
+    if (VERBOSE) {
+      System.out.println("TEST: now get reader");
+    }
     DirectoryReader.open(w).close();
     IndexReader r = DirectoryReader.open(w);
     assertEquals(1, r.leaves().size());
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
index b560e44ff23..07b6822c8dc 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
@@ -533,8 +533,8 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
     dir.close();
   }

-  // LUCENE-1130: make sure immeidate disk full on creating
-  // an IndexWriter (hit during DW.ThreadState.init()) is
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DWPT#updateDocuments()) is
   // OK:
   public void testImmediateDiskFull() throws IOException {
     MockDirectoryWrapper dir = newMockDirectory();
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
index c71c5091afe..3bf554ffa35 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
@@ -136,7 +136,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
   }

   // LUCENE-1130: make sure immediate disk full on creating
-  // an IndexWriter (hit during DW.ThreadState.init()), with
+  // an IndexWriter (hit during DWPT#updateDocuments()), with
   // multiple threads, is OK:
   public void testImmediateDiskFullWithThreads() throws Exception {

diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
index 5c1a73fa2e1..e2915e06d2a 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
@@ -224,8 +224,8 @@ public class RandomIndexWriter implements Closeable {
       if (LuceneTestCase.VERBOSE) {
         System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
       }
-      int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
-      int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
+      int threadPoolSize = w.docWriter.perThreadPool.size();
+      int numFlushes = Math.min(1, r.nextInt(threadPoolSize+1));
       for (int i = 0; i < numFlushes; i++) {
         if (w.flushNextBuffer() == false) {
           break; // stop once we didn't flush anything
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
index 316796a698c..12d5075527c 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
@@ -267,17 +267,22 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
   }

   public synchronized void corruptUnknownFiles() throws IOException {
-
-    System.out.println("MDW: corrupt unknown files");
+    if (LuceneTestCase.VERBOSE) {
+      System.out.println("MDW: corrupt unknown files");
+    }
     Set<String> knownFiles = new HashSet<>();
     for(String fileName : listAll()) {
       if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
-        System.out.println("MDW: read " + fileName + " to gather files it references");
+        if (LuceneTestCase.VERBOSE) {
+          System.out.println("MDW: read " + fileName + " to gather files it references");
+        }
         SegmentInfos infos;
         try {
           infos = SegmentInfos.readCommit(this, fileName);
         } catch (IOException ioe) {
-          System.out.println("MDW: exception reading segment infos " + fileName + "; files: " + Arrays.toString(listAll()));
+          if (LuceneTestCase.VERBOSE) {
+            System.out.println("MDW: exception reading segment infos " + fileName + "; files: " + Arrays.toString(listAll()));
+          }
           throw ioe;
         }
         knownFiles.addAll(infos.files(true));
@@ -833,8 +838,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
     // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
     if (assertNoUnreferencedFilesOnClose) {
-      System.out.println("MDW: now assert no unref'd files at close");
-
+      if (LuceneTestCase.VERBOSE) {
+        System.out.println("MDW: now assert no unref'd files at close");
+      }
       // now look for unreferenced files: discount ones that we tried to delete but could not
       Set<String> allFiles = new HashSet<>(Arrays.asList(listAll()));
       String[] startFiles = allFiles.toArray(new String[0]);