git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@819035 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2009-09-25 21:45:23 +00:00
parent 7290d95b88
commit 87cdbbe961
1 changed files with 14 additions and 14 deletions

View File

@ -35,7 +35,7 @@ import org.apache.activemq.command.TransactionId;
/** /**
* Keeps track of the STOMP subscription so that acking is correctly done. * Keeps track of the STOMP subscription so that acking is correctly done.
* *
* @author <a href="http://hiramchirino.com">chirino</a> * @author <a href="http://hiramchirino.com">chirino</a>
*/ */
public class StompSubscription { public class StompSubscription {
@ -54,7 +54,7 @@ public class StompSubscription {
private String ackMode = AUTO_ACK; private String ackMode = AUTO_ACK;
private ActiveMQDestination destination; private ActiveMQDestination destination;
private String transformation; private String transformation;
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) { public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
this.protocolConverter = stompTransport; this.protocolConverter = stompTransport;
@ -79,7 +79,7 @@ public class StompSubscription {
} }
boolean ignoreTransformation = false; boolean ignoreTransformation = false;
if (transformation != null) { if (transformation != null) {
message.setReadOnlyProperties(false); message.setReadOnlyProperties(false);
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation); message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
@ -88,21 +88,21 @@ public class StompSubscription {
ignoreTransformation = true; ignoreTransformation = true;
} }
} }
StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation); StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
command.setAction(Stomp.Responses.MESSAGE); command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId != null) { if (subscriptionId != null) {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
} }
protocolConverter.getTransportFilter().sendToStomp(command); protocolConverter.getTransportFilter().sendToStomp(command);
} }
synchronized void onStompAbort(TransactionId transactionId) { synchronized void onStompAbort(TransactionId transactionId) {
unconsumedMessage.clear(); unconsumedMessage.clear();
} }
synchronized void onStompCommit(TransactionId transactionId) { synchronized void onStompCommit(TransactionId transactionId) {
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Entry)iter.next(); Map.Entry entry = (Entry)iter.next();
@ -116,9 +116,9 @@ public class StompSubscription {
} }
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
MessageId msgId = new MessageId(messageId); MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) { if (!dispatchedMessage.containsKey(msgId)) {
return null; return null;
} }
@ -139,7 +139,7 @@ public class StompSubscription {
if (ack.getFirstMessageId() == null) { if (ack.getFirstMessageId() == null) {
ack.setFirstMessageId(id); ack.setFirstMessageId(id);
} }
if (transactionId != null) { if (transactionId != null) {
if (!unconsumedMessage.contains(msg)) { if (!unconsumedMessage.contains(msg)) {
unconsumedMessage.add(msg); unconsumedMessage.add(msg);
@ -147,8 +147,8 @@ public class StompSubscription {
} else { } else {
iter.remove(); iter.remove();
} }
count++; count++;
if (id.equals(msgId)) { if (id.equals(msgId)) {
@ -168,8 +168,8 @@ public class StompSubscription {
if (transactionId != null) { if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId)); unconsumedMessage.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId); ack.setTransactionId(transactionId);
} }
dispatchedMessage.remove(messageId); dispatchedMessage.remove(msgId);
} }
return ack; return ack;
} }