NO-JIRA improvements on MultiThreadCriticalMeasureTest
Avoiding intermittent failures speed up test
This commit is contained in:
parent
4e52758a62
commit
a68510279b
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.activemq.artemis.utils.critical;
|
package org.apache.activemq.artemis.utils.critical;
|
||||||
|
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -35,11 +37,11 @@ public class MultiThreadCriticalMeasureTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiThread() throws Throwable {
|
public void testMultiThread() throws Throwable {
|
||||||
int THREADS = 100;
|
int THREADS = 20;
|
||||||
|
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
|
||||||
AtomicInteger errors = new AtomicInteger(0);
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
Thread[] threads = new Thread[THREADS];
|
|
||||||
AtomicBoolean running = new AtomicBoolean(true);
|
AtomicBoolean running = new AtomicBoolean(true);
|
||||||
ReusableLatch latch = new ReusableLatch(0);
|
AtomicBoolean load = new AtomicBoolean(true);
|
||||||
ReusableLatch latchOnMeasure = new ReusableLatch(0);
|
ReusableLatch latchOnMeasure = new ReusableLatch(0);
|
||||||
try {
|
try {
|
||||||
CriticalMeasure measure = new CriticalMeasure(null, 0);
|
CriticalMeasure measure = new CriticalMeasure(null, 0);
|
||||||
|
@ -48,12 +50,15 @@ public class MultiThreadCriticalMeasureTest {
|
||||||
|
|
||||||
Runnable runnable = () -> {
|
Runnable runnable = () -> {
|
||||||
try {
|
try {
|
||||||
logger.debug("Thread " + Thread.currentThread().getName() + " waiting to Star");
|
logger.debug("Thread " + Thread.currentThread().getName() + " waiting to Start");
|
||||||
barrier.await();
|
barrier.await();
|
||||||
logger.debug("Thread " + Thread.currentThread().getName() + " Started");
|
logger.debug("Thread " + Thread.currentThread().getName() + " Started");
|
||||||
while (running.get()) {
|
while (running.get()) {
|
||||||
if (!latch.await(1, TimeUnit.NANOSECONDS)) {
|
if (!load.get()) {
|
||||||
latch.await();
|
// 1st barrier will let the unit test do its job
|
||||||
|
barrier.await();
|
||||||
|
// 2nd barrier waiting the test to finish its job
|
||||||
|
barrier.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
try (AutoCloseable closeable = measure.measure()) {
|
try (AutoCloseable closeable = measure.measure()) {
|
||||||
|
@ -67,47 +72,45 @@ public class MultiThreadCriticalMeasureTest {
|
||||||
};
|
};
|
||||||
|
|
||||||
for (int i = 0; i < THREADS; i++) {
|
for (int i = 0; i < THREADS; i++) {
|
||||||
threads[i] = new Thread(runnable, "t=" + i);
|
executorService.execute(runnable);
|
||||||
threads[i].start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug("Going to release it now");
|
logger.debug("Going to release it now");
|
||||||
barrier.await();
|
barrier.await();
|
||||||
|
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
|
// Waiting some time to have load generated
|
||||||
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
|
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
|
||||||
logger.debug("Count up " + i);
|
|
||||||
|
|
||||||
// simulating load down on the system... this will freeze load
|
// Disable load, so the measure threads will wait on the barrier
|
||||||
latch.countUp();
|
load.set(false);
|
||||||
|
|
||||||
|
// first barrier waiting the simulated load to stop
|
||||||
|
barrier.await(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// waiting a few milliseconds as the bug was about measuring load after a no load
|
||||||
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(20));
|
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(20));
|
||||||
Assert.assertFalse(measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(10), false));
|
Assert.assertFalse(measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(10), false));
|
||||||
logger.debug("Count down");
|
logger.debug("Count down");
|
||||||
|
|
||||||
// this will resume load
|
// letting load to happen again
|
||||||
latch.countDown();
|
load.set(true);
|
||||||
|
// Leaving barrier out so test is back on generating load
|
||||||
|
barrier.await(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
latchOnMeasure.countUp();
|
latchOnMeasure.countUp();
|
||||||
|
|
||||||
Assert.assertTrue(Wait.waitFor(() -> measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(100), false), 1_000, 1));
|
Assert.assertTrue(Wait.waitFor(() -> measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(100), false), 1_000, 1));
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
latch.countDown();
|
load.set(true);
|
||||||
latchOnMeasure.countDown();
|
|
||||||
running.set(false);
|
running.set(false);
|
||||||
|
latchOnMeasure.countDown();
|
||||||
|
|
||||||
Assert.assertEquals(0, errors.get());
|
Assert.assertEquals(0, errors.get());
|
||||||
|
executorService.shutdown();
|
||||||
for (Thread t : threads) {
|
Wait.assertTrue(executorService::isShutdown);
|
||||||
if (t != null) {
|
Wait.assertTrue(executorService::isTerminated, 5000, 1);
|
||||||
t.join(100);
|
|
||||||
if (t.isAlive()) {
|
|
||||||
t.interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue