From 6ba54964bd447e9f536d905af27de6504e79f066 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 25 Apr 2023 14:12:25 -0500 Subject: [PATCH] ARTEMIS-4258 delayBeforeDispatch not working with OpenWire --- .../protocol/openwire/amq/AMQConsumer.java | 12 ++ .../JMSConsumerDelayDispatchTest.java} | 119 ++++++++++++------ 2 files changed, 92 insertions(+), 39 deletions(-) rename tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/{client/ConsumerDelayDispatchTest.java => multiprotocol/JMSConsumerDelayDispatchTest.java} (64%) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index c6ca2310ad..fee1f6c849 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -80,6 +80,7 @@ public class AMQConsumer { //it's address/queue to management service private boolean internalAddress = false; private volatile Set rolledbackMessageRefs; + private ScheduledFuture delayedDispatchPrompter; public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, @@ -179,6 +180,14 @@ public class AMQConsumer { } } + if (serverConsumer != null && serverConsumer.getQueue() != null && serverConsumer.getQueue().getQueueConfiguration() != null) { + Long delayBeforeDispatch = serverConsumer.getQueue().getQueueConfiguration().getDelayBeforeDispatch(); + if (delayBeforeDispatch != null && delayBeforeDispatch > 0) { + Long schedule = delayBeforeDispatch / 2; + delayedDispatchPrompter = scheduledPool.scheduleAtFixedRate(() -> serverConsumer.promptDelivery(), schedule, schedule, TimeUnit.MILLISECONDS); + } + } + serverConsumer.setProtocolData(this); } @@ -404,6 +413,9 @@ public class AMQConsumer { public void removeConsumer() throws Exception { serverConsumer.close(false); + if (delayedDispatchPrompter != null) { + delayedDispatchPrompter.cancel(false); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java similarity index 64% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java index 3240911484..dd4e265de4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java @@ -14,10 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.tests.integration.jms.client; +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -29,43 +28,45 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -/** - * Exclusive Test - */ -public class ConsumerDelayDispatchTest extends JMSTestBase { +public class JMSConsumerDelayDispatchTest extends MultiprotocolJMSClientTestSupport { private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue"); - private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue"); + private SimpleString normalQueueName = SimpleString.toSimpleString("jms.normal.queue"); private static final long DELAY_BEFORE_DISPATCH = 10000L; @Override - @Before - public void setUp() throws Exception { - super.setUp(); + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + super.createAddressAndQueues(server); server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true).setConsumersBeforeDispatch(2).setDelayBeforeDispatch(DELAY_BEFORE_DISPATCH)); server.createQueue(new QueueConfiguration(normalQueueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true)); } - - protected ConnectionFactory getCF() throws Exception { - return cf; + @Test + public void testNoDelayOnDefaultAMQP() throws Exception { + testNoDelayOnDefault(AMQPConnection); } @Test - public void testNoDelayOnDefault() throws Exception { - sendMessage(normalQueueName); + public void testNoDelayOnDefaultOpenWire() throws Exception { + testNoDelayOnDefault(OpenWireConnection); + } - ConnectionFactory fact = getCF(); - Connection connection = fact.createConnection(); + @Test + public void testNoDelayOnDefaultCore() throws Exception { + testNoDelayOnDefault(CoreConnection); + } + + private void testNoDelayOnDefault(ConnectionSupplier supplier) throws Exception { + sendMessage(normalQueueName, supplier); + + Connection connection = supplier.createConnection(); try { - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); connection.start(); @@ -79,14 +80,26 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { } @Test - public void testDelayBeforeDispatch() throws Exception { - sendMessage(queueName); + public void testDelayBeforeDispatchAMQP() throws Exception { + testDelayBeforeDispatch(AMQPConnection); + } - ConnectionFactory fact = getCF(); - Connection connection = fact.createConnection(); + @Test + public void testDelayBeforeDispatchOpenWire() throws Exception { + testDelayBeforeDispatch(OpenWireConnection); + } + + @Test + public void testDelayBeforeDispatchCore() throws Exception { + testDelayBeforeDispatch(CoreConnection); + } + + private void testDelayBeforeDispatch(ConnectionSupplier supplier) throws Exception { + sendMessage(queueName, supplier); + + Connection connection = supplier.createConnection(); try { - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); connection.start(); @@ -103,12 +116,24 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { } @Test - public void testConsumersBeforeDispatch() throws Exception { - sendMessage(queueName); + public void testConsumersBeforeDispatchAMQP() throws Exception { + testConsumersBeforeDispatch(AMQPConnection); + } + @Test + public void testConsumersBeforeDispatchOpenWire() throws Exception { + testConsumersBeforeDispatch(OpenWireConnection); + } - ConnectionFactory fact = getCF(); - Connection connection = fact.createConnection(); + @Test + public void testConsumersBeforeDispatchCore() throws Exception { + testConsumersBeforeDispatch(CoreConnection); + } + + private void testConsumersBeforeDispatch(ConnectionSupplier supplier) throws Exception { + sendMessage(queueName, supplier); + + Connection connection = supplier.createConnection(); try { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -127,13 +152,25 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { } } + @Test + public void testContinueAndResetConsumerAMQP() throws Exception { + testContinueAndResetConsumer(AMQPConnection); + } @Test - public void testContinueAndResetConsumer() throws Exception { - sendMessage(queueName); + public void testContinueAndResetConsumerOpenWire() throws Exception { + testContinueAndResetConsumer(OpenWireConnection); + } - ConnectionFactory fact = getCF(); - Connection connection = fact.createConnection(); + @Test + public void testContinueAndResetConsumerCore() throws Exception { + testContinueAndResetConsumer(CoreConnection); + } + + private void testContinueAndResetConsumer(ConnectionSupplier supplier) throws Exception { + sendMessage(queueName, supplier); + + Connection connection = supplier.createConnection(); try { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -151,15 +188,17 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { consumer2.close(); //Ensure that now dispatch is active, if we close a consumer, dispatching continues. - sendMessage(queueName); + sendMessage(queueName, supplier); Assert.assertNotNull(receive(consumer1)); //Stop all consumers, which should reset dispatch rules. consumer1.close(); + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers. - sendMessage(queueName); + sendMessage(queueName, supplier); MessageConsumer consumer3 = session.createConsumer(queue); @@ -173,9 +212,11 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { //Stop all consumers, which should reset dispatch rules. consumer3.close(); consumer4.close(); + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay. - sendMessage(queueName); + sendMessage(queueName, supplier); MessageConsumer consumer5 = session.createConsumer(queue); @@ -191,6 +232,7 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { } private Message receive(MessageConsumer consumer1) throws JMSException { + System.out.println("receiving..."); return consumer1.receive(1000); } @@ -202,9 +244,8 @@ public class ConsumerDelayDispatchTest extends JMSTestBase { return receivedMessage; } - public void sendMessage(SimpleString queue) throws Exception { - ConnectionFactory fact = getCF(); - Connection connection = fact.createConnection(); + public void sendMessage(SimpleString queue, ConnectionSupplier supplier) throws Exception { + Connection connection = supplier.createConnection(); try { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);