git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@777463 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-05-22 10:46:20 +00:00
parent 461af7c9e1
commit b997d257d5
4 changed files with 94 additions and 5 deletions

View File

@ -779,8 +779,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return;
}
if (messageExpired) {
// do nothing since STANDARD_ACK will be sent
return;
synchronized (deliveredMessages) {
deliveredMessages.remove(md);
}
stats.getExpiredMessageCount().increment();
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
stats.onMessage();
if (session.getTransacted()) {

View File

@ -217,9 +217,13 @@ public class TopicSubscription extends AbstractSubscription {
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
dequeueCounter.addAndGet(ack.getMessageCount());
if (destination != null) {
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
if (ack.isInTransaction()) {
if (destination != null) {
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
}
} else {
// expired message - expired message in a transacion
dequeueCounter.addAndGet(ack.getMessageCount());
}
dispatchMatched();
return;

View File

@ -136,6 +136,16 @@ public class JMSEndpointStatsImpl extends StatsImpl {
}
}
@Override
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
messageCount.setEnabled(enabled);
messageRateTime.setEnabled(enabled);
pendingMessageCount.setEnabled(enabled);
expiredMessageCount.setEnabled(enabled);
messageWaitTime.setEnabled(enabled);
}
public void dump(IndentPrinter out) {
out.printIndent();
out.println(messageCount);

View File

@ -19,6 +19,7 @@ package org.apache.activemq;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -35,8 +37,13 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
@ -850,4 +857,69 @@ public class JMSConsumerTest extends JmsTestSupport {
redispatchSession.close();
}
public void initCombosForTestAckOfExpired() {
addCombinationValues("destinationType",
new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
}
public void testAckOfExpired() throws Exception {
ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
connection = fact.createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ?
session.createQueue("test") : session.createTopic("test"));
MessageConsumer consumer = session.createConsumer(destination);
connection.setStatsEnabled(true);
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(destination);
producer.setTimeToLive(1000);
final int count = 4;
for (int i = 0; i < count; i++) {
TextMessage message = sendSession.createTextMessage("" + i);
producer.send(message);
}
// let first bunch in queue expire
Thread.sleep(1000);
producer.setTimeToLive(0);
for (int i = 0; i < count; i++) {
TextMessage message = sendSession.createTextMessage("no expiry" + i);
producer.send(message);
}
ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
for(int i=0; i<count; i++) {
TextMessage msg = (TextMessage) amqConsumer.receive();
assertNotNull(msg);
assertTrue(msg.getText().contains("no expiry"));
// force an ack when there are expired messages
amqConsumer.acknowledge();
}
assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
DestinationViewMBean view = createView(destination);
assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
}
return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
}
}