LUCENE-8206: improvements to TestIndexWriterWithThreads.

This commit is contained in:
Dawid Weiss 2018-03-18 19:14:18 +01:00
parent 916ed60eea
commit d09cc1cb0b
1 changed files with 55 additions and 63 deletions

View File

@ -18,7 +18,8 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@ -53,6 +54,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
// Used by test cases below
private static class IndexerThread extends Thread {
private final CyclicBarrier syncStart;
boolean diskFull;
Throwable error;
AlreadyClosedException ace;
@ -60,13 +62,20 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
boolean noErrors;
volatile int addCount;
public IndexerThread(IndexWriter writer, boolean noErrors) {
public IndexerThread(IndexWriter writer, boolean noErrors, CyclicBarrier syncStart) {
this.writer = writer;
this.noErrors = noErrors;
this.syncStart = syncStart;
}
@Override
public void run() {
try {
syncStart.await();
} catch (BrokenBarrierException | InterruptedException e) {
error = e;
throw new RuntimeException(e);
}
final Document doc = new Document();
FieldType customType = new FieldType(TextField.TYPE_STORED);
@ -79,7 +88,6 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
int idUpto = 0;
int fullCount = 0;
final long stopTime = System.currentTimeMillis() + 200;
do {
try {
@ -114,7 +122,6 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
// OK: abort closes the writer
break;
} catch (Throwable t) {
//t.printStackTrace(System.out);
if (noErrors) {
System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
t.printStackTrace(System.out);
@ -122,7 +129,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
}
break;
}
} while(System.currentTimeMillis() < stopTime);
} while (true);
}
}
@ -133,7 +140,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
int NUM_THREADS = 3;
final int numIterations = TEST_NIGHTLY ? 10 : 3;
for(int iter=0;iter<numIterations;iter++) {
for (int iter=0;iter<numIterations;iter++) {
if (VERBOSE) {
System.out.println("\nTEST: iter=" + iter);
}
@ -149,15 +156,15 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
dir.setMaxSizeInBytes(4*1024+20*iter);
CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
for(int i=0;i<NUM_THREADS;i++)
threads[i] = new IndexerThread(writer, true);
for(int i=0;i<NUM_THREADS;i++)
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new IndexerThread(writer, true, syncStart);
threads[i].start();
}
syncStart.await();
for(int i=0;i<NUM_THREADS;i++) {
for (int i = 0; i < NUM_THREADS; i++) {
// Without fix for LUCENE-1130: one of the
// threads will hang
threads[i].join();
@ -203,16 +210,17 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
);
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
for(int i=0;i<NUM_THREADS;i++)
threads[i] = new IndexerThread(writer, false);
for(int i=0;i<NUM_THREADS;i++)
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new IndexerThread(writer, false, syncStart);
threads[i].start();
}
syncStart.await();
boolean done = false;
while(!done) {
while (!done) {
Thread.sleep(100);
for(int i=0;i<NUM_THREADS;i++)
// only stop when at least one thread has added a doc
@ -238,6 +246,8 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
// Without fix for LUCENE-1130: one of the
// threads will hang
threads[i].join();
// [DW] this is unreachable once join() returns a thread cannot be alive.
if (threads[i].isAlive())
fail("thread seems to be hung");
}
@ -266,7 +276,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
int NUM_THREADS = 3;
for(int iter=0;iter<2;iter++) {
for (int iter = 0; iter < 2; iter++) {
if (VERBOSE) {
System.out.println("TEST: iter=" + iter);
}
@ -282,20 +292,18 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
);
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
for(int i=0;i<NUM_THREADS;i++)
threads[i] = new IndexerThread(writer, true);
for(int i=0;i<NUM_THREADS;i++)
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new IndexerThread(writer, true, syncStart);
threads[i].start();
Thread.sleep(10);
}
syncStart.await();
dir.failOn(failure);
failure.setDoFail();
for(int i=0;i<NUM_THREADS;i++) {
for (int i = 0; i < NUM_THREADS; i++) {
threads[i].join();
assertTrue("hit unexpected Throwable", threads[i].error == null);
}
@ -502,39 +510,30 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
// and closes before the second IndexWriter time's out trying to get the Lock,
// we should see both documents
public void testOpenTwoIndexWritersOnDifferentThreads() throws IOException, InterruptedException {
final Directory dir = newDirectory();
CountDownLatch oneIWConstructed = new CountDownLatch(1);
DelayedIndexAndCloseRunnable thread1 = new DelayedIndexAndCloseRunnable(
dir, oneIWConstructed);
DelayedIndexAndCloseRunnable thread2 = new DelayedIndexAndCloseRunnable(
dir, oneIWConstructed);
try (final Directory dir = newDirectory()) {
CyclicBarrier syncStart = new CyclicBarrier(2);
DelayedIndexAndCloseRunnable thread1 = new DelayedIndexAndCloseRunnable(dir, syncStart);
DelayedIndexAndCloseRunnable thread2 = new DelayedIndexAndCloseRunnable(dir, syncStart);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
thread1.start();
thread2.start();
oneIWConstructed.await();
thread1.startIndexing();
thread2.startIndexing();
thread1.join();
thread2.join();
// ensure the directory is closed if we hit the timeout and throw assume
// TODO: can we improve this in LuceneTestCase? I dont know what the logic would be...
try {
assumeFalse("aborting test: timeout obtaining lock", thread1.failure instanceof LockObtainFailedException);
assumeFalse("aborting test: timeout obtaining lock", thread2.failure instanceof LockObtainFailedException);
if (thread1.failure instanceof LockObtainFailedException ||
thread2.failure instanceof LockObtainFailedException) {
// We only care about the situation when the two writers succeeded.
return;
}
assertFalse("Failed due to: " + thread1.failure, thread1.failed);
assertFalse("Failed due to: " + thread2.failure, thread2.failed);
// now verify that we have two documents in the index
IndexReader reader = DirectoryReader.open(dir);
assertEquals("IndexReader should have one document per thread running", 2,
reader.numDocs());
reader.close();
} finally {
dir.close();
}
}
@ -542,17 +541,12 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
private final Directory dir;
boolean failed = false;
Throwable failure = null;
private final CountDownLatch startIndexing = new CountDownLatch(1);
private CountDownLatch iwConstructed;
private CyclicBarrier syncStart;
public DelayedIndexAndCloseRunnable(Directory dir,
CountDownLatch iwConstructed) {
CyclicBarrier syncStart) {
this.dir = dir;
this.iwConstructed = iwConstructed;
}
public void startIndexing() {
this.startIndexing.countDown();
this.syncStart = syncStart;
}
@Override
@ -561,16 +555,14 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
Document doc = new Document();
Field field = newTextField("field", "testData", Field.Store.YES);
doc.add(field);
syncStart.await();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
iwConstructed.countDown();
startIndexing.await();
writer.addDocument(doc);
writer.close();
} catch (Throwable e) {
failed = true;
failure = e;
failure.printStackTrace(System.out);
return;
}
}
}