From 3804d66f6f007d20c3372df72b8349033dafecb9 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 30 Jan 2009 11:57:06 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-1730 - add some more tests and remove workaround for prefetch=0, relates to https://issues.apache.org/activemq/browse/AMQ-2087 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@739249 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 2 - .../org/apache/activemq/JMSConsumerTest.java | 21 +++++-- .../activemq/JmsRollbackRedeliveryTest.java | 55 +++++++++++++++++-- .../CloseRollbackRedeliveryQueueTest.java | 6 +- 4 files changed, 68 insertions(+), 16 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 9a0306bfb1..43201f3cbd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -230,8 +230,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void afterRollback() throws Exception { synchronized(dispatchLock) { - // ActiveMQ workaround for AMQ-1730 - Please Ignore next line - node.incrementRedeliveryCounter(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 6939ffa247..535119ef24 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -630,10 +630,11 @@ public class JMSConsumerTest extends JmsTestSupport { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); - sendMessages(connection, destination, 1); + sendMessages(connection, destination, 2); MessageConsumer consumer = session.createConsumer(destination); assertNotNull(consumer.receive(1000)); + assertNotNull(consumer.receive(1000)); // install another consumer while message dispatch is unacked/uncommitted Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -645,8 +646,12 @@ public class JMSConsumerTest extends JmsTestSupport { Message msg = redispatchConsumer.receive(1000); assertNotNull(msg); assertTrue(msg.getJMSRedelivered()); - // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much! - assertEquals(3, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + + msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); redispatchSession.commit(); assertNull(redispatchConsumer.receive(500)); @@ -660,10 +665,11 @@ public class JMSConsumerTest extends JmsTestSupport { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); - sendMessages(connection, destination, 1); + sendMessages(connection, destination, 2); MessageConsumer consumer = session.createConsumer(destination); assertNotNull(consumer.receive(1000)); + assertNotNull(consumer.receive(1000)); // install another consumer while message dispatch is unacked/uncommitted Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -675,8 +681,11 @@ public class JMSConsumerTest extends JmsTestSupport { Message msg = redispatchConsumer.receive(1000); assertNotNull(msg); assertTrue(msg.getJMSRedelivered()); - // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much! - assertEquals(3, msg.getLongProperty("JMSXDeliveryCount")); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); redispatchSession.commit(); assertNull(redispatchConsumer.receive(500)); diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java index 939c7ba6c0..ef82cecb65 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java @@ -38,6 +38,7 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { protected static final Log LOG = LogFactory.getLog(JmsRollbackRedeliveryTest.class); final int nbMessages = 10; final String destinationName = "Destination"; + final String brokerUrl = "vm://localhost?create=false"; boolean consumerClose = true; boolean rollback = true; @@ -52,13 +53,21 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { public void testRedelivery() throws Exception { - doTestRedelivery("vm://localhost", false); + doTestRedelivery(brokerUrl, false); } public void testRedeliveryWithInterleavedProducer() throws Exception { - doTestRedelivery("vm://localhost", true); + doTestRedelivery(brokerUrl, true); } + public void testRedeliveryWithPrefetch0() throws Exception { + doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0", true); + } + + public void testRedeliveryWithPrefetch1() throws Exception { + doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=1", true); + } + public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception { final int nbMessages = 10; @@ -88,9 +97,11 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); session.commit(); } else { LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + assertFalse(msg.getJMSRedelivered()); session.rollback(); } } @@ -102,7 +113,8 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { public void testRedeliveryOnSingleConsumer() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); Connection connection = connectionFactory.createConnection(); connection.start(); @@ -135,7 +147,8 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { public void testRedeliveryOnSingleSession() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); Connection connection = connectionFactory.createConnection(); connection.start(); @@ -168,7 +181,8 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); Connection connection = connectionFactory.createConnection(); connection.start(); @@ -195,6 +209,37 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport { } } + public void testRedeliveryPropertyWithNoRollback() throws Exception { + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + connection.close(); + + { + AtomicInteger received = new AtomicInteger(); + while (received.get() < nbMessages) { + connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(2000); + if (msg != null) { + LOG.info("Received message " + msg.getText() + + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertFalse(msg.getJMSRedelivered()); + assertEquals(1, msg.getLongProperty("JMSXDeliveryCount")); + } + session.close(); + connection.close(); + } + } + } + private void populateDestination(final int nbMessages, final String destinationName, Connection connection) throws JMSException { diff --git a/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java index 2d6e812573..1c754d416f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java @@ -55,7 +55,7 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport session.commit(); assertNotNull(message); assertEquals("redelivered message", id, message.getJMSMessageID()); - assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + assertEquals(2, message.getLongProperty("JMSXDeliveryCount")); } public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable { @@ -75,7 +75,7 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport session.commit(); assertNotNull(message); assertEquals("redelivered message", id, message.getJMSMessageID()); - assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + assertEquals(2, message.getLongProperty("JMSXDeliveryCount")); } public void testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws Throwable { @@ -94,7 +94,7 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport session.commit(); assertNotNull(message); assertEquals("redelivered message", id, message.getJMSMessageID()); - assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + assertEquals(2, message.getLongProperty("JMSXDeliveryCount")); } protected void setUp() throws Exception {