diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 8f1dc48957..4372d3ebc5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -649,10 +649,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); session.asyncSendPacket(ack); - // Adjust the counters - deliveredCounter -= deliveredMessages.size(); + // Adjust the window size. additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - + rollbackCounter = 0; + redeliveryDelay = 0; + } else { // stop the delivery of messages. @@ -684,6 +685,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + deliveredCounter -= deliveredMessages.size(); deliveredMessages.clear(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java b/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java index 9be5844716..c0fbafd5bf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java @@ -161,5 +161,7 @@ public class MessageDispatchChannel { } } - + public String toString() { + return list.toString(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java index a8475c14b0..329b1aa51e 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -36,7 +36,7 @@ import javax.jms.Topic; public abstract class DeadLetterTestSupport extends TestSupport { protected int messageCount = 10; - protected long timeToLive = 250; + protected long timeToLive = 0; protected Connection connection; protected Session session; protected MessageConsumer consumer;