Gary Tully 2009-05-22 21:23:48 +00:00
parent 2a1ec7c99e
commit a707594a8d
4 changed files with 13 additions and 17 deletions

View File

@ -897,13 +897,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
session.sendAck(pendingAck); session.sendAck(pendingAck);
pendingAck=null; pendingAck=null;
additionalWindowSize = deliveredCounter; deliveredCounter = 0;
additionalWindowSize = 0;
// When using DUPS ok, we do a real ack.
if (ackType == MessageAck.STANDARD_ACK_TYPE) {
deliveredCounter = 0;
additionalWindowSize = 0;
}
} }
} }

View File

@ -217,14 +217,12 @@ public class TopicSubscription extends AbstractSubscription {
} else if (ack.isDeliveredAck()) { } else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch // Message was delivered but not acknowledged: update pre-fetch
// counters. // counters.
if (ack.isInTransaction()) { // also. get these for a consumer expired message.
if (destination != null) { if (destination != null && !ack.isInTransaction()) {
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
} destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
} else {
// expired message - expired message in a transacion
dequeueCounter.addAndGet(ack.getMessageCount());
} }
dequeueCounter.addAndGet(ack.getMessageCount());
dispatchMatched(); dispatchMatched();
return; return;
} }

View File

@ -864,6 +864,7 @@ public class JMSConsumerTest extends JmsTestSupport {
} }
public void testAckOfExpired() throws Exception { public void testAckOfExpired() throws Exception {
ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false"); ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
connection = fact.createActiveMQConnection(); connection = fact.createActiveMQConnection();
@ -907,7 +908,9 @@ public class JMSConsumerTest extends JmsTestSupport {
DestinationViewMBean view = createView(destination); DestinationViewMBean view = createView(destination);
assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5); assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
} }
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {

View File

@ -72,7 +72,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = factory.createConnection(); connection = factory.createConnection();
session = connection.createSession(false, session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination); producer = session.createProducer(destination);
producer.setTimeToLive(100); producer.setTimeToLive(100);
consumer = session.createConsumer(destination); consumer = session.createConsumer(destination);
@ -117,7 +117,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
DestinationViewMBean view = createView(destination); DestinationViewMBean view = createView(destination);
assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5); assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
} }
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {