mirror of https://github.com/apache/activemq.git
AMQ-6796 Avoid double Ack on commit in STOMP individual ack mode
During commit of transaction when subscription mode is individual ack the messages can get double acked leading to an error about receipt of an invalid ack.
This commit is contained in:
parent
63f0b7e20d
commit
4c986d102c
|
@ -115,8 +115,11 @@ public class StompSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!unconsumedMessage.isEmpty()) {
|
// For individual Ack we already sent an Ack that will be applied on commit
|
||||||
|
// we don't send a second standard Ack as that would produce an error.
|
||||||
|
if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
|
||||||
ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
|
ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
|
||||||
|
ack.setTransactionId(transactionId);
|
||||||
unconsumedMessage.clear();
|
unconsumedMessage.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1197,4 +1197,73 @@ public class Stomp11Test extends StompTestSupport {
|
||||||
String receipt = stompConnection.receiveFrame();
|
String receipt = stompConnection.receiveFrame();
|
||||||
assertTrue(receipt.contains("RECEIPT"));
|
assertTrue(receipt.contains("RECEIPT"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAckMessagesInTransactionOutOfOrderWithTXClientAck() throws Exception {
|
||||||
|
doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAckMessagesInTransactionOutOfOrderWithTXClientIndividualAck() throws Exception {
|
||||||
|
doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String ackMode) throws Exception {
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.send(session.createTextMessage("Message 1"));
|
||||||
|
producer.send(session.createTextMessage("Message 2"));
|
||||||
|
producer.close();
|
||||||
|
|
||||||
|
String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
|
||||||
|
"accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String f = stompConnection.receiveFrame();
|
||||||
|
assertTrue(f.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
final QueueViewMBean queueView = getProxyToQueue(getQueueName());
|
||||||
|
assertEquals(2, queueView.getQueueSize());
|
||||||
|
|
||||||
|
frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
StompFrame receivedFirst = stompConnection.receive();
|
||||||
|
assertTrue(receivedFirst.getAction().equals("MESSAGE"));
|
||||||
|
StompFrame receivedSecond = stompConnection.receive();
|
||||||
|
assertTrue(receivedSecond.getAction().equals("MESSAGE"));
|
||||||
|
|
||||||
|
// ack second, then first message
|
||||||
|
frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" +
|
||||||
|
receivedSecond.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + "message-id:" +
|
||||||
|
receivedFirst.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
// commit transaction
|
||||||
|
frame = "COMMIT\n" + "receipt:1\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
String receipt = stompConnection.receiveFrame();
|
||||||
|
LOG.debug("Receipt Frame = {}", receipt);
|
||||||
|
assertTrue(receipt.contains("RECEIPT"));
|
||||||
|
|
||||||
|
assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return queueView.getQueueSize() == 0;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
|
||||||
|
"receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(unsub);
|
||||||
|
|
||||||
|
receipt = stompConnection.receiveFrame();
|
||||||
|
assertTrue(receipt.contains("RECEIPT"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue