AMQ-7298 - fix regression with broker redelivery plugin, fix and test relates to AMQ-8168

This commit is contained in:
gtully 2021-04-16 13:12:13 +01:00
parent bd5af82944
commit c4d2ddfce9
2 changed files with 19 additions and 1 deletions

View File

@ -1263,6 +1263,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = iter.next(); MessageDispatch md = iter.next();
md.getMessage().onMessageRolledBack(); md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, md.getMessage());
} }
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES

View File

@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.RedeliveryPolicy;
@ -40,12 +41,18 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class); static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class);
BrokerService broker = null; BrokerService broker = null;
TransportConnector tcpConnector = null;
final ActiveMQQueue destination = new ActiveMQQueue("Redelivery"); final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
final String data = "hi"; final String data = "hi";
final long redeliveryDelayMillis = 2000; final long redeliveryDelayMillis = 2000;
long initialRedeliveryDelayMillis = 4000; long initialRedeliveryDelayMillis = 4000;
int maxBrokerRedeliveries = 2; int maxBrokerRedeliveries = 2;
public Boolean checkForDuplicates = Boolean.TRUE;
public void initCombosForTestScheduledRedelivery() {
addCombinationValues("checkForDuplicates", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testScheduledRedelivery() throws Exception { public void testScheduledRedelivery() throws Exception {
doTestScheduledRedelivery(maxBrokerRedeliveries, true); doTestScheduledRedelivery(maxBrokerRedeliveries, true);
@ -130,6 +137,8 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
Message dlqMessage = dlqConsumer.receive(2000); Message dlqMessage = dlqConsumer.receive(2000);
assertNotNull("Got message from dql", dlqMessage); assertNotNull("Got message from dql", dlqMessage);
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
consumerConnection.close();
} }
public void testNoScheduledRedeliveryOfDuplicates() throws Exception { public void testNoScheduledRedeliveryOfDuplicates() throws Exception {
@ -178,6 +187,8 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
Message dlqMessage = dlqConsumer.receive(4000); Message dlqMessage = dlqConsumer.receive(4000);
assertNotNull("Got message from dql", dlqMessage); assertNotNull("Got message from dql", dlqMessage);
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
consumerConnection.close();
} }
private void sendMessage(int timeToLive) throws Exception { private void sendMessage(int timeToLive) throws Exception {
@ -206,6 +217,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
broker = new BrokerService(); broker = new BrokerService();
broker.setPersistent(persistent); broker.setPersistent(persistent);
broker.setSchedulerSupport(true); broker.setSchedulerSupport(true);
tcpConnector = broker.addConnector("tcp://localhost:0");
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin(); RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
@ -231,7 +243,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
@Override @Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://localhost"); return new ActiveMQConnectionFactory("failover:(" + tcpConnector.getPublishableConnectString() + ")?jms.checkForDuplicates=" + checkForDuplicates.toString());
} }
@Override @Override
@ -239,4 +251,8 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
stopBroker(); stopBroker();
super.tearDown(); super.tearDown();
} }
public static Test suite() {
return suite(BrokerRedeliveryTest.class);
}
} }