diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 13826b3505..1dd99d2146 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -400,6 +400,7 @@ public class AmqpSender extends AmqpAbstractLink { // It's the end of browse signal in response to a MessagePull getEndpoint().drained(); draining = false; + currentCredit = 0; } else { jms.setRedeliveryCounter(md.getRedeliveryCounter()); jms.setReadOnlyBody(true); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index b841ecfc08..6f00ab20f5 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.io.File; +import java.io.IOException; import java.net.URI; import java.security.SecureRandom; import java.util.Set; @@ -42,6 +43,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.spring.SpringSslContext; @@ -332,6 +334,19 @@ public class AmqpTestSupport { return proxy; } + protected SubscriptionViewMBean getProxyToQueueSubscriber(String name) throws MalformedObjectNameException, JMSException, IOException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + SubscriptionViewMBean subscription = null; + for (ObjectName subscriber : proxy.getSubscriptions()) { + subscription = (SubscriptionViewMBean) brokerService.getManagementContext() + .newProxyInstance(subscriber, SubscriptionViewMBean.class, true); + } + + return subscription; + } + protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index 47dc9ec6a3..508638e18f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -17,15 +17,19 @@ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,4 +117,50 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { session.close(); } + + @Test(timeout = 60000) + public void testQueueTXRollbackAndCommit() throws Exception { + final int MSG_COUNT = 3; + + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue destination = session.createQueue(getDestinationName()); + + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to rollback", i); + TextMessage message = session.createTextMessage("Rolled back Message: " + i); + message.setIntProperty("MessageSequence", i); + producer.send(message); + } + + session.rollback(); + + assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize()); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Sending message: {} to commit", i); + TextMessage message = session.createTextMessage("Commit Message: " + i); + message.setIntProperty("MessageSequence", i); + producer.send(message); + } + + session.commit(); + + assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize()); + SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName()); + assertNotNull(subscription); + assertTrue(subscription.getPrefetchSize() > 0); + + for (int i = 1; i <= MSG_COUNT; i++) { + LOG.info("Trying to receive message: {}", i); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull("Message " + i + "should be available", message); + assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence")); + } + } }