Ensure `InternalEngineTests.testConcurrentWritesAndCommits` doesn't pile up commits (#25367)
`InternalEngineTests.testConcurrentWritesAndCommits` can be very heavy on disks if threads are slow and the main thread keeps on pulling commit points holding on to many many segments. This commit adds some quadratic backoff to not pile up too many commits and to make sure indexing threads can make progress. This also now doesn't do busy waiting but waits on a latch with a timeout. Closes #25110
This commit is contained in:
parent
a077fa9b07
commit
59b625121b
|
@ -162,6 +162,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -2135,6 +2136,7 @@ public class InternalEngineTests extends ESTestCase {
|
|||
final int numDocsPerThread = randomIntBetween(500, 1000);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
|
||||
final List<Thread> indexingThreads = new ArrayList<>();
|
||||
final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
|
||||
// create N indexing threads to index documents simultaneously
|
||||
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
|
||||
final int threadIdx = threadNum;
|
||||
|
@ -2149,7 +2151,10 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
doneLatch.countDown();
|
||||
}
|
||||
|
||||
});
|
||||
indexingThreads.add(indexingThread);
|
||||
}
|
||||
|
@ -2159,12 +2164,19 @@ public class InternalEngineTests extends ESTestCase {
|
|||
thread.start();
|
||||
}
|
||||
barrier.await(); // wait for indexing threads to all be ready to start
|
||||
|
||||
int commitLimit = randomIntBetween(10, 20);
|
||||
long sleepTime = 1;
|
||||
// create random commit points
|
||||
boolean doneIndexing;
|
||||
do {
|
||||
doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0;
|
||||
doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
|
||||
commits.add(engine.acquireIndexCommit(true));
|
||||
if (commits.size() > commitLimit) { // don't keep on piling up too many commits
|
||||
IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
|
||||
// we increase the wait time to make sure we eventually if things are slow wait for threads to finish.
|
||||
// this will reduce pressure on disks and will allow threads to make progress without piling up too many commits
|
||||
sleepTime = sleepTime * 2;
|
||||
}
|
||||
} while (doneIndexing == false);
|
||||
|
||||
// now, verify all the commits have the correct docs according to the user commit data
|
||||
|
|
Loading…
Reference in New Issue