#6541 improve testConcurrentAccess perf

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-07-26 11:30:31 +02:00
parent 9e047ab412
commit d8a890f71e
1 changed files with 31 additions and 35 deletions

View File

@ -15,20 +15,20 @@ package org.eclipse.jetty.util;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await;
import static org.hamcrest.MatcherAssert.assertThat;
@ -212,21 +212,20 @@ public class BlockingArrayQueueTest
}
@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testConcurrentAccess() throws Exception
{
final int THREADS = 50;
final int THREADS = 32;
final int LOOPS = 1000;
final BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS);
BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS);
final ConcurrentLinkedQueue<Integer> produced = new ConcurrentLinkedQueue<>();
final ConcurrentLinkedQueue<Integer> consumed = new ConcurrentLinkedQueue<>();
Set<Integer> produced = ConcurrentHashMap.newKeySet();
Set<Integer> consumed = ConcurrentHashMap.newKeySet();
final AtomicBoolean running = new AtomicBoolean(true);
AtomicBoolean consumersRunning = new AtomicBoolean(true);
// start consumers
final CyclicBarrier barrier0 = new CyclicBarrier(THREADS + 1);
CyclicBarrier consumersBarrier = new CyclicBarrier(THREADS + 1);
for (int i = 0; i < THREADS; i++)
{
new Thread()
@ -234,20 +233,18 @@ public class BlockingArrayQueueTest
@Override
public void run()
{
final Random random = new Random();
setPriority(getPriority() - 1);
try
{
while (running.get())
while (consumersRunning.get())
{
int r = 1 + random.nextInt(10);
int r = 1 + ThreadLocalRandom.current().nextInt(10);
if (r % 2 == 0)
{
Integer msg = queue.poll();
if (msg == null)
{
Thread.sleep(1 + random.nextInt(10));
Thread.sleep(ThreadLocalRandom.current().nextInt(2));
continue;
}
consumed.add(msg);
@ -268,7 +265,7 @@ public class BlockingArrayQueueTest
{
try
{
barrier0.await();
consumersBarrier.await();
}
catch (Exception e)
{
@ -280,7 +277,7 @@ public class BlockingArrayQueueTest
}
// start producers
final CyclicBarrier barrier1 = new CyclicBarrier(THREADS + 1);
CyclicBarrier producersBarrier = new CyclicBarrier(THREADS + 1);
for (int i = 0; i < THREADS; i++)
{
final int id = i;
@ -289,16 +286,15 @@ public class BlockingArrayQueueTest
@Override
public void run()
{
final Random random = new Random();
try
{
for (int j = 0; j < LOOPS; j++)
{
Integer msg = random.nextInt();
Integer msg = ThreadLocalRandom.current().nextInt();
produced.add(msg);
if (!queue.offer(msg))
throw new Exception(id + " FULL! " + queue.size());
Thread.sleep(1 + random.nextInt(10));
Thread.sleep(ThreadLocalRandom.current().nextInt(2));
}
}
catch (Exception e)
@ -309,7 +305,7 @@ public class BlockingArrayQueueTest
{
try
{
barrier1.await();
producersBarrier.await();
}
catch (Exception e)
{
@ -320,22 +316,22 @@ public class BlockingArrayQueueTest
}.start();
}
barrier1.await();
int size = queue.size();
int last = size - 1;
while (size > 0 && size != last)
producersBarrier.await();
AtomicInteger last = new AtomicInteger(queue.size() - 1);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
last = size;
Thread.sleep(500);
size = queue.size();
}
running.set(false);
barrier0.await();
int size = queue.size();
if (size == 0 && last.get() == size)
return true;
last.set(size);
return false;
});
HashSet<Integer> prodSet = new HashSet<>(produced);
HashSet<Integer> consSet = new HashSet<>(consumed);
consumersRunning.set(false);
consumersBarrier.await();
assertEquals(prodSet, consSet);
assertEquals(produced, consumed);
}
@Test