ensure drain completion clear currently tracked credit value, next flow
should update to the correct value.
This commit is contained in:
Timothy Bish 2015-05-27 11:30:16 -04:00
parent 0a21c5f8ff
commit 5e7b70f11f
3 changed files with 66 additions and 0 deletions

View File

@ -400,6 +400,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
// It's the end of browse signal in response to a MessagePull // It's the end of browse signal in response to a MessagePull
getEndpoint().drained(); getEndpoint().drained();
draining = false; draining = false;
currentCredit = 0;
} else { } else {
jms.setRedeliveryCounter(md.getRedeliveryCounter()); jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true); jms.setReadOnlyBody(true);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp; package org.apache.activemq.transport.amqp;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.Set; import java.util.Set;
@ -42,6 +43,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.spring.SpringSslContext;
@ -332,6 +334,19 @@ public class AmqpTestSupport {
return proxy; return proxy;
} }
protected SubscriptionViewMBean getProxyToQueueSubscriber(String name) throws MalformedObjectNameException, JMSException, IOException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
SubscriptionViewMBean subscription = null;
for (ObjectName subscriber : proxy.getSubscriptions()) {
subscription = (SubscriptionViewMBean) brokerService.getManagementContext()
.newProxyInstance(subscriber, SubscriptionViewMBean.class, true);
}
return subscription;
}
protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()

View File

@ -17,15 +17,19 @@
package org.apache.activemq.transport.amqp; package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -113,4 +117,50 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
session.close(); session.close();
} }
@Test(timeout = 60000)
public void testQueueTXRollbackAndCommit() throws Exception {
final int MSG_COUNT = 3;
connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to rollback", i);
TextMessage message = session.createTextMessage("Rolled back Message: " + i);
message.setIntProperty("MessageSequence", i);
producer.send(message);
}
session.rollback();
assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize());
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to commit", i);
TextMessage message = session.createTextMessage("Commit Message: " + i);
message.setIntProperty("MessageSequence", i);
producer.send(message);
}
session.commit();
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Trying to receive message: {}", i);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull("Message " + i + "should be available", message);
assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence"));
}
}
} }