diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index b158b5be61..4fe7e5d1da 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -35,7 +35,7 @@ import org.apache.activemq.command.TransactionId; /** * Keeps track of the STOMP subscription so that acking is correctly done. - * + * * @author chirino */ public class StompSubscription { @@ -54,7 +54,7 @@ public class StompSubscription { private String ackMode = AUTO_ACK; private ActiveMQDestination destination; private String transformation; - + public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { this.protocolConverter = stompTransport; @@ -79,7 +79,7 @@ public class StompSubscription { } boolean ignoreTransformation = false; - + if (transformation != null) { message.setReadOnlyProperties(false); message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); @@ -88,21 +88,21 @@ public class StompSubscription { ignoreTransformation = true; } } - + StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation); command.setAction(Stomp.Responses.MESSAGE); if (subscriptionId != null) { command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); } - + protocolConverter.getTransportFilter().sendToStomp(command); } - + synchronized void onStompAbort(TransactionId transactionId) { unconsumedMessage.clear(); } - + synchronized void onStompCommit(TransactionId transactionId) { for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { Map.Entry entry = (Entry)iter.next(); @@ -116,9 +116,9 @@ public class StompSubscription { } synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { - + MessageId msgId = new MessageId(messageId); - + if (!dispatchedMessage.containsKey(msgId)) { return null; } @@ -139,7 +139,7 @@ public class StompSubscription { if (ack.getFirstMessageId() == null) { ack.setFirstMessageId(id); } - + if (transactionId != null) { if (!unconsumedMessage.contains(msg)) { unconsumedMessage.add(msg); @@ -147,8 +147,8 @@ public class StompSubscription { } else { iter.remove(); } - - + + count++; if (id.equals(msgId)) { @@ -168,8 +168,8 @@ public class StompSubscription { if (transactionId != null) { unconsumedMessage.add(dispatchedMessage.get(msgId)); ack.setTransactionId(transactionId); - } - dispatchedMessage.remove(messageId); + } + dispatchedMessage.remove(msgId); } return ack; }