diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 2deab5a444..833f745f65 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -477,7 +477,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); } } - if (session.isClientAcknowledge()) { + if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) { m.setAcknowledgeCallback(new Callback() { public void execute() throws Exception { session.checkClosed(); @@ -767,7 +767,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); - } else { + } else if (session.isIndividualAcknowledge()){ + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + synchronized(deliveredMessages){ + deliveredMessages.remove(md); + } + } + else { throw new IllegalStateException("Invalid session state."); } } @@ -968,7 +975,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } afterMessageIsConsumed(md, expired); } catch (RuntimeException e) { - if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge()) { + if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) { // Redeliver the message } else { // Transacted or Client ack: Deliver the diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index c58e3d4f9d..32edbf2a0a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -132,6 +132,8 @@ import java.util.concurrent.atomic.AtomicBoolean; * @see javax.jms.XASession */ public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { + + public static final int INDIVIDUAL_ACKNOWLEDGE=4; public static interface DeliveryListener { void beforeDelivery(ActiveMQSession session, Message msg); @@ -710,7 +712,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta continue; } - if (isClientAcknowledge()) { + if (isClientAcknowledge()||isIndividualAcknowledge()) { message.setAcknowledgeCallback(new Callback() { public void execute() throws Exception { } @@ -1705,6 +1707,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta public boolean isDupsOkAcknowledge() { return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; } + + public boolean isIndividualAcknowledge(){ + return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; + } /** * Returns the message delivery listener.