LUCENE-7894: make sure IW finishes segment flush so we don't see a temporary drop in IW.maxDoc()

This commit is contained in:
Mike McCandless 2017-07-02 15:32:44 -04:00
parent 80ae5e29d5
commit ee1edd9d46
3 changed files with 86 additions and 9 deletions

View File

@ -544,11 +544,11 @@ final class DocumentsWriter implements Closeable, Accountable {
dwptSuccess = true;
} finally {
subtractFlushedNumDocs(flushingDocsInRam);
if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
hasEvents = true;
}
if (!dwptSuccess) {
if (dwptSuccess == false) {
putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
hasEvents = true;
}
@ -582,6 +582,10 @@ final class DocumentsWriter implements Closeable, Accountable {
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
writer.doAfterSegmentFlushed(false, false);
}
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
@ -605,7 +609,7 @@ final class DocumentsWriter implements Closeable, Accountable {
void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
while (numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed) == false) {
oldValue = numDocsInRAM.get();
}
assert numDocsInRAM.get() >= 0;
@ -726,10 +730,9 @@ final class DocumentsWriter implements Closeable, Accountable {
static final class ApplyDeletesEvent implements Event {
static final Event INSTANCE = new ApplyDeletesEvent();
private int instCount = 0;
private ApplyDeletesEvent() {
assert instCount == 0;
instCount++;
// only one instance
}
@Override
@ -740,10 +743,9 @@ final class DocumentsWriter implements Closeable, Accountable {
static final class ForcedPurgeEvent implements Event {
static final Event INSTANCE = new ForcedPurgeEvent();
private int instCount = 0;
private ForcedPurgeEvent() {
assert instCount == 0;
instCount++;
// only one instance
}
@Override

View File

@ -119,6 +119,10 @@ class DocumentsWriterFlushQueue {
synchronized (this) {
// finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
// we hold the purgeLock so no other thread should have polled:
assert poll == head;
ticketCount.decrementAndGet();
assert poll == head;
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
public class TestIndexManyDocuments extends LuceneTestCase {
public void test() throws Exception {
Directory dir = newFSDirectory(createTempDir());
IndexWriterConfig iwc = new IndexWriterConfig();
iwc.setMaxBufferedDocs(TestUtil.nextInt(random(), 100, 2000));
int numDocs = atLeast(10000);
final IndexWriter w = new IndexWriter(dir, iwc);
final AtomicInteger count = new AtomicInteger();
Thread[] threads = new Thread[2];
for(int i=0;i<threads.length;i++) {
threads[i] = new Thread() {
@Override
public void run() {
while (count.getAndIncrement() < numDocs) {
Document doc = new Document();
doc.add(newTextField("field", "text", Field.Store.NO));
try {
w.addDocument(doc);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
};
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
assertEquals("lost " + (numDocs - w.maxDoc()) + " documents; maxBufferedDocs=" + iwc.getMaxBufferedDocs(), numDocs, w.maxDoc());
w.close();
IndexReader r = DirectoryReader.open(dir);
assertEquals(numDocs, r.maxDoc());
IOUtils.close(r, dir);
}
}