diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 5530179bf1..98f728d3cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -86,7 +86,6 @@ import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.Env; -import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; @@ -859,16 +858,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private boolean internalFlushExecutor(long timeout, boolean log) { - FutureLatch future = new FutureLatch(); - getExecutor().execute(future); - - boolean result = future.await(timeout); - - if (log && !result) { - ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout); + if (!getExecutor().flush(timeout, TimeUnit.MILLISECONDS)) { + if (log) { + ActiveMQServerLogger.LOGGER.queueBusy(this.name.toString(), timeout); + } + return false; + } else { + return true; } - return result; } @Override diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java index ab36b33ec8..0181a05b43 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.timing.core.server.impl; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -46,7 +47,7 @@ public class QueueImplTest extends ActiveMQTestBase { private ScheduledExecutorService scheduledExecutor; - // private ExecutorService executor; + private ExecutorService executor; @Override @Before @@ -54,12 +55,14 @@ public class QueueImplTest extends ActiveMQTestBase { super.setUp(); scheduledExecutor = new ScheduledThreadPoolExecutor(1); + executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); } @Override @After public void tearDown() throws Exception { scheduledExecutor.shutdownNow(); + executor.shutdown(); super.tearDown(); } @@ -68,17 +71,14 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduledNoConsumer() throws Exception { - QueueImpl queue = - new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, - false, scheduledExecutor, null, null, null, - ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); + QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, ArtemisExecutor.delegate(executor), null, null); // Send one scheduled long now = System.currentTimeMillis(); MessageReference ref1 = generateReference(queue, 1); - ref1.setScheduledDeliveryTime(now + 7000); + ref1.setScheduledDeliveryTime(now + 700); queue.addTail(ref1); // Send some non scheduled messages @@ -93,19 +93,19 @@ public class QueueImplTest extends ActiveMQTestBase { // Now send some more scheduled messages MessageReference ref5 = generateReference(queue, 5); - ref5.setScheduledDeliveryTime(now + 5000); + ref5.setScheduledDeliveryTime(now + 500); queue.addTail(ref5); MessageReference ref6 = generateReference(queue, 6); - ref6.setScheduledDeliveryTime(now + 4000); + ref6.setScheduledDeliveryTime(now + 400); queue.addTail(ref6); MessageReference ref7 = generateReference(queue, 7); - ref7.setScheduledDeliveryTime(now + 3000); + ref7.setScheduledDeliveryTime(now + 300); queue.addTail(ref7); MessageReference ref8 = generateReference(queue, 8); - ref8.setScheduledDeliveryTime(now + 6000); + ref8.setScheduledDeliveryTime(now + 600); queue.addTail(ref8); List refs = new ArrayList<>(); @@ -123,7 +123,7 @@ public class QueueImplTest extends ActiveMQTestBase { refs.add(ref3); refs.add(ref4); - Thread.sleep(7500); + Thread.sleep(750); FakeConsumer consumer = new FakeConsumer(); @@ -136,8 +136,7 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduled() throws Exception { - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, - ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); + QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, ArtemisExecutor.delegate(executor), null, null); FakeConsumer consumer = null; @@ -146,7 +145,7 @@ public class QueueImplTest extends ActiveMQTestBase { long now = System.currentTimeMillis(); MessageReference ref1 = generateReference(queue, 1); - ref1.setScheduledDeliveryTime(now + 7000); + ref1.setScheduledDeliveryTime(now + 700); queue.addTail(ref1); // Send some non scheduled messages @@ -161,19 +160,19 @@ public class QueueImplTest extends ActiveMQTestBase { // Now send some more scheduled messages MessageReference ref5 = generateReference(queue, 5); - ref5.setScheduledDeliveryTime(now + 5000); + ref5.setScheduledDeliveryTime(now + 500); queue.addTail(ref5); MessageReference ref6 = generateReference(queue, 6); - ref6.setScheduledDeliveryTime(now + 4000); + ref6.setScheduledDeliveryTime(now + 400); queue.addTail(ref6); MessageReference ref7 = generateReference(queue, 7); - ref7.setScheduledDeliveryTime(now + 3000); + ref7.setScheduledDeliveryTime(now + 300); queue.addTail(ref7); MessageReference ref8 = generateReference(queue, 8); - ref8.setScheduledDeliveryTime(now + 6000); + ref8.setScheduledDeliveryTime(now + 600); queue.addTail(ref8); consumer = new FakeConsumer(); @@ -196,27 +195,27 @@ public class QueueImplTest extends ActiveMQTestBase { MessageReference ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); Assert.assertEquals(ref7, ref); long now2 = System.currentTimeMillis(); - Assert.assertTrue(now2 - now >= 3000); + Assert.assertTrue(now2 - now >= 300); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); Assert.assertEquals(ref6, ref); now2 = System.currentTimeMillis(); - Assert.assertTrue(now2 - now >= 4000); + Assert.assertTrue(now2 - now >= 400); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); Assert.assertEquals(ref5, ref); now2 = System.currentTimeMillis(); - Assert.assertTrue(now2 - now >= 5000); + Assert.assertTrue(now2 - now >= 500); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); Assert.assertEquals(ref8, ref); now2 = System.currentTimeMillis(); - Assert.assertTrue(now2 - now >= 6000); + Assert.assertTrue(now2 - now >= 600); ref = consumer.waitForNextReference(QueueImplTest.TIMEOUT); Assert.assertEquals(ref1, ref); now2 = System.currentTimeMillis(); - Assert.assertTrue(now2 - now >= 7000); + Assert.assertTrue(now2 - now >= 700); Assert.assertTrue(consumer.getReferences().isEmpty()); } @@ -236,10 +235,10 @@ public class QueueImplTest extends ActiveMQTestBase { } }; QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, - ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())), null, null); + ArtemisExecutor.delegate(executor), null, null); MessageReference messageReference = generateReference(queue, 1); queue.addConsumer(consumer); - messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); + messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 200); queue.addHead(messageReference, false); boolean gotLatch = countDownLatch.await(3000, TimeUnit.MILLISECONDS);