From b997d257d53979ccbfa61669c0c102251a37e1ac Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 22 May 2009 10:46:20 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2265, revert https://issues.apache.org/activemq/browse/AMQ-2262, inflight count stil needs work git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@777463 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 7 +- .../broker/region/TopicSubscription.java | 10 ++- .../management/JMSEndpointStatsImpl.java | 10 +++ .../org/apache/activemq/JMSConsumerTest.java | 72 +++++++++++++++++++ 4 files changed, 94 insertions(+), 5 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 6617b2baaf..67f635a4a7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -779,8 +779,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return; } if (messageExpired) { - // do nothing since STANDARD_ACK will be sent - return; + synchronized (deliveredMessages) { + deliveredMessages.remove(md); + } + stats.getExpiredMessageCount().increment(); + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } else { stats.onMessage(); if (session.getTransacted()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index b265d70dcb..bfa3476075 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -217,9 +217,13 @@ public class TopicSubscription extends AbstractSubscription { } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. - dequeueCounter.addAndGet(ack.getMessageCount()); - if (destination != null) { - destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); + if (ack.isInTransaction()) { + if (destination != null) { + destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); + } + } else { + // expired message - expired message in a transacion + dequeueCounter.addAndGet(ack.getMessageCount()); } dispatchMatched(); return; diff --git a/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java b/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java index 3dd56e99fc..cc692efe79 100755 --- a/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java @@ -136,6 +136,16 @@ public class JMSEndpointStatsImpl extends StatsImpl { } } + @Override + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + messageCount.setEnabled(enabled); + messageRateTime.setEnabled(enabled); + pendingMessageCount.setEnabled(enabled); + expiredMessageCount.setEnabled(enabled); + messageWaitTime.setEnabled(enabled); + } + public void dump(IndentPrinter out) { out.printIndent(); out.println(messageCount); diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 05a9bddbb3..3a4cf02094 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.activemq; import java.lang.Thread.UncaughtExceptionHandler; import java.util.HashMap; import java.util.Map; +import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -35,8 +37,13 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; import junit.framework.Test; + +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.commons.logging.Log; @@ -849,5 +856,70 @@ public class JMSConsumerTest extends JmsTestSupport { assertNull(redispatchConsumer.receive(500)); redispatchSession.close(); } + + + public void initCombosForTestAckOfExpired() { + addCombinationValues("destinationType", + new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testAckOfExpired() throws Exception { + ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false"); + connection = fact.createActiveMQConnection(); + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ? + session.createQueue("test") : session.createTopic("test")); + + MessageConsumer consumer = session.createConsumer(destination); + connection.setStatsEnabled(true); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(destination); + producer.setTimeToLive(1000); + final int count = 4; + for (int i = 0; i < count; i++) { + TextMessage message = sendSession.createTextMessage("" + i); + producer.send(message); + } + + // let first bunch in queue expire + Thread.sleep(1000); + + producer.setTimeToLive(0); + for (int i = 0; i < count; i++) { + TextMessage message = sendSession.createTextMessage("no expiry" + i); + producer.send(message); + } + + ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer; + + for(int i=0; i