diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index c86156607ce..1ef55cdde5a 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -157,6 +157,10 @@ Bug Fixes
 * LUCENE-8055: MemoryIndex.MemoryDocValuesIterator returns 2 documents
   instead of 1. (Simon Willnauer)
 
+* LUCENE-8043: Fix document accounting in IndexWriter to prevent writing too many
+  documents. Once this happens, Lucene refuses to open the index and throws a
+  CorruptIndexException. (Simon Willnauer, Yonik Seeley, Mike McCandless)
+
 Build
 
 * LUCENE-6144: Upgrade Ivy to 2.4.0; 'ant ivy-bootstrap' now removes old Ivy
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 6aca9f4b697..d4e4e239e69 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -549,7 +549,6 @@ final class DocumentsWriter implements Closeable, Accountable {
       try {
         // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
         ticket = ticketQueue.addFlushTicket(flushingDWPT);
-        final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
         boolean dwptSuccess = false;
         try {
@@ -681,7 +680,9 @@ final class DocumentsWriter implements Closeable, Accountable {
           ticketQueue.addDeletes(flushingDeleteQueue);
         }
         ticketQueue.forcePurge(writer);
-        assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
+        // we can't assert that we don't have any tickets in the queue since we might add a DocumentsWriterDeleteQueue
+        // concurrently; if we have very small RAM buffers this happens quite frequently
+        assert !flushingDeleteQueue.anyChanges();
       } finally {
         assert flushingDeleteQueue == currentFullFlushDelQueue;
       }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 94ffba77e2c..76c29065f74 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -118,6 +118,7 @@ class DocumentsWriterPerThread {
   void abort() {
     //System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
     aborted = true;
+    pendingNumDocs.addAndGet(-numDocsInRAM);
     try {
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "now abort");
@@ -491,7 +492,7 @@ class DocumentsWriterPerThread {
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0)/1000000.0) + " msec");
       }
-      
+
       return fs;
     } catch (Throwable th) {
       abort();
@@ -522,7 +523,6 @@ class DocumentsWriterPerThread {
    */
   void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap) throws IOException {
     assert flushedSegment != null;
-
     SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
 
     IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);
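The `abort()` change above is the heart of the fix on the DocumentsWriterPerThread side: a DWPT that aborts must hand its RAM-buffered documents back to the shared `pendingNumDocs` counter, otherwise the writer permanently leaks capacity toward the `maxDocs` limit. A minimal sketch of that reserve/release contract, using a hypothetical `BufferingWriter` rather than Lucene's actual classes:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the DWPT/IndexWriter pair; not Lucene's real API.
class BufferingWriter {
  private final AtomicLong pendingNumDocs = new AtomicLong(); // shared across all writer threads
  private int numDocsInRAM;                                   // this thread's buffered documents

  void addDocument() {
    pendingNumDocs.incrementAndGet(); // reserve against the global limit before buffering
    numDocsInRAM++;
  }

  void abort() {
    // On abort, the buffered documents will never be published as a segment,
    // so their reservation must be returned exactly once.
    pendingNumDocs.addAndGet(-numDocsInRAM);
    numDocsInRAM = 0;
  }

  long pending() {
    return pendingNumDocs.get();
  }
}
```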
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 6059218f99d..a7e050da246 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -551,13 +552,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       return true;
     }
 
-    public synchronized void drop(SegmentCommitInfo info) throws IOException {
+    public synchronized boolean drop(SegmentCommitInfo info) throws IOException {
       final ReadersAndUpdates rld = readerMap.get(info);
       if (rld != null) {
         assert info == rld.info;
         readerMap.remove(info);
         rld.dropReaders();
+        return true;
       }
+      return false;
     }
 
     public synchronized long ramBytesUsed() {
@@ -726,7 +729,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator();
       while(it.hasNext()) {
         final ReadersAndUpdates rld = it.next().getValue();
-
         try {
           if (doSave && rld.writeLiveDocs(directory)) {
             // Make sure we only write del docs and field updates for a live segment:
@@ -1097,7 +1099,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       rollbackSegments = segmentInfos.createBackupSegmentInfos();
     }
 
-    commitUserData = new HashMap<String,String>(segmentInfos.getUserData()).entrySet();
+    commitUserData = new HashMap<>(segmentInfos.getUserData()).entrySet();
 
     pendingNumDocs.set(segmentInfos.totalMaxDoc());
@@ -1267,6 +1269,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         if (infoStream.isEnabled("IW")) {
           infoStream.message("IW", "now flush at close");
         }
+        flush(true, true);
         waitForMerges();
         commitInternal(config.getMergePolicy());
@@ -1617,9 +1620,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         // merge will skip merging it and will then drop
         // it once it's done:
         if (mergingSegments.contains(info) == false) {
-          segmentInfos.remove(info);
-          pendingNumDocs.addAndGet(-info.info.maxDoc());
-          readerPool.drop(info);
+          // it's possible that we invoke this method more than once for the same SCI
+          // we must only remove the docs once!
+          boolean dropPendingDocs = segmentInfos.remove(info);
+          try {
+            // this is sneaky - we might hit an exception while dropping a reader but then we have already
+            // removed the segment from the segmentInfos and we lost the pendingDocs update due to that.
+            // therefore we execute the adjustPendingNumDocs in a finally block to account for that.
+            dropPendingDocs |= readerPool.drop(info);
+          } finally {
+            if (dropPendingDocs) {
+              adjustPendingNumDocs(-info.info.maxDoc());
+            }
+          }
         }
       }
@@ -2342,6 +2355,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     // Make sure no commit is running, else e.g. we can close while another thread is still fsync'ing:
     synchronized(commitLock) {
       rollbackInternalNoCommit();
+
+      assert pendingNumDocs.get() == segmentInfos.totalMaxDoc()
+          : "pendingNumDocs " + pendingNumDocs.get() + " != " + segmentInfos.totalMaxDoc() + " totalMaxDoc";
     }
   }
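The segment-drop rework above relies on `SegmentInfos.remove` and `ReaderPool.drop` now reporting whether they actually removed anything, so the counter is decremented at most once even when the method is invoked repeatedly or a drop throws midway. A self-contained sketch of that idiom, with hypothetical collection-backed stand-ins:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Collection-backed stand-ins for SegmentInfos and the reader pool, just to
// show the shape of the idiom; hypothetical, not Lucene's API.
class SegmentDropper {
  private final Set<String> segments = new HashSet<>();
  private final Set<String> pooledReaders = new HashSet<>();
  private final AtomicLong pendingNumDocs = new AtomicLong();

  // Safe to invoke more than once for the same segment: the counter is
  // adjusted at most once, and the finally block guarantees the adjustment
  // is not lost if dropping the pooled reader throws after the segment was
  // already removed.
  void dropSegment(String name, int maxDoc) {
    boolean dropPendingDocs = segments.remove(name); // true only on the first call
    try {
      dropPendingDocs |= pooledReaders.remove(name); // the real drop() can throw IOException
    } finally {
      if (dropPendingDocs) {
        pendingNumDocs.addAndGet(-maxDoc);
      }
    }
  }
}
```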
@@ -2354,7 +2370,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     try {
       abortMerges();
-
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "rollback: done finish merges");
       }
@@ -2365,6 +2380,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
       docWriter.abort(this); // don't sync on IW here
+      docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
+      purge(true); // empty the flush ticket queue, otherwise we might not have cleaned up all resources
       synchronized(this) {
         if (pendingCommit != null) {
@@ -2376,15 +2393,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           notifyAll();
         }
       }
-
       // Don't bother saving any changes in our segmentInfos
       readerPool.dropAll(false);
-
+      final int totalMaxDoc = segmentInfos.totalMaxDoc();
       // Keep the same segmentInfos instance but replace all
       // of its SegmentInfo instances so IFD below will remove
       // any segments we flushed since the last commit:
       segmentInfos.rollbackSegmentInfos(rollbackSegments);
-
+      int rollbackMaxDoc = segmentInfos.totalMaxDoc();
+      // now we need to adjust this back to the rolled-back SI, but don't set it to the absolute value;
+      // otherwise we might hide internal bugs
+      adjustPendingNumDocs(-(totalMaxDoc-rollbackMaxDoc));
       if (infoStream.isEnabled("IW") ) {
         infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
       }
@@ -2495,9 +2514,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      */
     try {
       synchronized (fullFlushLock) {
-        long abortedDocCount = docWriter.lockAndAbortAll(this);
-        pendingNumDocs.addAndGet(-abortedDocCount);
-
+        docWriter.lockAndAbortAll(this);
         processEvents(false, true);
         synchronized (this) {
           try {
@@ -2505,11 +2522,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             abortMerges();
             // Let merges run again
             stopMerges = false;
+            adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
             // Remove all segments
-            pendingNumDocs.addAndGet(-segmentInfos.totalMaxDoc());
             segmentInfos.clear();
             // Ask deleter to locate unreferenced files & remove them:
             deleter.checkpoint(segmentInfos, false);
+
             /* don't refresh the deleter here since there might
              * be concurrent indexing requests coming in opening
              * files on the directory after we called DW#abort()
@@ -2522,12 +2540,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             changeCount.incrementAndGet();
             segmentInfos.changed();
             globalFieldNumberMap.clear();
-
             success = true;
             long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
             docWriter.setLastSeqNo(seqNo);
             return seqNo;
-
           } finally {
             docWriter.unlockAllAfterAbortAll(this);
             if (!success) {
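Note that the rollback path above corrects `pendingNumDocs` by the delta between the doc counts before and after `rollbackSegmentInfos`, rather than overwriting the counter with the rolled-back total; an absolute set would mask any drift that the new assertions are meant to catch. A worked example with assumed doc counts:

```java
import java.util.concurrent.atomic.AtomicLong;

// Worked example of the delta-based rollback adjustment (assumed doc counts).
class RollbackDelta {
  public static void main(String[] args) {
    AtomicLong pendingNumDocs = new AtomicLong(120); // counter before rollback
    long totalMaxDoc = 120;     // segmentInfos.totalMaxDoc() before rollback
    long rollbackMaxDoc = 100;  // segmentInfos.totalMaxDoc() after rollback

    // Subtract only the docs that the rollback actually discarded ...
    pendingNumDocs.addAndGet(-(totalMaxDoc - rollbackMaxDoc));

    // ... so the post-rollback invariant holds if and only if the counter was
    // correct to begin with. Setting it to rollbackMaxDoc outright would make
    // this check pass even when the accounting had already drifted.
    assert pendingNumDocs.get() == rollbackMaxDoc;
    System.out.println("pendingNumDocs after rollback: " + pendingNumDocs.get());
  }
}
```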
@@ -2660,6 +2676,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   synchronized void publishFlushedSegment(SegmentCommitInfo newSegment, FrozenBufferedUpdates packet,
                                           FrozenBufferedUpdates globalPacket, Sorter.DocMap sortMap) throws IOException {
+    boolean published = false;
     try {
       // Lock order IW -> BDS
       ensureOpen(false);
@@ -2695,6 +2712,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       }
       newSegment.setBufferedDeletesGen(nextGen);
       segmentInfos.add(newSegment);
+      published = true;
       checkpoint();
 
       if (packet != null && packet.any() && sortMap != null) {
@@ -2705,6 +2723,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       }
 
     } finally {
+      if (published == false) {
+        adjustPendingNumDocs(-newSegment.info.maxDoc());
+      }
       flushCount.incrementAndGet();
       doAfterFlush();
     }
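`publishFlushedSegment` above uses a publish flag so that a flushed segment which never makes it into `segmentInfos` hands its reserved documents back in the `finally` block, while a published segment keeps them reserved even if a later step throws. A compact sketch of that publish-or-release pattern, with hypothetical stand-ins:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins illustrating the publish-or-release pattern;
// not IndexWriter's real code.
class SegmentPublisher {
  private final List<String> segmentInfos = new ArrayList<>();
  private final AtomicLong pendingNumDocs = new AtomicLong();

  void publishFlushedSegment(String name, int maxDoc) {
    boolean published = false;
    try {
      segmentInfos.add(name);
      published = true;
      checkpoint(); // may throw; docs stay reserved because the segment is already published
    } finally {
      if (published == false) {
        // the segment never became visible, so its reservation must be returned
        pendingNumDocs.addAndGet(-maxDoc);
      }
    }
  }

  private void checkpoint() { /* persist the new segment list; may fail */ }
}
```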
@@ -3894,7 +3915,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // doing this makes MockDirWrapper angry in
       // TestNRTThreads (LUCENE-5434):
       readerPool.drop(merge.info);
-
       // Safe: these files must exist:
       deleteNewFiles(merge.info.files());
 
       return false;
@@ -3955,9 +3975,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     // Now deduct the deleted docs that we just reclaimed from this
     // merge:
-    int delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
+    int delDocCount;
+    if (dropSegment) {
+      // if we drop the segment we have to reduce the pendingNumDocs by merge.totalMaxDoc since we never drop
+      // the docs when we apply deletes while the segment is being merged
+      delDocCount = merge.totalMaxDoc;
+    } else {
+      delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
+    }
     assert delDocCount >= 0;
-    pendingNumDocs.addAndGet(-delDocCount);
+    adjustPendingNumDocs(-delDocCount);
 
     if (dropSegment) {
       assert !segmentInfos.contains(merge.info);
@@ -5072,11 +5099,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   }
 
   private void processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
-    boolean processed = false;
     if (tragedy == null) {
       Event event;
       while ((event = queue.poll()) != null) {
-        processed = true;
         event.process(this, triggerMerge, forcePurge);
       }
     }
@@ -5088,7 +5113,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
    *
    */
-  static interface Event {
+  interface Event {
 
     /**
      * Processes the event. This method is called by the {@link IndexWriter}
@@ -5111,9 +5136,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * IllegalArgumentException} if it's not allowed. */
   private void reserveDocs(long addedNumDocs) {
     assert addedNumDocs >= 0;
-    if (pendingNumDocs.addAndGet(addedNumDocs) > actualMaxDocs) {
+    if (adjustPendingNumDocs(addedNumDocs) > actualMaxDocs) {
       // Reserve failed: put the docs back and throw exc:
-      pendingNumDocs.addAndGet(-addedNumDocs);
+      adjustPendingNumDocs(-addedNumDocs);
       tooManyDocs(addedNumDocs);
     }
   }
@@ -5142,4 +5167,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     ensureOpen();
     return docWriter.getMaxCompletedSequenceNumber();
   }
+
+  private long adjustPendingNumDocs(long numDocs) {
+    long count = pendingNumDocs.addAndGet(numDocs);
+    assert count >= 0 : "pendingNumDocs is negative: " + count;
+    return count;
+  }
 }
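Taken together, `reserveDocs` and the new `adjustPendingNumDocs` helper above implement an optimistic reserve-then-rollback protocol, with a non-negativity assertion guarding against double releases. A self-contained sketch (hypothetical `DocLimit` class, simplified error handling):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the reserve-then-rollback protocol; not IndexWriter's real code.
class DocLimit {
  private final AtomicLong pendingNumDocs = new AtomicLong();
  private final long maxDocs;

  DocLimit(long maxDocs) { this.maxDocs = maxDocs; }

  void reserveDocs(long addedNumDocs) {
    assert addedNumDocs >= 0;
    if (adjustPendingNumDocs(addedNumDocs) > maxDocs) {
      // reserve failed: put the docs back and throw
      adjustPendingNumDocs(-addedNumDocs);
      throw new IllegalArgumentException(
          "number of documents in the index cannot exceed " + maxDocs);
    }
  }

  private long adjustPendingNumDocs(long numDocs) {
    long count = pendingNumDocs.addAndGet(numDocs);
    assert count >= 0 : "pendingNumDocs is negative: " + count; // catches double releases
    return count;
  }
}
```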
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
index f796d34d0e4..008b6e379a7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -972,8 +972,8 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo> {
    *  <p><b>WARNING</b>: O(N) cost */
-  public void remove(SegmentCommitInfo si) {
-    segments.remove(si);
+  public boolean remove(SegmentCommitInfo si) {
+    return segments.remove(si);
   }
 
   /** Remove the {@link SegmentCommitInfo} at the
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java
new file mode 100644
index 00000000000..78305f43cb8
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestIndexTooManyDocs extends LuceneTestCase {
+
+  /*
+   * This test produces a boatload of very small segments with lots of deletes, which are likely to delete
+   * the entire segment. see https://issues.apache.org/jira/browse/LUCENE-8043
+   */
+  public void testIndexTooManyDocs() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    int numMaxDoc = 25;
+    IndexWriterConfig config = new IndexWriterConfig();
+    config.setRAMBufferSizeMB(0.000001); // force lots of small segments and lots of concurrent deletes
+    IndexWriter writer = new IndexWriter(dir, config);
+    try {
+      IndexWriter.setMaxDocs(numMaxDoc);
+      int numThreads = 5 + random().nextInt(5);
+      Thread[] threads = new Thread[numThreads];
+      CountDownLatch latch = new CountDownLatch(numThreads);
+      CountDownLatch indexingDone = new CountDownLatch(numThreads - 2);
+      AtomicBoolean done = new AtomicBoolean(false);
+      for (int i = 0; i < numThreads; i++) {
+        if (i >= 2) {
+          threads[i] = new Thread(() -> {
+            latch.countDown();
+            try {
+              latch.await();
+            } catch (InterruptedException e) {
+              throw new AssertionError(e);
+            }
+            for (int d = 0; d < 100; d++) {
+              Document doc = new Document();
+              String id = Integer.toString(random().nextInt(numMaxDoc * 2));
+              doc.add(new StringField("id", id, Field.Store.NO));
+              try {
+                Term t = new Term("id", id);
+                if (random().nextInt(5) == 0) {
+                  writer.deleteDocuments(new TermQuery(t));
+                }
+                writer.updateDocument(t, doc);
+              } catch (IOException e) {
+                throw new AssertionError(e);
+              } catch (IllegalArgumentException e) {
+                assertEquals("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs(), e.getMessage());
+              }
+            }
+            indexingDone.countDown();
+          });
+        } else {
+          threads[i] = new Thread(() -> {
+            try {
+              latch.countDown();
+              latch.await();
+              DirectoryReader open = DirectoryReader.open(writer, true, true);
+              while (done.get() == false) {
+                DirectoryReader directoryReader = DirectoryReader.openIfChanged(open);
+                if (directoryReader != null) {
+                  open.close();
+                  open = directoryReader;
+                }
+              }
+              IOUtils.closeWhileHandlingException(open);
+            } catch (Exception e) {
+              throw new AssertionError(e);
+            }
+          });
+        }
+        threads[i].start();
+      }
+
+      indexingDone.await();
+      done.set(true);
+
+
+      for (int i = 0; i < numThreads; i++) {
+        threads[i].join();
+      }
+      writer.close();
+      dir.close();
+    } finally {
+      IndexWriter.setMaxDocs(IndexWriter.MAX_DOCS);
+    }
+  }
+}
\ No newline at end of file
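Two details of the test are worth noting. The two reader threads keep near-real-time readers refreshed via `DirectoryReader.openIfChanged` while the other threads index and delete, which exercises exactly the concurrent flush, publish, and purge paths the accounting fix touches. And since `IndexWriter.setMaxDocs` mutates static state shared by every test in the JVM, the `finally` block restores `IndexWriter.MAX_DOCS` even when the test fails.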