diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java index 5c715ecc3f..01e547185d 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java @@ -17,9 +17,11 @@ package org.apache.activemq.artemis.utils.actors; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -119,7 +121,7 @@ public class ThresholdActorTest { public void block() { try { - if (!semaphore.tryAcquire()) { + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { errors.incrementAndGet(); System.err.println("acquire failed"); } @@ -128,18 +130,36 @@ public class ThresholdActorTest { } } + public void unblock() { + semaphore.release(); + } + @Test public void testFlow() throws Exception { - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ExecutorService executorService = Executors.newFixedThreadPool(2); try { - ThresholdActor actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, semaphore::release); + ThresholdActor actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, this::unblock); - final int LAST_ELEMENT = 1000; + final int LAST_ELEMENT = 1111; - for (int i = 0; i <= LAST_ELEMENT; i++) { - actor.act(new Element(i, i % 2 == 0 ? 20 : 1)); - } + final CountDownLatch latchDone = new CountDownLatch(1); + + executorService.execute(() -> { + for (int i = 0; i <= LAST_ELEMENT; i++) { + try { + semaphore.acquire(); + semaphore.release(); + actor.act(new Element(i, i % 2 == 0 ? 20 : 1)); + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + latchDone.countDown(); + }); + + Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS)); Wait.assertEquals(LAST_ELEMENT, lastProcessed::get); Assert.assertEquals(0, errors.get());