git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-09-04 18:21:46 +00:00
parent 4c92244758
commit b1288f25de
3 changed files with 34 additions and 19 deletions

View File

@ -389,6 +389,8 @@ public class ProtocolConverter {
String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
} else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
} else {
stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
}

View File

@ -87,6 +87,7 @@ public interface Stomp {
public interface AckModeValues {
String AUTO = "auto";
String CLIENT = "client";
String INDIVIDUAL = "client-individual";
}
}

View File

@ -40,6 +40,7 @@ public class StompSubscription {
public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
private final ProtocolConverter protocolConverter;
private final String subscriptionId;
@ -66,6 +67,10 @@ public class StompSubscription {
synchronized (this) {
dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
}
} else if (ackMode == INDIVIDUAL_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
@ -99,31 +104,38 @@ public class StompSubscription {
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setConsumerId(consumerInfo.getConsumerId());
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
if (ackMode == CLIENT_ACK) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Entry)iter.next();
String id = (String)entry.getKey();
MessageId msgid = (MessageId)entry.getValue();
Map.Entry entry = (Entry)iter.next();
String id = (String)entry.getKey();
MessageId msgid = (MessageId)entry.getValue();
if (ack.getFirstMessageId() == null) {
ack.setFirstMessageId(msgid);
}
iter.remove();
count++;
if (id.equals(messageId)) {
ack.setLastMessageId(msgid);
break;
}
if (ack.getFirstMessageId() == null) {
ack.setFirstMessageId(msgid);
}
iter.remove();
count++;
if (id.equals(messageId)) {
ack.setLastMessageId(msgid);
break;
}
ack.setMessageCount(count);
}
else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
ack.setMessageID(msgid);
dispatchedMessage.remove(messageId);
}
ack.setMessageCount(count);
return ack;
}