diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index a707cf28f4..f61c899552 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -35,6 +35,7 @@ import javax.jms.TextMessage; import javax.management.ObjectName; import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.util.Wait; @@ -1128,4 +1129,62 @@ public class Stomp11Test extends StompTestSupport { assertEquals(view.getDurableTopicSubscribers().length, 2); assertEquals(view.getInactiveDurableTopicSubscribers().length, 0); } + + @Test(timeout = 60000) + public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception { + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + producer.close(); + + String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" + + "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + + QueueViewMBean queueView = getProxyToQueue(getQueueName()); + assertEquals(1, queueView.getQueueSize()); + + frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + + // ack it in the TX then abort + frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" + + received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // rollback first message + frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + assertEquals(1, queueView.getQueueSize()); + + // ack it outside the TX and it should be really ack'd + frame = "ACK\n" + "subscription:12345\n" + "message-id:" + + received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 0; + } + })); + + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(unsub); + + String receipt = stompConnection.receiveFrame(); + assertTrue(receipt.contains("RECEIPT")); + } }