LUCENE-8043: Fix document accounting in IndexWriter

The IndexWriter check for too many documents does not always work, so the index
can grow past the limit. Once this happens, Lucene refuses to open the index and
throws a CorruptIndexException: Too many documents.
This change also fixes document accounting if the index writer hits an aborting
exception and/or the writer is rolled back. Pending document counts are now consistent
with the latest SegmentInfos once the writer has been rolled back.
This commit is contained in:
Simon Willnauer 2017-11-30 18:56:21 +01:00
parent 52cefbe742
commit b7d8731bbf
6 changed files with 180 additions and 30 deletions

View File

@ -157,6 +157,10 @@ Bug Fixes
* LUCENE-8055: MemoryIndex.MemoryDocValuesIterator returns 2 documents
instead of 1. (Simon Willnauer)
* LUCENE-8043: Fix document accounting in IndexWriter to prevent writing too many
documents. Once this happens, Lucene refuses to open the index and throws a
CorruptIndexException. (Simon Willnauer, Yonik Seeley, Mike McCandless)
Build
* LUCENE-6144: Upgrade Ivy to 2.4.0; 'ant ivy-bootstrap' now removes old Ivy

View File

@ -549,7 +549,6 @@ final class DocumentsWriter implements Closeable, Accountable {
try {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
@ -681,7 +680,9 @@ final class DocumentsWriter implements Closeable, Accountable {
ticketQueue.addDeletes(flushingDeleteQueue);
}
ticketQueue.forcePurge(writer);
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
// we can't assert that we don't have any tickets in the queue since we might add a DocumentsWriterDeleteQueue
// concurrently; if we have very small ram buffers this happens quite frequently
assert !flushingDeleteQueue.anyChanges();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
}

View File

@ -118,6 +118,7 @@ class DocumentsWriterPerThread {
void abort() {
//System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
aborted = true;
pendingNumDocs.addAndGet(-numDocsInRAM);
try {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "now abort");
@ -522,7 +523,6 @@ class DocumentsWriterPerThread {
*/
void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap) throws IOException {
assert flushedSegment != null;
SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -551,13 +552,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return true;
}
public synchronized void drop(SegmentCommitInfo info) throws IOException {
public synchronized boolean drop(SegmentCommitInfo info) throws IOException {
final ReadersAndUpdates rld = readerMap.get(info);
if (rld != null) {
assert info == rld.info;
readerMap.remove(info);
rld.dropReaders();
return true;
}
return false;
}
public synchronized long ramBytesUsed() {
@ -726,7 +729,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator();
while(it.hasNext()) {
final ReadersAndUpdates rld = it.next().getValue();
try {
if (doSave && rld.writeLiveDocs(directory)) {
// Make sure we only write del docs and field updates for a live segment:
@ -1097,7 +1099,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rollbackSegments = segmentInfos.createBackupSegmentInfos();
}
commitUserData = new HashMap<String,String>(segmentInfos.getUserData()).entrySet();
commitUserData = new HashMap<>(segmentInfos.getUserData()).entrySet();
pendingNumDocs.set(segmentInfos.totalMaxDoc());
@ -1267,6 +1269,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now flush at close");
}
flush(true, true);
waitForMerges();
commitInternal(config.getMergePolicy());
@ -1617,9 +1620,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// merge will skip merging it and will then drop
// it once it's done:
if (mergingSegments.contains(info) == false) {
segmentInfos.remove(info);
pendingNumDocs.addAndGet(-info.info.maxDoc());
readerPool.drop(info);
// it's possible that we invoke this method more than once for the same SCI
// we must only remove the docs once!
boolean dropPendingDocs = segmentInfos.remove(info);
try {
// this is sneaky - we might hit an exception while dropping a reader but then we have already
// removed the segment for the segmentInfo and we lost the pendingDocs update due to that.
// therefore we execute the adjustPendingNumDocs in a finally block to account for that.
dropPendingDocs |= readerPool.drop(info);
} finally {
if (dropPendingDocs) {
adjustPendingNumDocs(-info.info.maxDoc());
}
}
}
}
@ -2342,6 +2355,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Make sure no commit is running, else e.g. we can close while another thread is still fsync'ing:
synchronized(commitLock) {
rollbackInternalNoCommit();
assert pendingNumDocs.get() == segmentInfos.totalMaxDoc()
: "pendingNumDocs " + pendingNumDocs.get() + " != " + segmentInfos.totalMaxDoc() + " totalMaxDoc";
}
}
@ -2354,7 +2370,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
abortMerges();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback: done finish merges");
}
@ -2365,6 +2380,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
docWriter.abort(this); // don't sync on IW here
docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
purge(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
synchronized(this) {
if (pendingCommit != null) {
@ -2376,15 +2393,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
notifyAll();
}
}
// Don't bother saving any changes in our segmentInfos
readerPool.dropAll(false);
final int totalMaxDoc = segmentInfos.totalMaxDoc();
// Keep the same segmentInfos instance but replace all
// of its SegmentInfo instances so IFD below will remove
// any segments we flushed since the last commit:
segmentInfos.rollbackSegmentInfos(rollbackSegments);
int rollbackMaxDoc = segmentInfos.totalMaxDoc();
// now we need to adjust this back to the rolled back SI but don't set it to the absolute value
// otherwise we might hide internal bugs
adjustPendingNumDocs(-(totalMaxDoc-rollbackMaxDoc));
if (infoStream.isEnabled("IW") ) {
infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
}
@ -2495,9 +2514,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
try {
synchronized (fullFlushLock) {
long abortedDocCount = docWriter.lockAndAbortAll(this);
pendingNumDocs.addAndGet(-abortedDocCount);
docWriter.lockAndAbortAll(this);
processEvents(false, true);
synchronized (this) {
try {
@ -2505,11 +2522,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
abortMerges();
// Let merges run again
stopMerges = false;
adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
// Remove all segments
pendingNumDocs.addAndGet(-segmentInfos.totalMaxDoc());
segmentInfos.clear();
// Ask deleter to locate unreferenced files & remove them:
deleter.checkpoint(segmentInfos, false);
/* don't refresh the deleter here since there might
* be concurrent indexing requests coming in opening
* files on the directory after we called DW#abort()
@ -2522,12 +2540,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
changeCount.incrementAndGet();
segmentInfos.changed();
globalFieldNumberMap.clear();
success = true;
long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
docWriter.setLastSeqNo(seqNo);
return seqNo;
} finally {
docWriter.unlockAllAfterAbortAll(this);
if (!success) {
@ -2660,6 +2676,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized void publishFlushedSegment(SegmentCommitInfo newSegment,
FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
Sorter.DocMap sortMap) throws IOException {
boolean published = false;
try {
// Lock order IW -> BDS
ensureOpen(false);
@ -2695,6 +2712,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
published = true;
checkpoint();
if (packet != null && packet.any() && sortMap != null) {
@ -2705,6 +2723,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} finally {
if (published == false) {
adjustPendingNumDocs(-newSegment.info.maxDoc());
}
flushCount.incrementAndGet();
doAfterFlush();
}
@ -3894,7 +3915,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// doing this makes MockDirWrapper angry in
// TestNRTThreads (LUCENE-5434):
readerPool.drop(merge.info);
// Safe: these files must exist:
deleteNewFiles(merge.info.files());
return false;
@ -3955,9 +3975,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Now deduct the deleted docs that we just reclaimed from this
// merge:
int delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
int delDocCount;
if (dropSegment) {
// if we drop the segment we have to reduce the pendingNumDocs by merge.totalMaxDocs since we never drop
// the docs when we apply deletes if the segment is currently merged.
delDocCount = merge.totalMaxDoc;
} else {
delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
}
assert delDocCount >= 0;
pendingNumDocs.addAndGet(-delDocCount);
adjustPendingNumDocs(-delDocCount);
if (dropSegment) {
assert !segmentInfos.contains(merge.info);
@ -5072,11 +5099,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
private void processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
boolean processed = false;
if (tragedy == null) {
Event event;
while ((event = queue.poll()) != null) {
processed = true;
event.process(this, triggerMerge, forcePurge);
}
}
@ -5088,7 +5113,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
*
*/
static interface Event {
interface Event {
/**
* Processes the event. This method is called by the {@link IndexWriter}
@ -5111,9 +5136,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* IllegalArgumentException} if it's not allowed. */
private void reserveDocs(long addedNumDocs) {
assert addedNumDocs >= 0;
if (pendingNumDocs.addAndGet(addedNumDocs) > actualMaxDocs) {
if (adjustPendingNumDocs(addedNumDocs) > actualMaxDocs) {
// Reserve failed: put the docs back and throw exc:
pendingNumDocs.addAndGet(-addedNumDocs);
adjustPendingNumDocs(-addedNumDocs);
tooManyDocs(addedNumDocs);
}
}
@ -5142,4 +5167,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
ensureOpen();
return docWriter.getMaxCompletedSequenceNumber();
}
/**
 * Applies a delta to the pending document counter.
 *
 * @param numDocs the amount to add to {@code pendingNumDocs}; callers pass a negative value to
 *     release previously counted documents
 * @return the counter value after the delta has been applied
 */
private long adjustPendingNumDocs(long numDocs) {
  final long updated = pendingNumDocs.addAndGet(numDocs);
  // The counter must never drop below zero; a negative value means documents were released twice.
  assert updated >= 0 : "pendingNumDocs is negative: " + updated;
  return updated;
}
}

View File

@ -972,8 +972,8 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
/** Remove the provided {@link SegmentCommitInfo}.
*
* <p><b>WARNING</b>: O(N) cost */
public void remove(SegmentCommitInfo si) {
segments.remove(si);
public boolean remove(SegmentCommitInfo si) {
return segments.remove(si);
}
/** Remove the {@link SegmentCommitInfo} at the

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
/**
 * Concurrency test for the {@link IndexWriter} document limit: indexing threads update and delete
 * documents against a very small max-doc limit while two other threads keep reopening readers
 * from the writer.
 */
public class TestIndexTooManyDocs extends LuceneTestCase {

  /*
   * This test produces a boat load of very small segments with lot of deletes which are likely deleting
   * the entire segment. see https://issues.apache.org/jira/browse/LUCENE-8043
   */
  public void testIndexTooManyDocs() throws IOException, InterruptedException {
    Directory dir = newDirectory();
    int numMaxDoc = 25;
    IndexWriterConfig config = new IndexWriterConfig();
    config.setRAMBufferSizeMB(0.000001); // force lots of small segments and lots of concurrent deletes
    IndexWriter writer = new IndexWriter(dir, config);
    try {
      IndexWriter.setMaxDocs(numMaxDoc);
      int numThreads = 5 + random().nextInt(5);
      Thread[] threads = new Thread[numThreads];
      // Start barrier: every thread counts down and then waits, so all of them begin together.
      CountDownLatch latch = new CountDownLatch(numThreads);
      // Counted down once per indexing thread (all threads except the two reader threads).
      CountDownLatch indexingDone = new CountDownLatch(numThreads - 2);
      AtomicBoolean done = new AtomicBoolean(false);
      for (int i = 0; i < numThreads; i++) {
        if (i >= 2) {
          // Indexing thread: update (and sometimes delete) documents with random ids.
          threads[i] = new Thread(() -> {
            try {
              latch.countDown();
              try {
                latch.await();
              } catch (InterruptedException e) {
                throw new AssertionError(e);
              }
              for (int d = 0; d < 100; d++) {
                Document doc = new Document();
                String id = Integer.toString(random().nextInt(numMaxDoc * 2));
                doc.add(new StringField("id", id, Field.Store.NO));
                try {
                  Term t = new Term("id", id);
                  if (random().nextInt(5) == 0) {
                    writer.deleteDocuments(new TermQuery(t));
                  }
                  writer.updateDocument(t, doc);
                } catch (IOException e) {
                  throw new AssertionError(e);
                } catch (IllegalArgumentException e) {
                  // Hitting the limit is expected; only the message is verified.
                  assertEquals("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs(), e.getMessage());
                }
              }
            } finally {
              // Always signal completion, even when this thread fails, so the main thread
              // cannot block forever in indexingDone.await().
              indexingDone.countDown();
            }
          });
        } else {
          // Reader thread: keep reopening a reader from the writer until indexing is done.
          threads[i] = new Thread(() -> {
            DirectoryReader open = null;
            try {
              latch.countDown();
              latch.await();
              open = DirectoryReader.open(writer, true, true);
              while (done.get() == false) {
                DirectoryReader directoryReader = DirectoryReader.openIfChanged(open);
                if (directoryReader != null) {
                  open.close();
                  open = directoryReader;
                }
              }
            } catch (Exception e) {
              throw new AssertionError(e);
            } finally {
              // Release the current reader on both the normal and the failure path.
              IOUtils.closeWhileHandlingException(open);
            }
          });
        }
        threads[i].start();
      }
      try {
        indexingDone.await();
      } finally {
        // Stop the reader threads even if waiting was interrupted; otherwise they spin forever.
        done.set(true);
      }
      for (int i = 0; i < numThreads; i++) {
        threads[i].join();
      }
      writer.close();
      dir.close();
    } finally {
      // setMaxDocs changes static state; restore the default for subsequent tests.
      IndexWriter.setMaxDocs(IndexWriter.MAX_DOCS);
    }
  }
}