From 8e9f80e7d6fdee3e3ecd2293fa364c05a2e9b18b Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 23 Aug 2017 12:21:03 -0400 Subject: [PATCH] AMQ-6796 Avoid double Ack on commit in STOMP individual ack mode During commit of transaction when subscription mode is individual ack the messages can get double acked leading to an error about receipt of an invalid ack. (cherry picked from commit 4c986d102cd5c862d88fd84eec1889b9786e9970) --- .../transport/stomp/StompSubscription.java | 5 +- .../activemq/transport/stomp/Stomp11Test.java | 69 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 889b6f7c3e..dbbe871b07 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -115,8 +115,11 @@ public class StompSubscription { } } - if (!unconsumedMessage.isEmpty()) { + // For individual Ack we already sent an Ack that will be applied on commit + // we don't send a second standard Ack as that would produce an error. + if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) { ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + ack.setTransactionId(transactionId); unconsumedMessage.clear(); } } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index e12eacaf91..8679684f8c 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -1197,4 +1197,73 @@ public class Stomp11Test extends StompTestSupport { String receipt = stompConnection.receiveFrame(); assertTrue(receipt.contains("RECEIPT")); } + + @Test(timeout = 60000) + public void testAckMessagesInTransactionOutOfOrderWithTXClientAck() throws Exception { + doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client"); + } + + @Test(timeout = 60000) + public void testAckMessagesInTransactionOutOfOrderWithTXClientIndividualAck() throws Exception { + doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual"); + } + + public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception { + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Message 1")); + producer.send(session.createTextMessage("Message 2")); + producer.close(); + + String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" + + "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("CONNECTED")); + + final QueueViewMBean queueView = getProxyToQueue(getQueueName()); + assertEquals(2, queueView.getQueueSize()); + + frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame receivedFirst = stompConnection.receive(); + assertTrue(receivedFirst.getAction().equals("MESSAGE")); + StompFrame receivedSecond = stompConnection.receive(); + assertTrue(receivedSecond.getAction().equals("MESSAGE")); + + // ack second, then first message + frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" + + receivedSecond.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" + + receivedFirst.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // commit transaction + frame = "COMMIT\n" + "receipt:1\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String receipt = stompConnection.receiveFrame(); + LOG.debug("Receipt Frame = {}", receipt); + assertTrue(receipt.contains("RECEIPT")); + + assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 0; + } + })); + + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "receipt:1\n" + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(unsub); + + receipt = stompConnection.receiveFrame(); + assertTrue(receipt.contains("RECEIPT")); + } }