diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 833f745f65..f7c410fbfe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -477,13 +477,20 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); } } - if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) { + if (session.isClientAcknowledge()) { m.setAcknowledgeCallback(new Callback() { public void execute() throws Exception { session.checkClosed(); session.acknowledge(); } }); + }else if (session.isIndividualAcknowledge()) { + m.setAcknowledgeCallback(new Callback() { + public void execute() throws Exception { + session.checkClosed(); + acknowledge(md); + } + }); } return m; } @@ -765,15 +772,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } else if (session.isDupsOkAcknowledge()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); - } else if (session.isClientAcknowledge()) { + } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); - } else if (session.isIndividualAcknowledge()){ - MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); - session.asyncSendPacket(ack); - synchronized(deliveredMessages){ - deliveredMessages.remove(md); - } - } + } else { throw new IllegalStateException("Invalid session state."); } @@ -855,6 +856,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } + + void acknowledge(MessageDispatch md) throws JMSException { + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + synchronized(deliveredMessages){ + deliveredMessages.remove(md); + } + } public void commit() throws JMSException { synchronized (deliveredMessages) {