From 4ddb3e3f6c3f15229ae75058a84cc461f452b901 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 16 May 2008 18:29:33 +0000 Subject: [PATCH] tidy up for https://issues.apache.org/activemq/browse/AMQ-1732 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@657155 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 833f745f65..f7c410fbfe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -477,13 +477,20 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); } } - if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) { + if (session.isClientAcknowledge()) { m.setAcknowledgeCallback(new Callback() { public void execute() throws Exception { session.checkClosed(); session.acknowledge(); } }); + }else if (session.isIndividualAcknowledge()) { + m.setAcknowledgeCallback(new Callback() { + public void execute() throws Exception { + session.checkClosed(); + acknowledge(md); + } + }); } return m; } @@ -765,15 +772,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } else if (session.isDupsOkAcknowledge()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); - } else if (session.isClientAcknowledge()) { + } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); - } else if (session.isIndividualAcknowledge()){ - MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); - session.asyncSendPacket(ack); - synchronized(deliveredMessages){ - deliveredMessages.remove(md); - } - } + } else { throw new IllegalStateException("Invalid session state."); } @@ -855,6 +856,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } + + void acknowledge(MessageDispatch md) throws JMSException { + MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); + session.asyncSendPacket(ack); + synchronized(deliveredMessages){ + deliveredMessages.remove(md); + } + } public void commit() throws JMSException { synchronized (deliveredMessages) {