#6541 improve testConcurrentAccess perf

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-07-26 11:30:31 +02:00
parent 9726a0987f
commit 020770f82d
1 changed files with 31 additions and 35 deletions

View File

@ -19,18 +19,18 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Random; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -214,21 +214,20 @@ public class BlockingArrayQueueTest
} }
@Test @Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testConcurrentAccess() throws Exception public void testConcurrentAccess() throws Exception
{ {
final int THREADS = 50; final int THREADS = 32;
final int LOOPS = 1000; final int LOOPS = 1000;
final BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS); BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS);
final ConcurrentLinkedQueue<Integer> produced = new ConcurrentLinkedQueue<>(); Set<Integer> produced = ConcurrentHashMap.newKeySet();
final ConcurrentLinkedQueue<Integer> consumed = new ConcurrentLinkedQueue<>(); Set<Integer> consumed = ConcurrentHashMap.newKeySet();
final AtomicBoolean running = new AtomicBoolean(true); AtomicBoolean consumersRunning = new AtomicBoolean(true);
// start consumers // start consumers
final CyclicBarrier barrier0 = new CyclicBarrier(THREADS + 1); CyclicBarrier consumersBarrier = new CyclicBarrier(THREADS + 1);
for (int i = 0; i < THREADS; i++) for (int i = 0; i < THREADS; i++)
{ {
new Thread() new Thread()
@ -236,20 +235,18 @@ public class BlockingArrayQueueTest
@Override @Override
public void run() public void run()
{ {
final Random random = new Random();
setPriority(getPriority() - 1); setPriority(getPriority() - 1);
try try
{ {
while (running.get()) while (consumersRunning.get())
{ {
int r = 1 + random.nextInt(10); int r = 1 + ThreadLocalRandom.current().nextInt(10);
if (r % 2 == 0) if (r % 2 == 0)
{ {
Integer msg = queue.poll(); Integer msg = queue.poll();
if (msg == null) if (msg == null)
{ {
Thread.sleep(1 + random.nextInt(10)); Thread.sleep(ThreadLocalRandom.current().nextInt(2));
continue; continue;
} }
consumed.add(msg); consumed.add(msg);
@ -270,7 +267,7 @@ public class BlockingArrayQueueTest
{ {
try try
{ {
barrier0.await(); consumersBarrier.await();
} }
catch (Exception e) catch (Exception e)
{ {
@ -282,7 +279,7 @@ public class BlockingArrayQueueTest
} }
// start producers // start producers
final CyclicBarrier barrier1 = new CyclicBarrier(THREADS + 1); CyclicBarrier producersBarrier = new CyclicBarrier(THREADS + 1);
for (int i = 0; i < THREADS; i++) for (int i = 0; i < THREADS; i++)
{ {
final int id = i; final int id = i;
@ -291,16 +288,15 @@ public class BlockingArrayQueueTest
@Override @Override
public void run() public void run()
{ {
final Random random = new Random();
try try
{ {
for (int j = 0; j < LOOPS; j++) for (int j = 0; j < LOOPS; j++)
{ {
Integer msg = random.nextInt(); Integer msg = ThreadLocalRandom.current().nextInt();
produced.add(msg); produced.add(msg);
if (!queue.offer(msg)) if (!queue.offer(msg))
throw new Exception(id + " FULL! " + queue.size()); throw new Exception(id + " FULL! " + queue.size());
Thread.sleep(1 + random.nextInt(10)); Thread.sleep(ThreadLocalRandom.current().nextInt(2));
} }
} }
catch (Exception e) catch (Exception e)
@ -311,7 +307,7 @@ public class BlockingArrayQueueTest
{ {
try try
{ {
barrier1.await(); producersBarrier.await();
} }
catch (Exception e) catch (Exception e)
{ {
@ -322,22 +318,22 @@ public class BlockingArrayQueueTest
}.start(); }.start();
} }
barrier1.await(); producersBarrier.await();
int size = queue.size();
int last = size - 1; AtomicInteger last = new AtomicInteger(queue.size() - 1);
while (size > 0 && size != last) await().atMost(5, TimeUnit.SECONDS).until(() ->
{ {
last = size; int size = queue.size();
Thread.sleep(500); if (size == 0 && last.get() == size)
size = queue.size(); return true;
} last.set(size);
running.set(false); return false;
barrier0.await(); });
HashSet<Integer> prodSet = new HashSet<>(produced); consumersRunning.set(false);
HashSet<Integer> consSet = new HashSet<>(consumed); consumersBarrier.await();
assertEquals(prodSet, consSet); assertEquals(produced, consumed);
} }
@Test @Test