From 5badacf391c810ed208dec5d3026f7a76e1a45ae Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 8 Feb 2018 14:23:24 -0500 Subject: [PATCH] Fix race condition in queue size test The queue size test has a race condition. Namely the offering thread can run so quickly completing all of its offering iterations before the queue size thread ever has a chance to run a single size poll iteration. This means that the size will never actually be polled and the test can spuriously fail. What we really want to do here, since this test is checking for a race condition between polling the size of the queue and offers to the queue, we want to execute each iteration in lockstep giving the threads multiple changes for the race between polling the size and offers to occur. This commit addresses this by running the two threads in lockstep for multiple iterations so that they have multiple chances to race. Relates #28584 --- .../concurrent/SizeBlockingQueueTests.java | 56 +++++++++---------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueueTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueueTests.java index ebb49c59803..cc8d7b5d2b3 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueueTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueueTests.java @@ -23,8 +23,8 @@ import org.elasticsearch.test.ESTestCase; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; @@ -46,45 +46,41 @@ public class SizeBlockingQueueTests extends ESTestCase { sizeBlockingQueue.offer(i); } - final CountDownLatch latch = new CountDownLatch(1); - final AtomicBoolean spin = new AtomicBoolean(true); - final AtomicInteger maxSize = new AtomicInteger(); - - // this thread will repeatedly poll the size of the queue keeping track of the maximum size that it sees - final Thread queueSizeThread = new Thread(() -> { - try { - latch.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - while (spin.get()) { - maxSize.set(Math.max(maxSize.get(), sizeBlockingQueue.size())); - } - }); - queueSizeThread.start(); + final int iterations = 1 << 16; + final CyclicBarrier barrier = new CyclicBarrier(2); // this thread will try to offer items to the queue while the queue size thread is polling the size final Thread queueOfferThread = new Thread(() -> { - try { - latch.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - for (int i = 0; i < 4096; i++) { + for (int i = 0; i < iterations; i++) { + try { + // synchronize each iteration of checking the size with each iteration of offering, each iteration is a race + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } sizeBlockingQueue.offer(capacity + i); } }); queueOfferThread.start(); - // synchronize the start of the two threads - latch.countDown(); + // this thread will repeatedly poll the size of the queue keeping track of the maximum size that it sees + final AtomicInteger maxSize = new AtomicInteger(); + final Thread queueSizeThread = new Thread(() -> { + for (int i = 0; i < iterations; i++) { + try { + // synchronize each iteration of checking the size with each iteration of offering, each iteration is a race + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + maxSize.set(Math.max(maxSize.get(), sizeBlockingQueue.size())); + } + }); + queueSizeThread.start(); - // wait for the offering thread to finish + // wait for the threads to finish queueOfferThread.join(); - - // stop the queue size thread - spin.set(false); queueSizeThread.join(); // the maximum size of the queue should be equal to the capacity