AMQ-6697 Preserve dispatched state on client-individual tx ack

Need to preserve the messages in the dispatched list when a
client-individual ack comes in so that on abort the state remains
dispatched and the message can still be ack'd
(cherry picked from commit e83bb6dc38)
This commit is contained in:
Timothy Bish 2017-06-02 14:19:44 -04:00
parent 1c141eae40
commit 0be8b63fde
2 changed files with 19 additions and 11 deletions

View File

@ -49,8 +49,8 @@ public class StompSubscription {
protected final String subscriptionId; protected final String subscriptionId;
protected final ConsumerInfo consumerInfo; protected final ConsumerInfo consumerInfo;
protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>(); protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>(); protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
protected String ackMode = AUTO_ACK; protected String ackMode = AUTO_ACK;
protected ActiveMQDestination destination; protected ActiveMQDestination destination;
@ -65,15 +65,11 @@ public class StompSubscription {
void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException { void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if (ackMode == CLIENT_ACK) { if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
synchronized (this) { synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md); dispatchedMessage.put(message.getMessageId(), md);
} }
} else if (ackMode == INDIVIDUAL_ACK) { } else if (ackMode.equals(AUTO_ACK)) {
synchronized (this) {
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack); protocolConverter.getStompTransport().sendToActiveMQ(ack);
} }
@ -179,12 +175,14 @@ public class StompSubscription {
} else if (ackMode == INDIVIDUAL_ACK) { } else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId); ack.setMessageID(msgId);
ack.setMessageCount(1);
if (transactionId != null) { if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId)); unconsumedMessage.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId); ack.setTransactionId(transactionId);
} } else {
dispatchedMessage.remove(msgId); dispatchedMessage.remove(msgId);
} }
}
return ack; return ack;
} }

View File

@ -1131,7 +1131,17 @@ public class Stomp11Test extends StompTestSupport {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception { public void testTransactionRollbackAllowsSecondAckOutsideTXClientAck() throws Exception {
doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client");
}
@Test(timeout = 60000)
public void testTransactionRollbackAllowsSecondAckOutsideTXClientIndividualAck() throws Exception {
doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client-individual");
}
public void doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck(String ackMode) throws Exception {
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello")); producer.send(session.createTextMessage("Hello"));
producer.close(); producer.close();
@ -1150,7 +1160,7 @@ public class Stomp11Test extends StompTestSupport {
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:client\n\n" + Stomp.NULL; "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
StompFrame received = stompConnection.receive(); StompFrame received = stompConnection.receive();