LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads

Index/Update Threads try to help out flushing pending document buffers to
disk. This change adds an expert setting to opt ouf of this behavior unless
flusing is falling behind.
This commit is contained in:
Simon Willnauer 2017-12-06 18:20:48 +01:00
parent a3141457d6
commit ede46fe6e9
7 changed files with 135 additions and 2 deletions

View File

@ -59,6 +59,13 @@ API Changes
* LUCENE-8051: LevensteinDistance renamed to LevenshteinDistance.
(Pulak Ghosh via Adrien Grand)
Improvements
* LUCENE-8081: Allow IndexWriter to opt out of flushing on indexing threads
Index/Update Threads try to help out flushing pending document buffers to
disk. This change adds an expert setting to opt ouf of this behavior unless
flusing is falling behind. (Simon Willnauer)
======================= Lucene 7.2.0 =======================
API Changes

View File

@ -392,7 +392,8 @@ final class DocumentsWriter implements Closeable, Accountable {
private boolean preUpdate() throws IOException, AbortingException {
ensureOpen();
boolean hasEvents = false;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
// Help out flushing any queued DWPTs so we can un-stall:
do {
// Try pick up pending threads here if possible
@ -412,7 +413,7 @@ final class DocumentsWriter implements Closeable, Accountable {
hasEvents |= applyAllDeletes(deleteQueue);
if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT);
} else {
} else if (config.checkPendingFlushOnUpdate) {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
hasEvents |= doFlush(nextPendingFlush);

View File

@ -479,5 +479,10 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
sb.append("writer=").append(writer.get()).append("\n");
return sb.toString();
}
@Override
public IndexWriterConfig setCheckPendingFlushUpdate(boolean checkPendingFlushOnUpdate) {
return (IndexWriterConfig) super.setCheckPendingFlushUpdate(checkPendingFlushOnUpdate);
}
}

View File

@ -103,6 +103,9 @@ public class LiveIndexWriterConfig {
/** The field names involved in the index sort */
protected Set<String> indexSortFields = Collections.emptySet();
/** if an indexing thread should check for pending flushes on update in order to help out on a full flush*/
protected volatile boolean checkPendingFlushOnUpdate = true;
// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer;
@ -426,6 +429,29 @@ public class LiveIndexWriterConfig {
return indexSortFields;
}
/**
* Expert: Returns if indexing threads check for pending flushes on update in order
* to help our flushing indexing buffers to disk
* @lucene.eperimental
*/
public boolean isCheckPendingFlushOnUpdate() {
return checkPendingFlushOnUpdate;
}
/**
* Expert: sets if indexing threads check for pending flushes on update in order
* to help our flushing indexing buffers to disk. As a consequence, threads calling
* {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter)} or {@link IndexWriter#flush()} will
* be the only thread writing segments to disk unless flushes are falling behind. If indexing is stalled
* due to too many pending flushes indexing threads will help our writing pending segment flushes to disk.
*
* @lucene.eperimental
*/
public LiveIndexWriterConfig setCheckPendingFlushUpdate(boolean checkPendingFlushOnUpdate) {
this.checkPendingFlushOnUpdate = checkPendingFlushOnUpdate;
return this;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -448,6 +474,7 @@ public class LiveIndexWriterConfig {
sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n");
sb.append("commitOnClose=").append(getCommitOnClose()).append("\n");
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
return sb.toString();
}
}

View File

@ -29,6 +29,7 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -37,6 +38,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.CannedTokenStream;
@ -2877,4 +2879,89 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close();
}
public void testCheckPendingFlushPostUpdate() throws IOException, InterruptedException {
MockDirectoryWrapper dir = newMockDirectory();
Set<String> flushingThreads = Collections.synchronizedSet(new HashSet<>());
dir.failOn(new MockDirectoryWrapper.Failure() {
@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("flush".equals(trace[i].getMethodName())
&& "org.apache.lucene.index.DocumentsWriterPerThread".equals(trace[i].getClassName())) {
flushingThreads.add(Thread.currentThread().getName());
break;
}
}
}
});
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()
.setCheckPendingFlushUpdate(false)
.setMaxBufferedDocs(Integer.MAX_VALUE)
.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH));
AtomicBoolean done = new AtomicBoolean(false);
int numThreads = 1 + random().nextInt(3);
CountDownLatch latch = new CountDownLatch(numThreads);
Set<String> indexingThreads = new HashSet<>();
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
latch.countDown();
int numDocs = 0;
while (done.get() == false) {
Document doc = new Document();
doc.add(new StringField("id", "foo", Field.Store.YES));
try {
w.addDocument(doc);
} catch (Exception e) {
throw new AssertionError(e);
}
if (numDocs++ % 10 == 0) {
Thread.yield();
}
}
});
indexingThreads.add(threads[i].getName());
threads[i].start();
}
latch.await();
try {
int numIters = rarely() ? 1 + random().nextInt(5) : 1;
for (int i = 0; i < numIters; i++) {
waitForDocs(w);
w.commit();
assertTrue(flushingThreads.toString(), flushingThreads.contains(Thread.currentThread().getName()));
flushingThreads.retainAll(indexingThreads);
assertTrue(flushingThreads.toString(), flushingThreads.isEmpty());
}
w.getConfig().setCheckPendingFlushUpdate(true);
numIters = 0;
while (true) {
assertFalse("should finish in less than 100 iterations", numIters++ >= 100);
waitForDocs(w);
w.flush();
flushingThreads.retainAll(indexingThreads);
if (flushingThreads.isEmpty() == false) {
break;
}
}
} finally {
done.set(true);
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
IOUtils.close(w, dir);
}
}
private static void waitForDocs(IndexWriter w) {
int numDocsInRam = w.numRamDocs();
while(true) {
if (numDocsInRam != w.numRamDocs()) {
return;
}
}
}
}

View File

@ -74,6 +74,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertEquals(Codec.getDefault(), conf.getCodec());
assertEquals(InfoStream.getDefault(), conf.getInfoStream());
assertEquals(IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM, conf.getUseCompoundFile());
assertTrue(conf.isCheckPendingFlushOnUpdate());
// Sanity check - validate that all getters are covered.
Set<String> getters = new HashSet<>();
getters.add("getAnalyzer");
@ -98,6 +99,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
getters.add("getCodec");
getters.add("getInfoStream");
getters.add("getUseCompoundFile");
getters.add("isCheckPendingFlushOnUpdate");
for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {

View File

@ -141,6 +141,7 @@ import com.carrotsearch.randomizedtesting.rules.StaticFieldsInvariantRule;
import junit.framework.AssertionFailedError;
import static com.carrotsearch.randomizedtesting.RandomizedTest.frequently;
import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAsBoolean;
import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAsInt;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
@ -988,6 +989,9 @@ public abstract class LuceneTestCase extends Assert {
}
c.setUseCompoundFile(r.nextBoolean());
c.setReaderPooling(r.nextBoolean());
if (rarely(r)) {
c.setCheckPendingFlushUpdate(false);
}
return c;
}