mirror of https://github.com/apache/lucene.git
Concurrently flush next buffer during commit in RandomIndexWriter (#607)
This is a spinn-off from `LUCENE-8700` that is satisfied by IndexWriter#flushNextBuffer. The idea here is to additionally call flushNextBuffer in RandomIndexWriter for better test coverage. This is a test-only change.
This commit is contained in:
parent
c2a6772f1e
commit
ffb1fc83de
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
|
@ -198,16 +199,7 @@ public class RandomIndexWriter implements Closeable {
|
|||
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
|
||||
if (docCount++ == flushAt) {
|
||||
if (r.nextBoolean()) {
|
||||
if (LuceneTestCase.VERBOSE) {
|
||||
System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
|
||||
}
|
||||
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
|
||||
int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
|
||||
for (int i = 0; i < numFlushes; i++) {
|
||||
if (w.flushNextBuffer() == false) {
|
||||
break; // stop once we didn't flush anything
|
||||
}
|
||||
}
|
||||
flushAllBuffersSequentially();
|
||||
} else if (r.nextBoolean()) {
|
||||
if (LuceneTestCase.VERBOSE) {
|
||||
System.out.println("RIW.add/updateDocument: now doing a flush at docCount=" + docCount);
|
||||
|
@ -227,6 +219,19 @@ public class RandomIndexWriter implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private void flushAllBuffersSequentially() throws IOException {
|
||||
if (LuceneTestCase.VERBOSE) {
|
||||
System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
|
||||
}
|
||||
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
|
||||
int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
|
||||
for (int i = 0; i < numFlushes; i++) {
|
||||
if (w.flushNextBuffer() == false) {
|
||||
break; // stop once we didn't flush anything
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
|
||||
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
|
||||
long seqNo = w.addDocuments(docs);
|
||||
|
@ -312,6 +317,27 @@ public class RandomIndexWriter implements Closeable {
|
|||
|
||||
public long commit() throws IOException {
|
||||
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
|
||||
if (r.nextInt(10) == 0) {
|
||||
AtomicReference<Throwable> exception = new AtomicReference<>();
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
flushAllBuffersSequentially();
|
||||
} catch (IOException e) {
|
||||
exception.set(e);
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
long seqId = w.commit();
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
if (exception.get() != null) {
|
||||
throw new AssertionError(exception.get());
|
||||
}
|
||||
return seqId;
|
||||
}
|
||||
return w.commit();
|
||||
}
|
||||
|
||||
|
@ -328,8 +354,8 @@ public class RandomIndexWriter implements Closeable {
|
|||
return getReader(true, false);
|
||||
}
|
||||
|
||||
private boolean doRandomForceMerge = true;
|
||||
private boolean doRandomForceMergeAssert = true;
|
||||
private boolean doRandomForceMerge;
|
||||
private boolean doRandomForceMergeAssert;
|
||||
|
||||
public void forceMergeDeletes(boolean doWait) throws IOException {
|
||||
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
|
||||
|
@ -491,7 +517,7 @@ public class RandomIndexWriter implements Closeable {
|
|||
* Simple interface that is executed for each <tt>TP</tt> {@link InfoStream} component
|
||||
* message. See also {@link RandomIndexWriter#mockIndexWriter(Random, Directory, IndexWriterConfig, TestPoint)}
|
||||
*/
|
||||
public static interface TestPoint {
|
||||
public abstract void apply(String message);
|
||||
public interface TestPoint {
|
||||
void apply(String message);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue