From a3836b562d019ad1ce637bf1341c1fc3775c7113 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 3 Jul 2012 20:45:12 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3224 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1356927 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 25 +++- .../activemq/ActiveMQConnectionFactory.java | 26 ++++- .../activemq/ActiveMQMessageConsumer.java | 2 +- .../apache/activemq/RedeliveryPolicyTest.java | 110 ++++++++++++++++++ 4 files changed, 154 insertions(+), 9 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 0f76e796a6..20ddf7732c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -55,6 +55,7 @@ import javax.jms.XAConnection; import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.blob.BlobTransferPolicy; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempDestination; @@ -131,7 +132,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // Configuration options variables private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); private BlobTransferPolicy blobTransferPolicy; - private RedeliveryPolicy redeliveryPolicy; + private RedeliveryPolicyMap redeliveryPolicyMap; private MessageTransformer transformer; private boolean disableTimeStampsByDefault; @@ -1644,14 +1645,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * @throws JMSException */ public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { - return redeliveryPolicy; + return redeliveryPolicyMap.getDefaultEntry(); } /** * Sets the redelivery policy to be used when messages are rolled back */ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { - this.redeliveryPolicy = redeliveryPolicy; + this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); } public BlobTransferPolicy getBlobTransferPolicy() { @@ -2549,4 +2550,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } } + + /** + * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back. + * @param redeliveryPolicyMap the redeliveryPolicyMap to set + */ + public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { + this.redeliveryPolicyMap = redeliveryPolicyMap; + } + + /** + * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the + * Consumers when dealing with transaction messages that have been rolled back. + * + * @return the redeliveryPolicyMap + */ + public RedeliveryPolicyMap getRedeliveryPolicyMap() { + return redeliveryPolicyMap; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 2f0f392c6d..19802ac197 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -36,6 +36,7 @@ import javax.jms.TopicConnectionFactory; import javax.naming.Context; import org.apache.activemq.blob.BlobTransferPolicy; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; import org.apache.activemq.jndi.JNDIBaseStorable; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; @@ -90,7 +91,10 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne // client policies private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); - private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); + { + redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy()); + } private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); private MessageTransformer transformer; @@ -317,7 +321,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); connection.setExclusiveConsumer(isExclusiveConsumer()); - connection.setRedeliveryPolicy(getRedeliveryPolicy()); + connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap()); connection.setTransformer(getTransformer()); connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); @@ -577,15 +581,27 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne } public RedeliveryPolicy getRedeliveryPolicy() { - return redeliveryPolicy; + return redeliveryPolicyMap.getDefaultEntry(); } /** - * Sets the global redelivery policy to be used when a message is delivered + * Sets the global default redelivery policy to be used when a message is delivered * but the session is rolled back */ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { - this.redeliveryPolicy = redeliveryPolicy; + this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); + } + + public RedeliveryPolicyMap getRedeliveryPolicyMap() { + return this.redeliveryPolicyMap; + } + + /** + * Sets the global redelivery policy mapping to be used when a message is delivered + * but the session is rolled back + */ + public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { + this.redeliveryPolicyMap = redeliveryPolicyMap; } public MessageTransformer getTransformer() { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 89b6bba062..5977c3ba99 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -208,7 +208,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } this.session = session; - this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); + this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest); setTransformer(session.getTransformer()); this.info = new ConsumerInfo(consumerId); diff --git a/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 1e378d4697..4378fd65af 100644 --- a/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -23,7 +23,9 @@ import javax.jms.TextMessage; import junit.framework.Test; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; /** * Test cases used to test the JMS message exclusive consumers. @@ -479,4 +481,112 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals("2nd", m.getText()); session.commit(); } + + public void testRedeliveryPolicyPerDestination() throws Exception { + + RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); + queuePolicy.setInitialRedeliveryDelay(0); + queuePolicy.setRedeliveryDelay(1000); + queuePolicy.setUseExponentialBackOff(false); + queuePolicy.setMaximumRedeliveries(2); + + RedeliveryPolicy topicPolicy = new RedeliveryPolicy(); + topicPolicy.setInitialRedeliveryDelay(0); + topicPolicy.setRedeliveryDelay(1000); + topicPolicy.setUseExponentialBackOff(false); + topicPolicy.setMaximumRedeliveries(3); + + // Receive a message with the JMS API + RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); + map.put(new ActiveMQTopic(">"), topicPolicy); + map.put(new ActiveMQQueue(">"), queuePolicy); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + ActiveMQTopic topic = new ActiveMQTopic("TEST"); + + MessageProducer producer = session.createProducer(null); + + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer topicConsumer = session.createConsumer(topic); + + // Send the messages + producer.send(queue, session.createTextMessage("1st")); + producer.send(queue, session.createTextMessage("2nd")); + producer.send(topic, session.createTextMessage("1st")); + producer.send(topic, session.createTextMessage("2nd")); + + session.commit(); + + TextMessage m; + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.rollback(); + + m = (TextMessage)queueConsumer.receive(100); + assertNotNull("first immediate redelivery", m); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull("first immediate redelivery", m); + session.rollback(); + + m = (TextMessage)queueConsumer.receive(100); + assertNull("second delivery delayed: " + m, m); + m = (TextMessage)topicConsumer.receive(100); + assertNull("second delivery delayed: " + m, m); + + m = (TextMessage)queueConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)topicConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.rollback(); + + m = (TextMessage)queueConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)topicConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.rollback(); + + // No third attempt for the Queue consumer + m = (TextMessage)queueConsumer.receive(2000); + assertNull(m); + m = (TextMessage)topicConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)queueConsumer.receive(100); + assertNull(m); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + } }