diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 8ece15674b..683d484f2d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -238,6 +238,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + private boolean isAutoAcknowledgeEach() { + return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() ); + } + + private boolean isAutoAcknowledgeBatch() { + return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; + } + public StatsImpl getStats() { return stats; } @@ -642,7 +650,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void deliverAcks() { MessageAck ack = null; if (deliveryingAcknowledgements.compareAndSet(false, true)) { - if (session.isAutoAcknowledge()) { + if (isAutoAcknowledgeEach()) { synchronized(deliveredMessages) { ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { @@ -687,7 +695,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // Ack any delivered messages now. if (!session.getTransacted()) { deliverAcks(); - if (session.isDupsOkAcknowledge()) { + if (isAutoAcknowledgeBatch()) { acknowledge(); } } @@ -771,7 +779,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { md.setDeliverySequenceId(session.getNextDeliveryId()); lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); - if (!session.isDupsOkAcknowledge()) { + if (!isAutoAcknowledgeBatch()) { synchronized(deliveredMessages) { deliveredMessages.addFirst(md); } @@ -795,7 +803,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC stats.onMessage(); if (session.getTransacted()) { // Do nothing. - } else if (session.isAutoAcknowledge()) { + } else if (isAutoAcknowledgeEach()) { if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { @@ -820,7 +828,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } deliveryingAcknowledgements.set(false); } - } else if (session.isDupsOkAcknowledge()) { + } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); @@ -1081,7 +1089,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } afterMessageIsConsumed(md, expired); } catch (RuntimeException e) { - if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) { + if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { // Redeliver the message } else { // Transacted or Client ack: Deliver the