diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 889b6f7c3e..dbbe871b07 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -115,8 +115,11 @@ public class StompSubscription { } } - if (!unconsumedMessage.isEmpty()) { + // For individual Ack we already sent an Ack that will be applied on commit + // we don't send a second standard Ack as that would produce an error. + if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) { ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + ack.setTransactionId(transactionId); unconsumedMessage.clear(); } } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index e12eacaf91..8679684f8c 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -1197,4 +1197,73 @@ public class Stomp11Test extends StompTestSupport { String receipt = stompConnection.receiveFrame(); assertTrue(receipt.contains("RECEIPT")); } + + @Test(timeout = 60000) + public void testAckMessagesInTransactionOutOfOrderWithTXClientAck() throws Exception { + doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client"); + } + + @Test(timeout = 60000) + public void testAckMessagesInTransactionOutOfOrderWithTXClientIndividualAck() throws Exception { + doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual"); + } + + public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception { + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Message 1")); + producer.send(session.createTextMessage("Message 2")); + producer.close(); + + String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" + + "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + + final QueueViewMBean queueView = getProxyToQueue(getQueueName()); + assertEquals(2, queueView.getQueueSize()); + + frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame receivedFirst = stompConnection.receive(); + assertTrue(receivedFirst.getAction().equals("MESSAGE")); + StompFrame receivedSecond = stompConnection.receive(); + assertTrue(receivedSecond.getAction().equals("MESSAGE")); + + // ack second, then first message + frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" + + receivedSecond.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" + + receivedFirst.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // commit transaction + frame = "COMMIT\n" + "receipt:1\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String receipt = stompConnection.receiveFrame(); + LOG.debug("Receipt Frame = {}", receipt); + assertTrue(receipt.contains("RECEIPT")); + + assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 0; + } + })); + + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(unsub); + + receipt = stompConnection.receiveFrame(); + assertTrue(receipt.contains("RECEIPT")); + } }