diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index d4492e1bf6..889b6f7c3e 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -49,8 +49,8 @@ public class StompSubscription { protected final String subscriptionId; protected final ConsumerInfo consumerInfo; - protected final LinkedHashMap dispatchedMessage = new LinkedHashMap(); - protected final LinkedList unconsumedMessage = new LinkedList(); + protected final LinkedHashMap dispatchedMessage = new LinkedHashMap<>(); + protected final LinkedList unconsumedMessage = new LinkedList<>(); protected String ackMode = AUTO_ACK; protected ActiveMQDestination destination; @@ -65,15 +65,11 @@ public class StompSubscription { void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException { ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); - if (ackMode == CLIENT_ACK) { + if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) { synchronized (this) { dispatchedMessage.put(message.getMessageId(), md); } - } else if (ackMode == INDIVIDUAL_ACK) { - synchronized (this) { - dispatchedMessage.put(message.getMessageId(), md); - } - } else if (ackMode == AUTO_ACK) { + } else if (ackMode.equals(AUTO_ACK)) { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); protocolConverter.getStompTransport().sendToActiveMQ(ack); } @@ -179,11 +175,13 @@ public class StompSubscription { } else if (ackMode == INDIVIDUAL_ACK) { ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); ack.setMessageID(msgId); + ack.setMessageCount(1); if (transactionId != null) { unconsumedMessage.add(dispatchedMessage.get(msgId)); ack.setTransactionId(transactionId); + } else { + dispatchedMessage.remove(msgId); } - dispatchedMessage.remove(msgId); } return ack; } diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index f61c899552..5050399400 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -1131,7 +1131,17 @@ public class Stomp11Test extends StompTestSupport { } @Test(timeout = 60000) - public void testTransactionRollbackAllowsSecondAckOutsideTX() throws Exception { + public void testTransactionRollbackAllowsSecondAckOutsideTXClientAck() throws Exception { + doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client"); + } + + @Test(timeout = 60000) + public void testTransactionRollbackAllowsSecondAckOutsideTXClientIndividualAck() throws Exception { + doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck("client-individual"); + } + + public void doTestTransactionRollbackAllowsSecondAckOutsideTXClientAck(String ackMode) throws Exception { + MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Hello")); producer.close(); @@ -1150,7 +1160,7 @@ public class Stomp11Test extends StompTestSupport { stompConnection.sendFrame(frame); frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + - "id:12345\n" + "ack:client\n\n" + Stomp.NULL; + "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); StompFrame received = stompConnection.receive();