ARTEMIS-3604 Small test fix on ThresholdActorTest
This commit is contained in:
parent
1e62979577
commit
af13d90c57
|
@ -17,9 +17,11 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.utils.actors;
|
package org.apache.activemq.artemis.utils.actors;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -119,7 +121,7 @@ public class ThresholdActorTest {
|
||||||
|
|
||||||
public void block() {
|
public void block() {
|
||||||
try {
|
try {
|
||||||
if (!semaphore.tryAcquire()) {
|
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
|
||||||
errors.incrementAndGet();
|
errors.incrementAndGet();
|
||||||
System.err.println("acquire failed");
|
System.err.println("acquire failed");
|
||||||
}
|
}
|
||||||
|
@ -128,18 +130,36 @@ public class ThresholdActorTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void unblock() {
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFlow() throws Exception {
|
public void testFlow() throws Exception {
|
||||||
final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
final ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, semaphore::release);
|
ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, this::unblock);
|
||||||
|
|
||||||
final int LAST_ELEMENT = 1000;
|
final int LAST_ELEMENT = 1111;
|
||||||
|
|
||||||
for (int i = 0; i <= LAST_ELEMENT; i++) {
|
final CountDownLatch latchDone = new CountDownLatch(1);
|
||||||
actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
|
|
||||||
}
|
executorService.execute(() -> {
|
||||||
|
for (int i = 0; i <= LAST_ELEMENT; i++) {
|
||||||
|
try {
|
||||||
|
semaphore.acquire();
|
||||||
|
semaphore.release();
|
||||||
|
actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
errors.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
latchDone.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
Wait.assertEquals(LAST_ELEMENT, lastProcessed::get);
|
Wait.assertEquals(LAST_ELEMENT, lastProcessed::get);
|
||||||
Assert.assertEquals(0, errors.get());
|
Assert.assertEquals(0, errors.get());
|
||||||
|
|
Loading…
Reference in New Issue