From c4d2ddfce9944cff6ec4003242ce79a2e843aac8 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 16 Apr 2021 13:12:13 +0100 Subject: [PATCH] AMQ-7298 - fix regression with broker redelivery plugin, fix and test relates to AMQ-8168 --- .../activemq/ActiveMQMessageConsumer.java | 2 ++ .../activemq/broker/BrokerRedeliveryTest.java | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 764f8c93b5..3fc56cf957 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -1263,6 +1263,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { MessageDispatch md = iter.next(); md.getMessage().onMessageRolledBack(); + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, md.getMessage()); } if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java index fbc0212f58..2635c8f4c8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java @@ -23,6 +23,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; @@ -40,12 +41,18 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class); BrokerService broker = null; + TransportConnector tcpConnector = null; final ActiveMQQueue destination = new ActiveMQQueue("Redelivery"); final String data = "hi"; final long redeliveryDelayMillis = 2000; long initialRedeliveryDelayMillis = 4000; int maxBrokerRedeliveries = 2; + public Boolean checkForDuplicates = Boolean.TRUE; + + public void initCombosForTestScheduledRedelivery() { + addCombinationValues("checkForDuplicates", new Object[] {Boolean.TRUE, Boolean.FALSE}); + } public void testScheduledRedelivery() throws Exception { doTestScheduledRedelivery(maxBrokerRedeliveries, true); @@ -130,6 +137,8 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { Message dlqMessage = dlqConsumer.receive(2000); assertNotNull("Got message from dql", dlqMessage); assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); + + consumerConnection.close(); } public void testNoScheduledRedeliveryOfDuplicates() throws Exception { @@ -178,6 +187,8 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { Message dlqMessage = dlqConsumer.receive(4000); assertNotNull("Got message from dql", dlqMessage); assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); + + consumerConnection.close(); } private void sendMessage(int timeToLive) throws Exception { @@ -206,6 +217,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { broker = new BrokerService(); broker.setPersistent(persistent); broker.setSchedulerSupport(true); + tcpConnector = broker.addConnector("tcp://localhost:0"); RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin(); @@ -231,7 +243,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://localhost"); + return new ActiveMQConnectionFactory("failover:(" + tcpConnector.getPublishableConnectString() + ")?jms.checkForDuplicates=" + checkForDuplicates.toString()); } @Override @@ -239,4 +251,8 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { stopBroker(); super.tearDown(); } + + public static Test suite() { + return suite(BrokerRedeliveryTest.class); + } }