diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 8cdd81cbede..7d14c63e774 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -45,7 +45,6 @@ Optimizations on Windows if NIOFSDirectory is used, mmapped files are still locked. (Michael Poindexter, Robert Muir, Uwe Schindler) - ======================= Lucene 4.5.0 ======================= New features @@ -206,6 +205,13 @@ Optimizations * LUCENE-5170: Fixed several wrapper analyzers to inherit the reuse strategy of the wrapped Analyzer. (Uwe Schindler, Robert Muir, Shay Banon) + +* LUCENE-5006: Simplified DocumentsWriter and DocumentsWriterPerThread + synchronization and concurrent interaction with IndexWriter. DWPT is now + only set up once and has no reset logic. All segment publishing and state + transitions from DWPT into IndexWriter are now done via an Event-Queue + processed from within the IndexWriter in order to prevent situations + where DWPT or DW calling into IW causes deadlocks. (Simon Willnauer) Documentation diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index c51be896fbf..983ee42b244 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -19,18 +19,18 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.Collection; -import java.util.List; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; -import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; -import org.apache.lucene.index.FieldInfos.FieldNumbers; +import org.apache.lucene.index.IndexWriter.Event; import org.apache.lucene.search.Query; -import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.InfoStream; @@ -100,19 +100,15 @@ import org.apache.lucene.util.InfoStream; */ final class DocumentsWriter { - Directory directory; + private final Directory directory; private volatile boolean closed; - final InfoStream infoStream; - Similarity similarity; + private final InfoStream infoStream; - List newFiles; + private final LiveIndexWriterConfig config; - final IndexWriter indexWriter; - final LiveIndexWriterConfig indexWriterConfig; - - private AtomicInteger numDocsInRAM = new AtomicInteger(0); + private final AtomicInteger numDocsInRAM = new AtomicInteger(0); // TODO: cut over to BytesRefHash in BufferedDeletes volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue(); @@ -125,73 +121,72 @@ final class DocumentsWriter { */ private volatile boolean pendingChangesInCurrentFullFlush; - private Collection abortedFiles; // List of files that were written before last abort() - - final IndexingChain chain; - final DocumentsWriterPerThreadPool perThreadPool; final FlushPolicy flushPolicy; final DocumentsWriterFlushControl flushControl; - - final Codec codec; - DocumentsWriter(Codec codec, LiveIndexWriterConfig config, Directory directory, IndexWriter writer, 
FieldNumbers globalFieldNumbers, - BufferedDeletesStream bufferedDeletesStream) { - this.codec = codec; - this.directory = directory; - this.indexWriter = writer; - this.infoStream = config.getInfoStream(); - this.similarity = config.getSimilarity(); - this.indexWriterConfig = writer.getConfig(); - this.perThreadPool = config.getIndexerThreadPool(); - this.chain = config.getIndexingChain(); - this.perThreadPool.initialize(this, globalFieldNumbers, config); - flushPolicy = config.getFlushPolicy(); - assert flushPolicy != null; - flushPolicy.init(this); - flushControl = new DocumentsWriterFlushControl(this, config); - } + private final IndexWriter writer; + private final Queue events; - synchronized void deleteQueries(final Query... queries) throws IOException { + + DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directory) { + this.directory = directory; + this.config = config; + this.infoStream = config.getInfoStream(); + this.perThreadPool = config.getIndexerThreadPool(); + flushPolicy = config.getFlushPolicy(); + this.writer = writer; + this.events = new ConcurrentLinkedQueue(); + flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedDeletesStream); + } + + synchronized boolean deleteQueries(final Query... queries) throws IOException { + // TODO why is this synchronized? + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.addDelete(queries); flushControl.doOnDelete(); - if (flushControl.doApplyAllDeletes()) { - applyAllDeletes(deleteQueue); - } + return applyAllDeletes(deleteQueue); } // TODO: we could check w/ FreqProxTermsWriter: if the // term doesn't exist, don't bother buffering into the // per-DWPT map (but still must go into the global map) - synchronized void deleteTerms(final Term... terms) throws IOException { + synchronized boolean deleteTerms(final Term... terms) throws IOException { + // TODO why is this synchronized? final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.addDelete(terms); flushControl.doOnDelete(); - if (flushControl.doApplyAllDeletes()) { - applyAllDeletes(deleteQueue); - } + return applyAllDeletes( deleteQueue); } DocumentsWriterDeleteQueue currentDeleteSession() { return deleteQueue; } - private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { - if (deleteQueue != null && !flushControl.isFullFlush()) { - ticketQueue.addDeletesAndPurge(this, deleteQueue); + private final boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { + if (flushControl.doApplyAllDeletes()) { + if (deleteQueue != null && !flushControl.isFullFlush()) { + ticketQueue.addDeletes(deleteQueue); + } + putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge + return true; } - indexWriter.applyAllDeletes(); - indexWriter.flushCount.incrementAndGet(); + return false; } + + final int purgeBuffer(IndexWriter writer, boolean forced) throws IOException { + if (forced) { + return ticketQueue.forcePurge(writer); + } else { + return ticketQueue.tryPurge(writer); + } + } + /** Returns how many docs are currently buffered in RAM. */ int getNumDocs() { return numDocsInRAM.get(); } - Collection abortedFiles() { - return abortedFiles; - } - private void ensureOpen() throws AlreadyClosedException { if (closed) { throw new AlreadyClosedException("this IndexWriter is closed"); @@ -202,45 +197,37 @@ final class DocumentsWriter { * updating the index files) and must discard all * currently buffered docs. 
This resets our state, * discarding any docs added since last flush. */ - synchronized void abort() { + synchronized void abort(IndexWriter writer) { + assert !Thread.holdsLock(writer) : "IndexWriter lock should never be held when aborting"; boolean success = false; - + final Set newFilesSet = new HashSet(); try { deleteQueue.clear(); if (infoStream.isEnabled("DW")) { infoStream.message("DW", "abort"); } - final int limit = perThreadPool.getActiveThreadState(); for (int i = 0; i < limit; i++) { final ThreadState perThread = perThreadPool.getThreadState(i); perThread.lock(); try { - if (perThread.isActive()) { // we might be closed - try { - perThread.dwpt.abort(); - } finally { - perThread.dwpt.checkAndResetHasAborted(); - flushControl.doOnAbort(perThread); - } - } else { - assert closed; - } + abortThreadState(perThread, newFilesSet); } finally { perThread.unlock(); } } - flushControl.abortPendingFlushes(); + flushControl.abortPendingFlushes(newFilesSet); + putEvent(new DeleteNewFilesEvent(newFilesSet)); flushControl.waitForFlush(); success = true; } finally { if (infoStream.isEnabled("DW")) { - infoStream.message("DW", "done abort; abortedFiles=" + abortedFiles + " success=" + success); + infoStream.message("DW", "done abort; abortedFiles=" + newFilesSet + " success=" + success); } } } - synchronized void lockAndAbortAll() { + synchronized void lockAndAbortAll(IndexWriter indexWriter) { assert indexWriter.holdsFullFlushLock(); if (infoStream.isEnabled("DW")) { infoStream.message("DW", "lockAndAbortAll"); @@ -249,20 +236,15 @@ final class DocumentsWriter { try { deleteQueue.clear(); final int limit = perThreadPool.getMaxThreadStates(); + final Set newFilesSet = new HashSet(); for (int i = 0; i < limit; i++) { final ThreadState perThread = perThreadPool.getThreadState(i); perThread.lock(); - if (perThread.isActive()) { // we might be closed or - try { - perThread.dwpt.abort(); - } finally { - perThread.dwpt.checkAndResetHasAborted(); - flushControl.doOnAbort(perThread); - } - } + abortThreadState(perThread, newFilesSet); } deleteQueue.clear(); - flushControl.abortPendingFlushes(); + flushControl.abortPendingFlushes(newFilesSet); + putEvent(new DeleteNewFilesEvent(newFilesSet)); flushControl.waitForFlush(); success = true; } finally { @@ -271,12 +253,31 @@ final class DocumentsWriter { } if (!success) { // if something happens here we unlock all states again - unlockAllAfterAbortAll(); + unlockAllAfterAbortAll(indexWriter); } } } + + private final void abortThreadState(final ThreadState perThread, Set newFiles) { + assert perThread.isHeldByCurrentThread(); + if (perThread.isActive()) { // we might be closed + if (perThread.isInitialized()) { + try { + subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM()); + perThread.dwpt.abort(newFiles); + } finally { + perThread.dwpt.checkAndResetHasAborted(); + flushControl.doOnAbort(perThread); + } + } else { + flushControl.doOnAbort(perThread); + } + } else { + assert closed; + } + } - final synchronized void unlockAllAfterAbortAll() { + final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) { assert indexWriter.holdsFullFlushLock(); if (infoStream.isEnabled("DW")) { infoStream.message("DW", "unlockAll"); @@ -334,7 +335,7 @@ final class DocumentsWriter { private boolean preUpdate() throws IOException { ensureOpen(); - boolean maybeMerge = false; + boolean hasEvents = false; if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) { // Help out flushing any queued DWPTs so we can un-stall: if 
(infoStream.isEnabled("DW")) { @@ -345,7 +346,7 @@ final class DocumentsWriter { DocumentsWriterPerThread flushingDWPT; while ((flushingDWPT = flushControl.nextPendingFlush()) != null) { // Don't push the delete here since the update could fail! - maybeMerge |= doFlush(flushingDWPT); + hasEvents |= doFlush(flushingDWPT); } if (infoStream.isEnabled("DW")) { @@ -361,28 +362,35 @@ final class DocumentsWriter { infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy"); } } - return maybeMerge; + return hasEvents; } - private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException { - if (flushControl.doApplyAllDeletes()) { - applyAllDeletes(deleteQueue); - } + private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException { + hasEvents |= applyAllDeletes(deleteQueue); if (flushingDWPT != null) { - maybeMerge |= doFlush(flushingDWPT); + hasEvents |= doFlush(flushingDWPT); } else { final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush(); if (nextPendingFlush != null) { - maybeMerge |= doFlush(nextPendingFlush); + hasEvents |= doFlush(nextPendingFlush); } } - return maybeMerge; + return hasEvents; + } + + private final void ensureInitialized(ThreadState state) { + if (state.isActive() && state.dwpt == null) { + final FieldInfos.Builder infos = new FieldInfos.Builder( + writer.globalFieldNumberMap); + state.dwpt = new DocumentsWriterPerThread(writer.newSegmentName(), + directory, config, infoStream, deleteQueue, infos); + } } boolean updateDocuments(final Iterable docs, final Analyzer analyzer, final Term delTerm) throws IOException { - boolean maybeMerge = preUpdate(); + boolean hasEvents = preUpdate(); final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; @@ -392,13 +400,19 @@ final class DocumentsWriter { ensureOpen(); assert false: "perThread is not active but we are still open"; } - + ensureInitialized(perThread); + assert perThread.isInitialized(); final DocumentsWriterPerThread dwpt = perThread.dwpt; + final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm); numDocsInRAM.addAndGet(docCount); } finally { if (dwpt.checkAndResetHasAborted()) { + if (!dwpt.pendingFilesToDelete().isEmpty()) { + putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete())); + } + subtractFlushedNumDocs(dwptNumDocs); flushControl.doOnAbort(perThread); } } @@ -408,31 +422,35 @@ final class DocumentsWriter { perThread.unlock(); } - return postUpdate(flushingDWPT, maybeMerge); + return postUpdate(flushingDWPT, hasEvents); } boolean updateDocument(final IndexDocument doc, final Analyzer analyzer, final Term delTerm) throws IOException { - boolean maybeMerge = preUpdate(); + boolean hasEvents = preUpdate(); final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - try { - if (!perThread.isActive()) { ensureOpen(); - throw new IllegalStateException("perThread is not active but we are still open"); + assert false: "perThread is not active but we are still open"; } - + ensureInitialized(perThread); + assert perThread.isInitialized(); final DocumentsWriterPerThread dwpt = perThread.dwpt; + final int dwptNumDocs = dwpt.getNumDocsInRAM(); try { dwpt.updateDocument(doc, analyzer, delTerm); numDocsInRAM.incrementAndGet(); } finally { if (dwpt.checkAndResetHasAborted()) { + if (!dwpt.pendingFilesToDelete().isEmpty()) 
{ + putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete())); + } + subtractFlushedNumDocs(dwptNumDocs); flushControl.doOnAbort(perThread); } } @@ -442,13 +460,13 @@ final class DocumentsWriter { perThread.unlock(); } - return postUpdate(flushingDWPT, maybeMerge); + return postUpdate(flushingDWPT, hasEvents); } private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException { - boolean maybeMerge = false; + boolean hasEvents = false; while (flushingDWPT != null) { - maybeMerge = true; + hasEvents = true; boolean success = false; SegmentFlushTicket ticket = null; try { @@ -474,9 +492,24 @@ final class DocumentsWriter { // Each flush is assigned a ticket in the order they acquire the ticketQueue lock ticket = ticketQueue.addFlushTicket(flushingDWPT); - // flush concurrently without locking - final FlushedSegment newSegment = flushingDWPT.flush(); - ticketQueue.addSegment(ticket, newSegment); + final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM(); + boolean dwptSuccess = false; + try { + // flush concurrently without locking + final FlushedSegment newSegment = flushingDWPT.flush(); + ticketQueue.addSegment(ticket, newSegment); + dwptSuccess = true; + } finally { + subtractFlushedNumDocs(flushingDocsInRam); + if (!flushingDWPT.pendingFilesToDelete().isEmpty()) { + putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete())); + hasEvents = true; + } + if (!dwptSuccess) { + putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo())); + hasEvents = true; + } + } // flush was successful once we reached this point - new seg. has been assigned to the ticket! success = true; } finally { @@ -496,54 +529,38 @@ final class DocumentsWriter { // thread in innerPurge can't keep up with all // other threads flushing segments. In this case // we forcefully stall the producers. - ticketQueue.forcePurge(this); - } else { - ticketQueue.tryPurge(this); + putEvent(ForcedPurgeEvent.INSTANCE); + break; } - } finally { flushControl.doAfterFlush(flushingDWPT); flushingDWPT.checkAndResetHasAborted(); - indexWriter.flushCount.incrementAndGet(); - indexWriter.doAfterFlush(); } flushingDWPT = flushControl.nextPendingFlush(); } - + if (hasEvents) { + putEvent(MergePendingEvent.INSTANCE); + } // If deletes alone are consuming > 1/2 our RAM // buffer, force them all to apply now. 
This is to // prevent too-frequent flushing of a long tail of // tiny segments: - final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB(); + final double ramBufferSizeMB = config.getRAMBufferSizeMB(); if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH && flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) { if (infoStream.isEnabled("DW")) { infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB)); } - applyAllDeletes(deleteQueue); + hasEvents = true; + if (!this.applyAllDeletes(deleteQueue)) { + putEvent(ApplyDeletesEvent.INSTANCE); + } } - return maybeMerge; + return hasEvents; } - - void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) - throws IOException { - // Finish the flushed segment and publish it to IndexWriter - if (newSegment == null) { - assert bufferedDeletes != null; - if (bufferedDeletes != null && bufferedDeletes.any()) { - indexWriter.publishFrozenDeletes(bufferedDeletes); - if (infoStream.isEnabled("DW")) { - infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes); - } - } - } else { - publishFlushedSegment(newSegment, bufferedDeletes); - } - } - final void subtractFlushedNumDocs(int numFlushed) { int oldValue = numDocsInRAM.get(); while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) { @@ -551,29 +568,6 @@ final class DocumentsWriter { } } - /** - * Publishes the flushed segment, segment private deletes (if any) and its - * associated global delete (if present) to IndexWriter. The actual - * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s - * delete generation is always GlobalPacket_deleteGeneration + 1 - */ - private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket) - throws IOException { - assert newSegment != null; - assert newSegment.segmentInfo != null; - final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes; - //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name); - if (infoStream.isEnabled("DW")) { - infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes); - } - - if (segmentDeletes != null && infoStream.isEnabled("DW")) { - infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes); - } - // now publish! 
- indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket); - } - // for asserts private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null; @@ -588,7 +582,7 @@ final class DocumentsWriter { * two stage operation; the caller must ensure (in try/finally) that finishFlush * is called after this method, to release the flush lock in DWFlushControl */ - final boolean flushAllThreads() + final boolean flushAllThreads(final IndexWriter indexWriter) throws IOException { final DocumentsWriterDeleteQueue flushingDeleteQueue; if (infoStream.isEnabled("DW")) { @@ -620,10 +614,9 @@ final class DocumentsWriter { if (infoStream.isEnabled("DW")) { infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes"); } - ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue); - } else { - ticketQueue.forcePurge(this); - } + ticketQueue.addDeletes(flushingDeleteQueue); + } + ticketQueue.forcePurge(indexWriter); assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets(); } finally { assert flushingDeleteQueue == currentFullFlushDelQueue; @@ -641,11 +634,94 @@ final class DocumentsWriter { // Release the flush lock flushControl.finishFullFlush(); } else { - flushControl.abortFullFlushes(); + Set newFilesSet = new HashSet<>(); + flushControl.abortFullFlushes(newFilesSet); + putEvent(new DeleteNewFilesEvent(newFilesSet)); + } } finally { pendingChangesInCurrentFullFlush = false; } } + + public LiveIndexWriterConfig getIndexWriterConfig() { + return config; + } + + private void putEvent(Event event) { + events.add(event); + } + + static final class ApplyDeletesEvent implements Event { + static final Event INSTANCE = new ApplyDeletesEvent(); + private int instCount = 0; + private ApplyDeletesEvent() { + assert instCount == 0; + instCount++; + } + + @Override + public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException { + writer.applyDeletesAndPurge(true); // we always purge! 
+ } + } + + static final class MergePendingEvent implements Event { + static final Event INSTANCE = new MergePendingEvent(); + private int instCount = 0; + private MergePendingEvent() { + assert instCount == 0; + instCount++; + } + + @Override + public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException { + writer.doAfterSegmentFlushed(triggerMerge, forcePurge); + } + } + + static final class ForcedPurgeEvent implements Event { + static final Event INSTANCE = new ForcedPurgeEvent(); + private int instCount = 0; + private ForcedPurgeEvent() { + assert instCount == 0; + instCount++; + } + + @Override + public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException { + writer.purge(true); + } + } + + static class FlushFailedEvent implements Event { + private final SegmentInfo info; + + public FlushFailedEvent(SegmentInfo info) { + this.info = info; + } + + @Override + public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException { + writer.flushFailed(info); + } + } + + static class DeleteNewFilesEvent implements Event { + private final Collection files; + + public DeleteNewFilesEvent(Collection files) { + this.files = files; + } + + @Override + public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException { + writer.deleteNewFiles(files); + } + } + + public Queue eventQueue() { + return events; + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index cef4410b19f..dcadc20f2bb 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -23,9 +23,11 @@ import java.util.List; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.ThreadInterruptedException; /** @@ -66,14 +68,18 @@ final class DocumentsWriterFlushControl { private boolean closed = false; private final DocumentsWriter documentsWriter; private final LiveIndexWriterConfig config; + private final BufferedDeletesStream bufferedDeletesStream; + private final InfoStream infoStream; - DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) { + DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedDeletesStream bufferedDeletesStream) { + this.infoStream = config.getInfoStream(); this.stallControl = new DocumentsWriterStallControl(); this.perThreadPool = documentsWriter.perThreadPool; this.flushPolicy = documentsWriter.flushPolicy; - this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; this.config = config; + this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024; this.documentsWriter = documentsWriter; + this.bufferedDeletesStream = bufferedDeletesStream; } public synchronized long activeBytes() { @@ -240,7 +246,6 @@ final class DocumentsWriterFlushControl { } public synchronized void waitForFlush() { - assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush"; while (flushingWriters.size() != 0) { try { this.wait(); @@ 
-277,7 +282,7 @@ final class DocumentsWriterFlushControl { } assert assertMemory(); // Take it out of the loop this DWPT is stale - perThreadPool.replaceForFlush(state, closed); + perThreadPool.reset(state, closed); } finally { updateStallState(); } @@ -295,7 +300,7 @@ final class DocumentsWriterFlushControl { assert fullFlush : "can not block if fullFlush == false"; final DocumentsWriterPerThread dwpt; final long bytes = perThread.bytesUsed; - dwpt = perThreadPool.replaceForFlush(perThread, closed); + dwpt = perThreadPool.reset(perThread, closed); numPending--; blockedFlushes.add(new BlockedFlush(dwpt, bytes)); }finally { @@ -311,12 +316,12 @@ final class DocumentsWriterFlushControl { // We are pending so all memory is already moved to flushBytes if (perThread.tryLock()) { try { - if (perThread.isActive()) { + if (perThread.isInitialized()) { assert perThread.isHeldByCurrentThread(); final DocumentsWriterPerThread dwpt; final long bytes = perThread.bytesUsed; // do that before // replace! - dwpt = perThreadPool.replaceForFlush(perThread, closed); + dwpt = perThreadPool.reset(perThread, closed); assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing"; // Record the flushing DWPT to reduce flushBytes in doAfterFlush flushingWriters.put(dwpt, Long.valueOf(bytes)); @@ -413,11 +418,11 @@ final class DocumentsWriterFlushControl { * Returns the number of delete terms in the global pool */ public int getNumGlobalTermDeletes() { - return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms(); + return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedDeletesStream.numTerms(); } public long getDeleteBytesUsed() { - return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed(); + return documentsWriter.deleteQueue.bytesUsed() + bufferedDeletesStream.bytesUsed(); } synchronized int numFlushingDWPT() { @@ -441,7 +446,7 @@ final class DocumentsWriterFlushControl { .currentThread(), documentsWriter); boolean success = false; try { - if (perThread.isActive() + if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) { // There is a flush-all in process and this DWPT is // now stale -- enroll it for flush and try for @@ -475,7 +480,10 @@ final class DocumentsWriterFlushControl { final ThreadState next = perThreadPool.getThreadState(i); next.lock(); try { - if (!next.isActive()) { + if (!next.isInitialized()) { + if (closed && next.isActive()) { + perThreadPool.deactivateThreadState(next); + } continue; } assert next.dwpt.deleteQueue == flushingQueue @@ -515,7 +523,7 @@ final class DocumentsWriterFlushControl { final ThreadState next = perThreadPool.getThreadState(i); next.lock(); try { - assert !next.isActive() || next.dwpt.deleteQueue == queue; + assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? 
next.dwpt.getNumDocsInRAM() : 0) ; } finally { next.unlock(); } @@ -526,12 +534,12 @@ final class DocumentsWriterFlushControl { private final List fullFlushBuffer = new ArrayList(); void addFlushableState(ThreadState perThread) { - if (documentsWriter.infoStream.isEnabled("DWFC")) { - documentsWriter.infoStream.message("DWFC", "addFlushableState " + perThread.dwpt); + if (infoStream.isEnabled("DWFC")) { + infoStream.message("DWFC", "addFlushableState " + perThread.dwpt); } final DocumentsWriterPerThread dwpt = perThread.dwpt; assert perThread.isHeldByCurrentThread(); - assert perThread.isActive(); + assert perThread.isInitialized(); assert fullFlush; assert dwpt.deleteQueue != documentsWriter.deleteQueue; if (dwpt.getNumDocsInRAM() > 0) { @@ -545,11 +553,7 @@ final class DocumentsWriterFlushControl { fullFlushBuffer.add(flushingDWPT); } } else { - if (closed) { - perThreadPool.deactivateThreadState(perThread); // make this state inactive - } else { - perThreadPool.reinitThreadState(perThread); - } + perThreadPool.reset(perThread, closed); // make this state inactive } } @@ -594,19 +598,20 @@ final class DocumentsWriterFlushControl { return true; } - synchronized void abortFullFlushes() { + synchronized void abortFullFlushes(Set newFiles) { try { - abortPendingFlushes(); + abortPendingFlushes(newFiles); } finally { fullFlush = false; } } - synchronized void abortPendingFlushes() { + synchronized void abortPendingFlushes(Set newFiles) { try { for (DocumentsWriterPerThread dwpt : flushQueue) { try { - dwpt.abort(); + documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM()); + dwpt.abort(newFiles); } catch (Throwable ex) { // ignore - keep on aborting the flush queue } finally { @@ -617,7 +622,8 @@ final class DocumentsWriterFlushControl { try { flushingWriters .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes)); - blockedFlush.dwpt.abort(); + documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM()); + blockedFlush.dwpt.abort(newFiles); } catch (Throwable ex) { // ignore - keep on aborting the blocked queue } finally { @@ -670,8 +676,8 @@ final class DocumentsWriterFlushControl { * checked out DWPT are available */ void waitIfStalled() { - if (documentsWriter.infoStream.isEnabled("DWFC")) { - documentsWriter.infoStream.message("DWFC", + if (infoStream.isEnabled("DWFC")) { + infoStream.message("DWFC", "waitIfStalled: numFlushesPending: " + flushQueue.size() + " netBytes: " + netBytes() + " flushBytes: " + flushBytes() + " fullFlush: " + fullFlush); @@ -686,5 +692,12 @@ final class DocumentsWriterFlushControl { return stallControl.anyStalledThreads(); } + /** + * Returns the {@link IndexWriter} {@link InfoStream} + */ + public InfoStream getInfoStream() { + return infoStream; + } + } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java index 24c263c8af4..de87c351232 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java @@ -34,8 +34,7 @@ class DocumentsWriterFlushQueue { private final AtomicInteger ticketCount = new AtomicInteger(); private final ReentrantLock purgeLock = new ReentrantLock(); - void addDeletesAndPurge(DocumentsWriter writer, - DocumentsWriterDeleteQueue deleteQueue) throws IOException { + void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { synchronized (this) { incTickets();// first inc the ticket 
count - freeze opens // a window for #anyChanges to fail @@ -49,9 +48,6 @@ class DocumentsWriterFlushQueue { } } } - // don't hold the lock on the FlushQueue when forcing the purge - this blocks and deadlocks - // if we hold the lock. - forcePurge(writer); } private void incTickets() { @@ -98,8 +94,9 @@ class DocumentsWriterFlushQueue { return ticketCount.get() != 0; } - private void innerPurge(DocumentsWriter writer) throws IOException { + private int innerPurge(IndexWriter writer) throws IOException { assert purgeLock.isHeldByCurrentThread(); + int numPurged = 0; while (true) { final FlushTicket head; final boolean canPublish; @@ -108,6 +105,7 @@ class DocumentsWriterFlushQueue { canPublish = head != null && head.canPublish(); // do this synced } if (canPublish) { + numPurged++; try { /* * if we block on publish -> lock IW -> lock BufferedDeletes we don't block @@ -116,6 +114,7 @@ class DocumentsWriterFlushQueue { * be a ticket still in the queue. */ head.publish(writer); + } finally { synchronized (this) { // finally remove the published ticket from the queue @@ -128,27 +127,31 @@ class DocumentsWriterFlushQueue { break; } } + return numPurged; } - void forcePurge(DocumentsWriter writer) throws IOException { + int forcePurge(IndexWriter writer) throws IOException { assert !Thread.holdsLock(this); + assert !Thread.holdsLock(writer); purgeLock.lock(); try { - innerPurge(writer); + return innerPurge(writer); } finally { purgeLock.unlock(); } } - void tryPurge(DocumentsWriter writer) throws IOException { + int tryPurge(IndexWriter writer) throws IOException { assert !Thread.holdsLock(this); + assert !Thread.holdsLock(writer); if (purgeLock.tryLock()) { try { - innerPurge(writer); + return innerPurge(writer); } finally { purgeLock.unlock(); } } + return 0; } public int getTicketCount() { @@ -169,8 +172,47 @@ class DocumentsWriterFlushQueue { this.frozenDeletes = frozenDeletes; } - protected abstract void publish(DocumentsWriter writer) throws IOException; + protected abstract void publish(IndexWriter writer) throws IOException; protected abstract boolean canPublish(); + + /** + * Publishes the flushed segment, segment private deletes (if any) and its + * associated global delete (if present) to IndexWriter. The actual + * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s + * delete generation is always GlobalPacket_deleteGeneration + 1 + */ + protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes globalPacket) + throws IOException { + assert newSegment != null; + assert newSegment.segmentInfo != null; + final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes; + //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name); + if (indexWriter.infoStream.isEnabled("DW")) { + indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes); + } + + if (segmentDeletes != null && indexWriter.infoStream.isEnabled("DW")) { + indexWriter.infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes); + } + // now publish! 
+ indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket); + } + + protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes) + throws IOException { + // Finish the flushed segment and publish it to IndexWriter + if (newSegment == null) { + assert bufferedDeletes != null; + if (bufferedDeletes != null && bufferedDeletes.any()) { + indexWriter.publishFrozenDeletes(bufferedDeletes); + if (indexWriter.infoStream.isEnabled("DW")) { + indexWriter.infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes); + } + } + } else { + publishFlushedSegment(indexWriter, newSegment, bufferedDeletes); + } + } } static final class GlobalDeletesTicket extends FlushTicket { @@ -179,11 +221,11 @@ class DocumentsWriterFlushQueue { super(frozenDeletes); } @Override - protected void publish(DocumentsWriter writer) throws IOException { + protected void publish(IndexWriter writer) throws IOException { assert !published : "ticket was already publised - can not publish twice"; published = true; // its a global ticket - no segment to publish - writer.finishFlush(null, frozenDeletes); + finishFlush(writer, null, frozenDeletes); } @Override @@ -201,10 +243,10 @@ class DocumentsWriterFlushQueue { } @Override - protected void publish(DocumentsWriter writer) throws IOException { + protected void publish(IndexWriter writer) throws IOException { assert !published : "ticket was already publised - can not publish twice"; published = true; - writer.finishFlush(segment, frozenDeletes); + finishFlush(writer, segment, frozenDeletes); } protected void setSegment(FlushedSegment segment) { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index f9d3d1abf96..6fae432d5fb 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -22,6 +22,7 @@ import java.text.NumberFormat; import java.util.Collection; import java.util.HashSet; import java.util.Locale; +import java.util.Set; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; @@ -144,7 +145,7 @@ class DocumentsWriterPerThread { * updating the index files) and must discard all * currently buffered docs. This resets our state, * discarding any docs added since last flush. 
*/ - void abort() { + void abort(Set createdFiles) { //System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name); hasAborted = aborting = true; try { @@ -157,10 +158,7 @@ class DocumentsWriterPerThread { } pendingDeletes.clear(); - deleteSlice = deleteQueue.newSlice(); - // Reset all postings data - doAfterFlush(); - + createdFiles.addAll(directory.getCreatedFiles()); } finally { aborting = false; if (infoStream.isEnabled("DWPT")) { @@ -169,83 +167,77 @@ class DocumentsWriterPerThread { } } private final static boolean INFO_VERBOSE = false; - final DocumentsWriter parent; final Codec codec; - final IndexWriter writer; final TrackingDirectoryWrapper directory; final Directory directoryOrig; final DocState docState; final DocConsumer consumer; final Counter bytesUsed; - SegmentWriteState flushState; //Deletes for our still-in-RAM (to be flushed next) segment - BufferedDeletes pendingDeletes; - SegmentInfo segmentInfo; // Current segment we are working on + final BufferedDeletes pendingDeletes; + private final SegmentInfo segmentInfo; // Current segment we are working on boolean aborting = false; // True if an abort is pending boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting private FieldInfos.Builder fieldInfos; private final InfoStream infoStream; private int numDocsInRAM; - private int flushedDocCount; - DocumentsWriterDeleteQueue deleteQueue; - DeleteSlice deleteSlice; + final DocumentsWriterDeleteQueue deleteQueue; + private final DeleteSlice deleteSlice; private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT); final Allocator byteBlockAllocator; final IntBlockPool.Allocator intBlockAllocator; private final LiveIndexWriterConfig indexWriterConfig; - public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, - FieldInfos.Builder fieldInfos, IndexingChain indexingChain) { + public DocumentsWriterPerThread(String segmentName, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue, + FieldInfos.Builder fieldInfos) { this.directoryOrig = directory; this.directory = new TrackingDirectoryWrapper(directory); - this.parent = parent; this.fieldInfos = fieldInfos; - this.writer = parent.indexWriter; - this.indexWriterConfig = parent.indexWriterConfig; - this.infoStream = parent.infoStream; - this.codec = parent.codec; + this.indexWriterConfig = indexWriterConfig; + this.infoStream = infoStream; + this.codec = indexWriterConfig.getCodec(); this.docState = new DocState(this, infoStream); - this.docState.similarity = parent.indexWriter.getConfig().getSimilarity(); + this.docState.similarity = indexWriterConfig.getSimilarity(); bytesUsed = Counter.newCounter(); byteBlockAllocator = new DirectTrackingAllocator(bytesUsed); pendingDeletes = new BufferedDeletes(); intBlockAllocator = new IntBlockAllocator(bytesUsed); - initialize(); - // this should be the last call in the ctor - // it really sucks that we need to pull this within the ctor and pass this ref to the chain! 
- consumer = indexingChain.getChain(this); - } - - public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) { - this(other.directoryOrig, other.parent, fieldInfos, other.parent.chain); - } - - void initialize() { - deleteQueue = parent.deleteQueue; + this.deleteQueue = deleteQueue; assert numDocsInRAM == 0 : "num docs " + numDocsInRAM; pendingDeletes.clear(); - deleteSlice = null; - } + deleteSlice = deleteQueue.newSlice(); + + segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1, + false, codec, null, null); + assert numDocsInRAM == 0; + if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { + infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue); + } + // this should be the last call in the ctor + // it really sucks that we need to pull this within the ctor and pass this ref to the chain! + consumer = indexWriterConfig.getIndexingChain().getChain(this); + } + void setAborting() { aborting = true; } - - final boolean testPoint(String message) { - if (infoStream.isEnabled("TP")) { - infoStream.message("TP", message); - } - return true; - } - + boolean checkAndResetHasAborted() { final boolean retval = hasAborted; hasAborted = false; return retval; } + + final boolean testPoint(String message) { + if (infoStream.isEnabled("TP")) { + infoStream.message("TP", message); + } + return true; + } public void updateDocument(IndexDocument doc, Analyzer analyzer, Term delTerm) throws IOException { assert testPoint("DocumentsWriterPerThread addDocument start"); @@ -253,9 +245,6 @@ class DocumentsWriterPerThread { docState.doc = doc; docState.analyzer = analyzer; docState.docID = numDocsInRAM; - if (segmentInfo == null) { - initSegmentInfo(); - } if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); } @@ -274,7 +263,7 @@ class DocumentsWriterPerThread { deleteDocID(docState.docID); numDocsInRAM++; } else { - abort(); + abort(filesToDelete); } } } @@ -284,29 +273,16 @@ class DocumentsWriterPerThread { success = true; } finally { if (!success) { - abort(); + abort(filesToDelete); } } finishDocument(delTerm); } - private void initSegmentInfo() { - String segment = writer.newSegmentName(); - segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segment, -1, - false, codec, null, null); - assert numDocsInRAM == 0; - if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { - infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue); - } - } - public int updateDocuments(Iterable docs, Analyzer analyzer, Term delTerm) throws IOException { assert testPoint("DocumentsWriterPerThread addDocuments start"); assert deleteQueue != null; docState.analyzer = analyzer; - if (segmentInfo == null) { - initSegmentInfo(); - } if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) { infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name); } @@ -331,7 +307,7 @@ class DocumentsWriterPerThread { // be called (because an exc is being thrown): numDocsInRAM++; } else { - abort(); + abort(filesToDelete); } } } @@ -341,7 +317,7 @@ class DocumentsWriterPerThread { success = true; } finally { if (!success) { - abort(); + abort(filesToDelete); } } @@ -384,21 +360,18 @@ class 
DocumentsWriterPerThread { * the updated slice we get from 1. holds all the deletes that have occurred * since we updated the slice the last time. */ - if (deleteSlice == null) { - deleteSlice = deleteQueue.newSlice(); - if (delTerm != null) { - deleteQueue.add(delTerm, deleteSlice); - deleteSlice.reset(); - } - - } else { - if (delTerm != null) { - deleteQueue.add(delTerm, deleteSlice); - assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; - deleteSlice.apply(pendingDeletes, numDocsInRAM); - } else if (deleteQueue.updateSlice(deleteSlice)) { - deleteSlice.apply(pendingDeletes, numDocsInRAM); - } + boolean applySlice = numDocsInRAM != 0; + if (delTerm != null) { + deleteQueue.add(delTerm, deleteSlice); + assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item"; + } else { + applySlice &= deleteQueue.updateSlice(deleteSlice); + } + + if (applySlice) { + deleteSlice.apply(pendingDeletes, numDocsInRAM); + } else { // if we don't need to apply we must reset! + deleteSlice.reset(); } ++numDocsInRAM; } @@ -434,14 +407,6 @@ class DocumentsWriterPerThread { return numDocsInRAM; } - /** Reset after a flush */ - private void doAfterFlush() { - segmentInfo = null; - directory.getCreatedFiles().clear(); - fieldInfos = new FieldInfos.Builder(fieldInfos.globalFieldNumbers); - parent.subtractFlushedNumDocs(numDocsInRAM); - numDocsInRAM = 0; - } /** * Prepares this DWPT for flushing. This method will freeze and return the @@ -457,7 +422,7 @@ class DocumentsWriterPerThread { // apply all deletes before we flush and release the delete slice deleteSlice.apply(pendingDeletes, numDocsInRAM); assert deleteSlice.isEmpty(); - deleteSlice = null; + deleteSlice.reset(); } return globalDeletes; } @@ -465,11 +430,11 @@ class DocumentsWriterPerThread { /** Flush all pending docs to a new segment */ FlushedSegment flush() throws IOException { assert numDocsInRAM > 0; - assert deleteSlice == null : "all deletes must be applied in prepareFlush"; + assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush"; segmentInfo.setDocCount(numDocsInRAM); - flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(), + final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(), pendingDeletes, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed()))); - final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.; + final double startMBUsed = bytesUsed() / 1024. 
/ 1024.; // Apply delete-by-docID now (delete-byDocID only // happens when an exception is hit processing that @@ -515,15 +480,11 @@ class DocumentsWriterPerThread { infoStream.message("DWPT", "flushed codec=" + codec); } - flushedDocCount += flushState.segmentInfo.getDocCount(); - final BufferedDeletes segmentDeletes; if (pendingDeletes.queries.isEmpty()) { - pendingDeletes.clear(); segmentDeletes = null; } else { segmentDeletes = pendingDeletes; - pendingDeletes = new BufferedDeletes(); } if (infoStream.isEnabled("DWPT")) { @@ -531,7 +492,7 @@ class DocumentsWriterPerThread { infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name + " ramUsed=" + nf.format(startMBUsed) + " MB" + " newFlushedSize(includes docstores)=" + nf.format(newSegmentSize) + " MB" + - " docs/MB=" + nf.format(flushedDocCount / newSegmentSize)); + " docs/MB=" + nf.format(flushState.segmentInfo.getDocCount() / newSegmentSize)); } assert segmentInfo != null; @@ -539,20 +500,21 @@ class DocumentsWriterPerThread { FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos, segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush); sealFlushedSegment(fs); - doAfterFlush(); success = true; return fs; } finally { if (!success) { - if (segmentInfo != null) { - writer.flushFailed(segmentInfo); - } - abort(); + abort(filesToDelete); } } } + private final Set filesToDelete = new HashSet(); + + public Set pendingFilesToDelete() { + return filesToDelete; + } /** * Seals the {@link SegmentInfo} for the new flushed segment and persists * the deleted documents {@link MutableBits}. @@ -568,12 +530,10 @@ class DocumentsWriterPerThread { boolean success = false; try { + if (indexWriterConfig.getUseCompoundFile()) { - - // Now build compound file - Collection oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context); + filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context)); newSegment.info.setUseCompoundFile(true); - writer.deleteNewFiles(oldFiles); } // Have codec write SegmentInfo. Must do this after @@ -618,7 +578,6 @@ class DocumentsWriterPerThread { infoStream.message("DWPT", "hit exception " + "reating compound file for newly flushed segment " + newSegment.info.name); } - writer.flushFailed(newSegment.info); } } } @@ -671,4 +630,5 @@ class DocumentsWriterPerThread { + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborting=" + aborting + ", numDocsInRAM=" + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]"; } + } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 2a16d324257..cc46ebec320 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -71,12 +71,16 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { * for indexing anymore. 
* @see #isActive() */ - private void resetWriter(DocumentsWriterPerThread dwpt) { + + private void deactivate() { assert this.isHeldByCurrentThread(); - if (dwpt == null) { - isActive = false; - } - this.dwpt = dwpt; + isActive = false; + reset(); + } + + private void reset() { + assert this.isHeldByCurrentThread(); + this.dwpt = null; this.bytesUsed = 0; this.flushPending = false; } @@ -91,6 +95,11 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { return isActive; } + boolean isInitialized() { + assert this.isHeldByCurrentThread(); + return isActive() && dwpt != null; + } + /** * Returns the number of currently active bytes in this ThreadState's * {@link DocumentsWriterPerThread} @@ -121,9 +130,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { private ThreadState[] threadStates; private volatile int numThreadStatesActive; - private SetOnce globalFieldMap = new SetOnce(); - private SetOnce documentsWriter = new SetOnce(); - + /** * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s. */ @@ -133,14 +140,8 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { } threadStates = new ThreadState[maxNumThreadStates]; numThreadStatesActive = 0; - } - - void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) { - this.documentsWriter.set(documentsWriter); // thread pool is bound to DW - this.globalFieldMap.set(globalFieldMap); for (int i = 0; i < threadStates.length; i++) { - final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap); - threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain)); + threadStates[i] = new ThreadState(null); } } @@ -158,9 +159,10 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { // should not happen throw new RuntimeException(e); } - clone.documentsWriter = new SetOnce(); - clone.globalFieldMap = new SetOnce(); clone.threadStates = new ThreadState[threadStates.length]; + for (int i = 0; i < threadStates.length; i++) { + clone.threadStates[i] = new ThreadState(null); + } return clone; } @@ -178,6 +180,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { int getActiveThreadState() { return numThreadStatesActive; } + /** * Returns a new {@link ThreadState} iff any new state is available otherwise @@ -198,8 +201,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { if (threadState.isActive()) { // unreleased thread states are deactivated during DW#close() numThreadStatesActive++; // increment will publish the ThreadState - assert threadState.dwpt != null; - threadState.dwpt.initialize(); + assert threadState.dwpt == null; unlock = false; return threadState; } @@ -220,7 +222,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { for (int i = numThreadStatesActive; i < threadStates.length; i++) { assert threadStates[i].tryLock() : "unreleased threadstate should not be locked"; try { - assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive"; + assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive"; } finally { threadStates[i].unlock(); } @@ -236,24 +238,20 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { final ThreadState threadState = threadStates[i]; threadState.lock(); try { - threadState.resetWriter(null); + threadState.deactivate(); } finally { threadState.unlock(); } } 
} - DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) { + DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) { assert threadState.isHeldByCurrentThread(); - assert globalFieldMap.get() != null; final DocumentsWriterPerThread dwpt = threadState.dwpt; if (!closed) { - final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get()); - final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos); - newDwpt.initialize(); - threadState.resetWriter(newDwpt); + threadState.reset(); } else { - threadState.resetWriter(null); + threadState.deactivate(); } return dwpt; } @@ -328,18 +326,6 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable { */ void deactivateThreadState(ThreadState threadState) { assert threadState.isActive(); - threadState.resetWriter(null); - } - - /** - * Reinitialized an active {@link ThreadState}. A {@link ThreadState} should - * only be reinitialized if it is active without any pending documents. - * - * @param threadState the state to reinitialize - */ - void reinitThreadState(ThreadState threadState) { - assert threadState.isActive; - assert threadState.dwpt.getNumDocsInRAM() == 0; - threadState.dwpt.initialize(); + threadState.deactivate(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java index 4e2cc362785..452340193d7 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java @@ -68,12 +68,11 @@ class FlushByRamOrCountsPolicy extends FlushPolicy { control.setApplyAllDeletes(); } } - final DocumentsWriter writer = this.writer.get(); if ((flushOnRAM() && control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) { control.setApplyAllDeletes(); - if (writer.infoStream.isEnabled("FP")) { - writer.infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB())); + if (infoStream.isEnabled("FP")) { + infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB())); } } } @@ -89,9 +88,8 @@ class FlushByRamOrCountsPolicy extends FlushPolicy { final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); final long totalRam = control.activeBytes() + control.getDeleteBytesUsed(); if (totalRam >= limit) { - final DocumentsWriter writer = this.writer.get(); - if (writer.infoStream.isEnabled("FP")) { - writer.infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); + if (infoStream.isEnabled("FP")) { + infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit); } markLargestWriterPending(control, state, totalRam); } diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java index 03f6b9fd32d..d86ab2339ec 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java @@ -20,6 +20,7 @@ import java.util.Iterator; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; import 
org.apache.lucene.store.Directory; +import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.SetOnce; /** @@ -52,8 +53,8 @@ import org.apache.lucene.util.SetOnce; * @see IndexWriterConfig#setFlushPolicy(FlushPolicy) */ abstract class FlushPolicy implements Cloneable { - protected SetOnce writer = new SetOnce(); protected LiveIndexWriterConfig indexWriterConfig; + protected InfoStream infoStream; /** * Called for each delete term. If this is a delete triggered due to an update @@ -93,9 +94,9 @@ abstract class FlushPolicy implements Cloneable { /** * Called by DocumentsWriter to initialize the FlushPolicy */ - protected synchronized void init(DocumentsWriter docsWriter) { - writer.set(docsWriter); - indexWriterConfig = docsWriter.indexWriter.getConfig(); + protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) { + this.indexWriterConfig = indexWriterConfig; + infoStream = indexWriterConfig.getInfoStream(); } /** @@ -127,8 +128,8 @@ abstract class FlushPolicy implements Cloneable { } private boolean assertMessage(String s) { - if (writer.get().infoStream.isEnabled("FP")) { - writer.get().infoStream.message("FP", s); + if (infoStream.isEnabled("FP")) { + infoStream.message("FP", s); } return true; } @@ -142,8 +143,8 @@ abstract class FlushPolicy implements Cloneable { // should not happen throw new RuntimeException(e); } - clone.writer = new SetOnce(); clone.indexWriterConfig = null; + clone.infoStream = null; return clone; } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 6450708b003..361af42820d 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -182,7 +183,7 @@ import org.apache.lucene.util.ThreadInterruptedException; * referenced by the "front" of the index). For this, IndexFileDeleter * keeps track of the last non commit checkpoint. */ -public class IndexWriter implements Closeable, TwoPhaseCommit { +public class IndexWriter implements Closeable, TwoPhaseCommit{ private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1; @@ -227,6 +228,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { final FieldNumbers globalFieldNumberMap; private DocumentsWriter docWriter; + private final Queue eventQueue; final IndexFileDeleter deleter; // used by forceMerge to note those needing merging @@ -360,7 +362,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { synchronized (fullFlushLock) { boolean success = false; try { - anySegmentFlushed = docWriter.flushAllThreads(); + anySegmentFlushed = docWriter.flushAllThreads(this); if (!anySegmentFlushed) { // prevent double increment since docWriter#doFlush increments the flushcount // if we flushed anything. 
@@ -730,7 +732,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { // start with previous field numbers, but new FieldInfos globalFieldNumberMap = getFieldNumberMap(); - docWriter = new DocumentsWriter(codec, config, directory, this, globalFieldNumberMap, bufferedDeletesStream); + config.getFlushPolicy().init(config); + docWriter = new DocumentsWriter(this, config, directory); + eventQueue = docWriter.eventQueue(); // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: @@ -961,7 +965,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { if (doFlush) { flush(waitForMerges, true); } else { - docWriter.abort(); // already closed -- never sync on IW + docWriter.abort(this); // already closed -- never sync on IW } } finally { @@ -1033,7 +1037,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { synchronized(this) { closed = true; } - assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates(); + assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates() : "" + oldWriter.perThreadPool.numDeactivatedThreadStates() + " " + oldWriter.perThreadPool.getMaxThreadStates(); } catch (OutOfMemoryError oom) { handleOOM(oom, "closeInternal"); } finally { @@ -1280,9 +1284,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { ensureOpen(); try { boolean success = false; - boolean anySegmentFlushed = false; try { - anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm); + if (docWriter.updateDocuments(docs, analyzer, delTerm)) { + processEvents(true, false); + } success = true; } finally { if (!success) { @@ -1291,9 +1296,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } } } - if (anySegmentFlushed) { - maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS); - } } catch (OutOfMemoryError oom) { handleOOM(oom, "updateDocuments"); } @@ -1313,7 +1315,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { public void deleteDocuments(Term term) throws IOException { ensureOpen(); try { - docWriter.deleteTerms(term); + if (docWriter.deleteTerms(term)) { + processEvents(true, false); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term)"); } @@ -1412,7 +1416,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { public void deleteDocuments(Term... terms) throws IOException { ensureOpen(); try { - docWriter.deleteTerms(terms); + if (docWriter.deleteTerms(terms)) { + processEvents(true, false); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term..)"); } @@ -1432,7 +1438,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { public void deleteDocuments(Query query) throws IOException { ensureOpen(); try { - docWriter.deleteQueries(query); + if (docWriter.deleteQueries(query)) { + processEvents(true, false); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Query)"); } @@ -1454,7 +1462,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { public void deleteDocuments(Query... 
queries) throws IOException { ensureOpen(); try { - docWriter.deleteQueries(queries); + if (docWriter.deleteQueries(queries)) { + processEvents(true, false); + } } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Query..)"); } @@ -1505,9 +1515,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { ensureOpen(); try { boolean success = false; - boolean anySegmentFlushed = false; try { - anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term); + if (docWriter.updateDocument(doc, analyzer, term)) { + processEvents(true, false); + } success = true; } finally { if (!success) { @@ -1516,10 +1527,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } } } - - if (anySegmentFlushed) { - maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS); - } } catch (OutOfMemoryError oom) { handleOOM(oom, "updateDocument"); } @@ -1730,7 +1737,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { // complete ensureOpen(); } - // NOTE: in the ConcurrentMergeScheduler case, when // doWait is false, we can return immediately while // background threads accomplish the merging @@ -2009,8 +2015,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { mergeScheduler.close(); bufferedDeletesStream.clear(); + processEvents(false, true); docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes - docWriter.abort(); // don't sync on IW here + docWriter.abort(this); // don't sync on IW here synchronized(this) { if (pendingCommit != null) { @@ -2102,7 +2109,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { * sure it's just like a fresh index. */ try { - docWriter.lockAndAbortAll(); + docWriter.lockAndAbortAll(this); + processEvents(false, true); synchronized (this) { try { // Abort any running merges @@ -2135,7 +2143,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } } } finally { - docWriter.unlockAllAfterAbortAll(); + docWriter.unlockAllAfterAbortAll(this); } } } @@ -2243,33 +2251,40 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { * Atomically adds the segment private delete packet and publishes the flushed * segments SegmentInfo to the index writer. */ - synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment, + void publishFlushedSegment(SegmentInfoPerCommit newSegment, FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException { - // Lock order IW -> BDS - synchronized (bufferedDeletesStream) { - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "publishFlushedSegment"); + try { + synchronized (this) { + // Lock order IW -> BDS + synchronized (bufferedDeletesStream) { + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "publishFlushedSegment"); + } + + if (globalPacket != null && globalPacket.any()) { + bufferedDeletesStream.push(globalPacket); + } + // Publishing the segment must be synched on IW -> BDS to make the sure + // that no merge prunes away the seg. 
private delete packet + final long nextGen; + if (packet != null && packet.any()) { + nextGen = bufferedDeletesStream.push(packet); + } else { + // Since we don't have a delete packet to apply we can get a new + // generation right away + nextGen = bufferedDeletesStream.getNextGen(); + } + if (infoStream.isEnabled("IW")) { + infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment)); + } + newSegment.setBufferedDeletesGen(nextGen); + segmentInfos.add(newSegment); + checkpoint(); + } } - - if (globalPacket != null && globalPacket.any()) { - bufferedDeletesStream.push(globalPacket); - } - // Publishing the segment must be synched on IW -> BDS to make the sure - // that no merge prunes away the seg. private delete packet - final long nextGen; - if (packet != null && packet.any()) { - nextGen = bufferedDeletesStream.push(packet); - } else { - // Since we don't have a delete packet to apply we can get a new - // generation right away - nextGen = bufferedDeletesStream.getNextGen(); - } - if (infoStream.isEnabled("IW")) { - infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment)); - } - newSegment.setBufferedDeletesGen(nextGen); - segmentInfos.add(newSegment); - checkpoint(); + } finally { + flushCount.incrementAndGet(); + doAfterFlush(); } } @@ -2705,12 +2720,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { boolean flushSuccess = false; boolean success = false; try { - anySegmentsFlushed = docWriter.flushAllThreads(); + anySegmentsFlushed = docWriter.flushAllThreads(this); if (!anySegmentsFlushed) { // prevent double increment since docWriter#doFlush increments the flushcount // if we flushed anything. flushCount.incrementAndGet(); } + processEvents(false, true); flushSuccess = true; synchronized(this) { @@ -2750,7 +2766,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } catch (OutOfMemoryError oom) { handleOOM(oom, "prepareCommit"); } - + boolean success = false; try { if (anySegmentsFlushed) { @@ -2765,7 +2781,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { } } } - + startCommit(toCommit); } } @@ -2950,10 +2966,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { synchronized (fullFlushLock) { boolean flushSuccess = false; try { - anySegmentFlushed = docWriter.flushAllThreads(); + anySegmentFlushed = docWriter.flushAllThreads(this); flushSuccess = true; } finally { docWriter.finishFullFlush(flushSuccess); + processEvents(false, true); } } synchronized(this) { @@ -4307,4 +4324,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit { synchronized final void flushFailed(SegmentInfo info) throws IOException { deleter.refresh(info.name); } + + final int purge(boolean forced) throws IOException { + return docWriter.purgeBuffer(this, forced); + } + + final void applyDeletesAndPurge(boolean forcePurge) throws IOException { + try { + purge(forcePurge); + } finally { + applyAllDeletes(); + flushCount.incrementAndGet(); + } + } + final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException { + try { + purge(forcePurge); + } finally { + if (triggerMerge) { + maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS); + } + } + + } + + private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException { + return processEvents(eventQueue, triggerMerge, forcePurge); + } + + private boolean processEvents(Queue queue, boolean triggerMerge, boolean forcePurge) 
throws IOException { + Event event; + boolean processed = false; + while((event = queue.poll()) != null) { + processed = true; + event.process(this, triggerMerge, forcePurge); + } + return processed; + } + + /** + * Interface for internal atomic events. See {@link DocumentsWriter} for details. Events are executed concurrently and no order is guaranteed. + * Each event should only rely on the serializability within its process method. All actions that must happen before or after a certain action must be + * encoded inside the {@link #process(IndexWriter, boolean, boolean)} method. + * + */ + static interface Event { + + /** + * Processes the event. This method is called by the {@link IndexWriter} + * passed as the first argument. + * + * @param writer + * the {@link IndexWriter} that executes the event. + * @param triggerMerge + * false iff this event should not trigger any segment merges + * @param clearBuffers + * true iff this event should clear all buffers associated with the event. + * @throws IOException + * if an {@link IOException} occurs + */ + void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException; + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java index a79bac6a061..7a49d3b79da 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java @@ -281,7 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase { Iterator allActiveThreads = flushControl.allActiveThreadStates(); long bytesUsed = 0; while (allActiveThreads.hasNext()) { - bytesUsed += allActiveThreads.next().dwpt.bytesUsed(); + ThreadState next = allActiveThreads.next(); + if (next.dwpt != null) { + bytesUsed += next.dwpt.bytesUsed(); + } } assertEquals(bytesUsed, flushControl.activeBytes()); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index 34e41dcf3cd..5fe04427966 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -1702,7 +1702,6 @@ public class TestIndexWriter extends LuceneTestCase { w.deleteAll(); w.commit(); - // Make sure we accumulate no files except for empty // segments_N and segments.gen: assertTrue(d.listAll().length <= 2); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java index 433e49c2f65..11c1353b304 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterForceMerge.java @@ -83,7 +83,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase { IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig( TEST_VERSION_CURRENT, new MockAnalyzer(random())) .setMaxBufferedDocs(2).setMergePolicy(ldmp).setMergeScheduler(new ConcurrentMergeScheduler())); - + for(int iter=0;iter<10;iter++) { for(int i=0;i<19;i++) writer.addDocument(doc); @@ -96,7 +96,6 @@ public class TestIndexWriterForceMerge extends LuceneTestCase { sis.read(dir); final int segCount = sis.size(); - writer.forceMerge(7); writer.commit(); writer.waitForMerges(); @@ -108,7 +107,7 @@ public class TestIndexWriterForceMerge 
extends LuceneTestCase { if (segCount < 7) assertEquals(segCount, optSegCount); else - assertEquals(7, optSegCount); + assertEquals("seg: " + segCount, 7, optSegCount); } writer.close(); dir.close();
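
The net effect of the IndexWriter changes in this patch is that DocumentsWriter and the per-thread writers no longer call back into IndexWriter directly: they enqueue IndexWriter.Event objects on a ConcurrentLinkedQueue, and IndexWriter drains that queue from its own call paths via processEvents. The standalone sketch below distills that hand-off, assuming nothing beyond the JDK; the Event#process signature and the poll()-based drain loop mirror the patch, while the class names EventQueueSketch and Writer, the putEvent helper, and the APPLY_DELETES constant are illustrative stand-ins, not Lucene APIs.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Minimal sketch of the event-queue hand-off: worker threads (standing in for
 * DWPT/DocumentsWriter) never call back into the writer; they only enqueue
 * events, and the writer drains the queue on its own thread, so no locks are
 * held across the boundary. Names are illustrative, not Lucene APIs.
 */
public class EventQueueSketch {

  /** Mirrors the shape of IndexWriter.Event#process(IndexWriter, boolean, boolean). */
  interface Event {
    void process(Writer writer, boolean triggerMerge, boolean clearBuffers);
  }

  /** Stand-in for IndexWriter: owns the queue and is the only party that processes it. */
  static final class Writer {
    private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();

    /** Called from indexing threads; cheap and lock-free. */
    void putEvent(Event e) {
      eventQueue.offer(e);
    }

    /** Called only from the writer's own code paths (e.g. after an update or flush). */
    boolean processEvents(boolean triggerMerge, boolean clearBuffers) {
      Event event;
      boolean processed = false;
      while ((event = eventQueue.poll()) != null) {
        processed = true;
        event.process(this, triggerMerge, clearBuffers);
      }
      return processed;
    }

    void applyAllDeletes() {
      System.out.println("applying deletes on " + Thread.currentThread().getName());
    }
  }

  /** Analogue of an apply-deletes event: the action runs later, inside the writer. */
  static final Event APPLY_DELETES =
      (writer, triggerMerge, clearBuffers) -> writer.applyAllDeletes();

  public static void main(String[] args) throws InterruptedException {
    Writer writer = new Writer();

    // "Indexing" threads only enqueue; they never block on writer-internal monitors.
    Thread t1 = new Thread(() -> writer.putEvent(APPLY_DELETES), "indexer-1");
    Thread t2 = new Thread(() -> writer.putEvent(APPLY_DELETES), "indexer-2");
    t1.start(); t2.start();
    t1.join(); t2.join();

    // The writer drains the queue at a point where it is safe to do so.
    writer.processEvents(/*triggerMerge=*/ true, /*clearBuffers=*/ false);
  }
}

Because the producer side only offers to a lock-free queue and all processing happens on the writer's own call path, neither side holds the other's locks while crossing the boundary, which is the class of deadlock this change sets out to avoid.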