From b1288f25de3553292114883418882f30af8e0e13 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 4 Sep 2008 18:21:46 +0000 Subject: [PATCH] Patch applied to https://issues.apache.org/activemq/browse/AMQ-1874 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692183 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/ProtocolConverter.java | 2 + .../activemq/transport/stomp/Stomp.java | 1 + .../transport/stomp/StompSubscription.java | 50 ++++++++++++------- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index f63a85b449..44e4328c72 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -389,6 +389,8 @@ public class ProtocolConverter { String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); + } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { + stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); } else { stompSubscription.setAckMode(StompSubscription.AUTO_ACK); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java index 265e2d234c..81185baf86 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java @@ -87,6 +87,7 @@ public interface Stomp { public interface AckModeValues { String AUTO = "auto"; String CLIENT = "client"; + String INDIVIDUAL = "client-individual"; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 2748576fbc..25ef407284 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -40,6 +40,7 @@ public class StompSubscription { public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO; public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT; + public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL; private final ProtocolConverter protocolConverter; private final String subscriptionId; @@ -66,6 +67,10 @@ public class StompSubscription { synchronized (this) { dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId()); } + } else if (ackMode == INDIVIDUAL_ACK) { + synchronized (this) { + dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId()); + } } else if (ackMode == AUTO_ACK) { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); protocolConverter.getTransportFilter().sendToActiveMQ(ack); @@ -99,31 +104,38 @@ public class StompSubscription { MessageAck ack = new MessageAck(); ack.setDestination(consumerInfo.getDestination()); - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setConsumerId(consumerInfo.getConsumerId()); - int count = 0; - for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { + if (ackMode == CLIENT_ACK) { + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + int count = 0; + for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { - Map.Entry entry = (Entry)iter.next(); - String id = (String)entry.getKey(); - MessageId msgid = (MessageId)entry.getValue(); + Map.Entry entry = (Entry)iter.next(); + String id = (String)entry.getKey(); + MessageId msgid = (MessageId)entry.getValue(); + + if (ack.getFirstMessageId() == null) { + ack.setFirstMessageId(msgid); + } + + iter.remove(); + count++; + + if (id.equals(messageId)) { + ack.setLastMessageId(msgid); + break; + } - if (ack.getFirstMessageId() == null) { - ack.setFirstMessageId(msgid); } - - iter.remove(); - count++; - - if (id.equals(messageId)) { - ack.setLastMessageId(msgid); - break; - } - + ack.setMessageCount(count); + } + else if (ackMode == INDIVIDUAL_ACK) { + ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); + MessageId msgid = (MessageId)dispatchedMessage.get(messageId); + ack.setMessageID(msgid); + dispatchedMessage.remove(messageId); } - - ack.setMessageCount(count); return ack; }