diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 298ce38ddc..32eeb00820 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -548,13 +548,18 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private boolean redeliveryExceeded(MessageDispatch md) { try { - return session.getTransacted() - && redeliveryPolicy != null - && redeliveryPolicy.isPreDispatchCheck() - && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES - && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries() - // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin - && md.getMessage().getProperty("redeliveryDelay") == null; + if(!session.getTransacted() || redeliveryPolicy == null || !redeliveryPolicy.isPreDispatchCheck()) { + return false; + } + + if(info.isBrowser() && redeliveryPolicy.isQueueBrowserIgnored()) { + return false; + } + + return redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries() + // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin + && md.getMessage().getProperty("redeliveryDelay") == null; } catch (Exception ignored) { return false; } diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java index 1ced50755c..c3190eff6d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -46,6 +46,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, protected double backOffMultiplier = 5.0; protected long redeliveryDelay = initialRedeliveryDelay; protected boolean preDispatchCheck = true; + protected boolean queueBrowserIgnored = true; public RedeliveryPolicy() { } @@ -165,4 +166,12 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, public boolean isPreDispatchCheck() { return preDispatchCheck; } + + public void setQueueBrowserIgnored(boolean queueBrowserIgnored) { + this.queueBrowserIgnored = queueBrowserIgnored; + } + + public boolean isQueueBrowserIgnored() { + return queueBrowserIgnored; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java index 30802adab8..774427bd54 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java @@ -17,12 +17,15 @@ package org.apache.activemq.usecases; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; - +import static org.junit.Assert.fail; import java.io.IOException; import java.net.URI; import java.util.Enumeration; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -35,8 +38,14 @@ import jakarta.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.DestinationView; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.QueueMessageReference; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.junit.After; import org.junit.Before; @@ -68,6 +77,10 @@ public class QueueBrowsingTest { connectUri = connector.getConnectUri(); factory = new ActiveMQConnectionFactory(connectUri); + factory.setWatchTopicAdvisories(false); + factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l); + factory.getRedeliveryPolicy().setRedeliveryDelay(0l); + factory.getRedeliveryPolicy().setMaximumRedeliveryDelay(0l); } public BrokerService createBroker() throws IOException { @@ -217,4 +230,195 @@ public class QueueBrowsingTest { browser.close(); assertTrue("got at least maxPageSize", received >= maxPageSize); } + + @Test // https://issues.apache.org/jira/browse/AMQ-9554 + public void testBrowseRedeliveryMaxRedelivered() throws Exception { + browseRedelivery(0, true); + } + + @Test // Ignore https://issues.apache.org/jira/browse/AMQ-9554 + public void testBrowseRedeliveryIgnored() throws Exception { + browseRedelivery(1, false); + } + + protected void browseRedelivery(int browseExpected, boolean dlqDlqExpected) throws Exception { + IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy(); + individualDeadLetterStrategy.setQueuePrefix(""); + individualDeadLetterStrategy.setQueueSuffix(".dlq"); + individualDeadLetterStrategy.setUseQueueForQueueMessages(true); + broker.getDestinationPolicy().getDefaultEntry().setDeadLetterStrategy(individualDeadLetterStrategy); + broker.getDestinationPolicy().getDefaultEntry().setPersistJMSRedelivered(true); + + if(dlqDlqExpected) { + factory.getRedeliveryPolicy().setQueueBrowserIgnore(false); + } + + String messageId = null; + + String queueName = "browse.redeliverd.tx"; + String dlqQueueName = "browse.redeliverd.tx.dlq"; + String dlqDlqQueueName = "browse.redeliverd.tx.dlq.dlq"; + + ActiveMQQueue queue = new ActiveMQQueue(queueName + "?consumer.prefetchSize=0"); + ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName + "?consumer.prefetchSize=0"); + ActiveMQQueue queueDLQDLQ = new ActiveMQQueue(dlqDlqQueueName); + + broker.getAdminView().addQueue(queueName); + broker.getAdminView().addQueue(dlqQueueName); + + DestinationView dlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqQueueName); + DestinationView queueView = broker.getAdminView().getBroker().getQueueView(queueName); + + verifyQueueStats(0l, 0l, 0l, dlqQueueView); + verifyQueueStats(0l, 0l, 0l, queueView); + + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(queue); + + Message sendMessage = session.createTextMessage("Hello world!"); + producer.send(sendMessage); + messageId = sendMessage.getJMSMessageID(); + session.commit(); + producer.close(); + + verifyQueueStats(0l, 0l, 0l, dlqQueueView); + verifyQueueStats(1l, 0l, 1l, queueView); + + // Redeliver message to DLQ + Message message = null; + MessageConsumer consumer = session.createConsumer(queue); + int rollbackCount = 0; + do { + message = consumer.receive(2000l); + if(message != null) { + session.rollback(); + rollbackCount++; + } + } while (message != null); + + assertEquals(Integer.valueOf(7), Integer.valueOf(rollbackCount)); + verifyQueueStats(1l, 0l, 1l, dlqQueueView); + verifyQueueStats(1l, 1l, 0l, queueView); + + session.commit(); + consumer.close(); + + // Increment redelivery counter on the message in the DLQ + // Close the consumer to force broker to dispatch + Message messageDLQ = null; + MessageConsumer consumerDLQ = session.createConsumer(queueDLQ); + int dlqRollbackCount = 0; + int dlqRollbackCountLimit = 5; + do { + messageDLQ = consumerDLQ.receive(2000l); + if(messageDLQ != null) { + session.rollback(); + session.close(); + consumerDLQ.close(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumerDLQ = session.createConsumer(queueDLQ); + dlqRollbackCount++; + } + } while (messageDLQ != null && dlqRollbackCount < dlqRollbackCountLimit); + session.commit(); + consumerDLQ.close(); + + // Browse in tx mode works when we are at the edge of maxRedeliveries + // aka browse does not increment redeliverCounter as expected + Queue brokerQueueDLQ = resolveQueue(broker, queueDLQ); + + for(int i=0; i<16; i++) { + QueueBrowser browser = session.createBrowser(queueDLQ); + Enumeration enumeration = browser.getEnumeration(); + ActiveMQMessage activemqMessage = null; + int received = 0; + while (enumeration.hasMoreElements()) { + activemqMessage = (ActiveMQMessage)enumeration.nextElement(); + received++; + } + browser.close(); + assertEquals(Integer.valueOf(1), Integer.valueOf(received)); + assertEquals(Integer.valueOf(6), Integer.valueOf(activemqMessage.getRedeliveryCounter())); + + // Confirm broker-side redeliveryCounter + QueueMessageReference queueMessageReference = brokerQueueDLQ.getMessage(messageId); + assertEquals(Integer.valueOf(6), Integer.valueOf(queueMessageReference.getRedeliveryCounter())); + } + + session.close(); + connection.close(); + + // Change redelivery max and the browser will fail + factory.getRedeliveryPolicy().setMaximumRedeliveries(3); + final Connection browseConnection = factory.createConnection(); + browseConnection.start(); + + final AtomicInteger browseCounter = new AtomicInteger(0); + final AtomicInteger jmsExceptionCounter = new AtomicInteger(0); + + final Session browseSession = browseConnection.createSession(true, Session.SESSION_TRANSACTED); + + Thread browseThread = new Thread() { + public void run() { + + QueueBrowser browser = null; + try { + browser = browseSession.createBrowser(queueDLQ); + Enumeration enumeration = browser.getEnumeration(); + if(Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + } + while (enumeration.hasMoreElements()) { + Message message = (Message)enumeration.nextElement(); + if(message != null) { + browseCounter.incrementAndGet(); + } + } + } catch (JMSException e) { + jmsExceptionCounter.incrementAndGet(); + } finally { + if(browser != null) { try { browser.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } } + if(browseSession != null) { try { browseSession.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } } + if(browseConnection != null) { try { browseConnection.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } } + } + } + }; + browseThread.start(); + Thread.sleep(2000l); + browseThread.interrupt(); + + assertEquals(Integer.valueOf(browseExpected), Integer.valueOf(browseCounter.get())); + assertEquals(Integer.valueOf(0), Integer.valueOf(jmsExceptionCounter.get())); + + // ActiveMQConsumer sends a poison ack, messages gets moved to .dlq.dlq AND remains on the .dlq + DestinationView dlqDlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqDlqQueueName); + verifyQueueStats(1l, 1l, 0l, queueView); + verifyQueueStats(1l, 0l, 1l, dlqQueueView); + + if(dlqDlqExpected) { + verifyQueueStats(1l, 0l, 1l, dlqDlqQueueView); + } else { + assertNull(dlqDlqQueueView); + } + } + protected static void verifyQueueStats(long enqueueCount, long dequeueCount, long queueSize, DestinationView queueView) { + assertEquals(Long.valueOf(enqueueCount), Long.valueOf(queueView.getEnqueueCount())); + assertEquals(Long.valueOf(dequeueCount), Long.valueOf(queueView.getDequeueCount())); + assertEquals(Long.valueOf(queueSize), Long.valueOf(queueView.getQueueSize())); + } + + protected static Queue resolveQueue(BrokerService brokerService, ActiveMQQueue activemqQueue) throws Exception { + Set destinations = brokerService.getBroker().getDestinations(activemqQueue); + if(destinations == null || destinations.isEmpty()) { + return null; + } + + if(destinations.size() > 1) { + fail("Expected one-and-only one queue for: " + activemqQueue); + } + + return (Queue)destinations.iterator().next(); + } }