mirror of https://github.com/apache/activemq.git
AMQ-9159 - Add a test case to verify inflight message stats for wildcard
consumer when a destination is removed
This commit is contained in:
parent
2b81946e34
commit
f6e26085cf
|
@ -66,7 +66,9 @@ public abstract class AbstractInflightMessageSizeTest {
|
||||||
protected boolean useTopicSubscriptionInflightStats;
|
protected boolean useTopicSubscriptionInflightStats;
|
||||||
final protected int ackType;
|
final protected int ackType;
|
||||||
final protected boolean optimizeAcknowledge;
|
final protected boolean optimizeAcknowledge;
|
||||||
final protected String destName = "testDest";
|
final protected String destNamePrefix = "testDest";
|
||||||
|
final protected String destName = "testDest.1";
|
||||||
|
final protected String destName2 = "testDest.2";
|
||||||
|
|
||||||
//use 10 second wait for assertions instead of the 30 default
|
//use 10 second wait for assertions instead of the 30 default
|
||||||
protected final long WAIT_DURATION = 10 * 1000;
|
protected final long WAIT_DURATION = 10 * 1000;
|
||||||
|
@ -367,8 +369,62 @@ public abstract class AbstractInflightMessageSizeTest {
|
||||||
getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testInflightMessageSizeRemoveDestination() throws Exception {
|
||||||
|
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||||
|
//Close as we will re-create with a wildcard sub to get messages from 2 destinations
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
consumer = getMessageConsumer(destNamePrefix + ".>");
|
||||||
|
sendMessages(10);
|
||||||
|
sendMessages(10, getActiveMQDestination(destName2));
|
||||||
|
Destination amqDestination2 = TestSupport.getDestination(brokerService, getActiveMQDestination(destName2));
|
||||||
|
final Subscription subscription = getSubscription();
|
||||||
|
|
||||||
|
//Wait for the 10 messages to get dispatched and then close the consumer to test cleanup
|
||||||
|
assertTrue("Should be 10 in flight messages",
|
||||||
|
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Should be 10 in flight messages",
|
||||||
|
Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Inflight message size should be 20",
|
||||||
|
Wait.waitFor(() -> subscription.getInFlightSize() == 20, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Inflight message size should be greater than 0",
|
||||||
|
Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
|
||||||
|
//remove 1 destination, leaving 10 in flight
|
||||||
|
brokerService.getBroker().removeDestination(brokerService.getAdminConnectionContext(), getActiveMQDestination(), 1000);
|
||||||
|
|
||||||
|
//Make sure all the stats are updated after 1 destination removal
|
||||||
|
assertTrue("Destination inflight message count should be 0",
|
||||||
|
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Destination inflight message count should still be 10",
|
||||||
|
Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Inflight message size should be 10",
|
||||||
|
Wait.waitFor(() -> subscription.getInFlightSize() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Inflight message size should be greater than 0",
|
||||||
|
Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
|
||||||
|
//remove second dest
|
||||||
|
brokerService.getBroker().removeDestination(brokerService.getAdminConnectionContext(), getActiveMQDestination(destName2), 1000);
|
||||||
|
|
||||||
|
assertTrue("Destination inflight message count should be 0",
|
||||||
|
Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Inflight message size should be 0",
|
||||||
|
Wait.waitFor(() -> subscription.getInFlightSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
assertTrue("Inflight message size should be 0",
|
||||||
|
Wait.waitFor(() -> subscription.getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||||
|
}
|
||||||
|
|
||||||
protected long sendMessages(int count) throws JMSException {
|
protected long sendMessages(int count) throws JMSException {
|
||||||
return sendMessages(count, null);
|
return sendMessages(count, null, dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long sendMessages(int count, javax.jms.Destination dest) throws JMSException {
|
||||||
|
return sendMessages(count, null, dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long sendMessages(int count, Integer ttl) throws JMSException {
|
||||||
|
return sendMessages(count, ttl, dest);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -377,7 +433,7 @@ public abstract class AbstractInflightMessageSizeTest {
|
||||||
* @param count
|
* @param count
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
protected long sendMessages(int count, Integer ttl) throws JMSException {
|
protected long sendMessages(int count, Integer ttl, javax.jms.Destination dest) throws JMSException {
|
||||||
MessageProducer producer = session.createProducer(dest);
|
MessageProducer producer = session.createProducer(dest);
|
||||||
if (ttl != null) {
|
if (ttl != null) {
|
||||||
producer.setTimeToLive(ttl);
|
producer.setTimeToLive(ttl);
|
||||||
|
@ -412,10 +468,22 @@ public abstract class AbstractInflightMessageSizeTest {
|
||||||
|
|
||||||
protected abstract Subscription getSubscription();
|
protected abstract Subscription getSubscription();
|
||||||
|
|
||||||
protected abstract ActiveMQDestination getActiveMQDestination();
|
protected ActiveMQDestination getActiveMQDestination() {
|
||||||
|
return getActiveMQDestination(destName);
|
||||||
|
}
|
||||||
|
|
||||||
protected abstract MessageConsumer getMessageConsumer() throws JMSException;
|
protected abstract ActiveMQDestination getActiveMQDestination(String destName);
|
||||||
|
|
||||||
protected abstract javax.jms.Destination getDestination() throws JMSException ;
|
protected MessageConsumer getMessageConsumer() throws JMSException {
|
||||||
|
return getMessageConsumer(destName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract MessageConsumer getMessageConsumer(String destName) throws JMSException;
|
||||||
|
|
||||||
|
protected javax.jms.Destination getDestination() throws JMSException {
|
||||||
|
return getDestination(destName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract javax.jms.Destination getDestination(String destName) throws JMSException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.activemq.broker.region.Topic;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.util.SubscriptionKey;
|
import org.apache.activemq.util.SubscriptionKey;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@ -40,8 +42,8 @@ public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflight
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MessageConsumer getMessageConsumer() throws JMSException {
|
protected MessageConsumer getMessageConsumer(String destName) throws JMSException {
|
||||||
return session.createDurableSubscriber((javax.jms.Topic)dest, "sub1");
|
return session.createDurableSubscriber(getDestination(destName), "sub1");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -50,13 +52,22 @@ public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflight
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected javax.jms.Topic getDestination() throws JMSException {
|
protected javax.jms.Topic getDestination(String destName) throws JMSException {
|
||||||
return session.createTopic(destName);
|
return session.createTopic(destName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActiveMQDestination getActiveMQDestination() {
|
protected ActiveMQDestination getActiveMQDestination(String destName) {
|
||||||
return new ActiveMQTopic(destName);
|
return new ActiveMQTopic(destName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testInflightMessageSizeRemoveDestination() throws Exception {
|
||||||
|
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||||
|
//Close as we will re-create with a wildcard sub
|
||||||
|
consumer.close();
|
||||||
|
session.unsubscribe("sub1");
|
||||||
|
super.testInflightMessageSizeRemoveDestination();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,22 +40,22 @@ public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMe
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MessageConsumer getMessageConsumer() throws JMSException {
|
protected MessageConsumer getMessageConsumer(String destName) throws JMSException {
|
||||||
return session.createConsumer(dest);
|
return session.createConsumer(getDestination(destName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Subscription getSubscription() {
|
protected Subscription getSubscription() {
|
||||||
return ((Queue)amqDestination).getConsumers().get(0);
|
return amqDestination.getConsumers().get(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Destination getDestination() throws JMSException {
|
protected Destination getDestination(String destName) throws JMSException {
|
||||||
return session.createQueue(destName);
|
return session.createQueue(destName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActiveMQDestination getActiveMQDestination() {
|
protected ActiveMQDestination getActiveMQDestination(String destName) {
|
||||||
return new ActiveMQQueue(destName);
|
return new ActiveMQQueue(destName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,8 +43,8 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MessageConsumer getMessageConsumer() throws JMSException {
|
protected MessageConsumer getMessageConsumer(String destName) throws JMSException {
|
||||||
return session.createConsumer(dest);
|
return session.createConsumer(getDestination(destName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -53,12 +53,12 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Destination getDestination() throws JMSException {
|
protected Destination getDestination(String destName) throws JMSException {
|
||||||
return session.createTopic(destName);
|
return session.createTopic(destName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActiveMQDestination getActiveMQDestination() {
|
protected ActiveMQDestination getActiveMQDestination(String destName) {
|
||||||
return new ActiveMQTopic(destName);
|
return new ActiveMQTopic(destName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue