mirror of https://github.com/apache/activemq.git
(AMQ-7496) - Improve inflight message size test accuracy and reliability
This commit is contained in:
parent
cc0bcdd5dc
commit
d539ff77b9
|
@ -68,6 +68,10 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
final protected boolean optimizeAcknowledge;
|
||||
final protected String destName = "testDest";
|
||||
|
||||
//use 10 second wait for assertions instead of the 30 default
|
||||
final protected long WAIT_DURATION = 10 * 1000;
|
||||
final protected long SLEEP_DURATION = 500;
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
@ -147,20 +151,27 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
* @throws javax.jms.JMSException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test(timeout=31000)
|
||||
@Test(timeout=60000)
|
||||
public void testInflightMessageSize() throws Exception {
|
||||
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||
|
||||
final long size = sendMessages(10);
|
||||
|
||||
assertTrue("Inflight message size should be greater than the content length sent",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
|
||||
assertTrue("Inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedCounter() == 10));
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Destination inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
|
||||
receiveMessages(10);
|
||||
|
||||
assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
|
||||
assertTrue("Destination inflight message count should be 0",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight message size should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -168,29 +179,37 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=31000)
|
||||
@Test(timeout=60000)
|
||||
public void testInflightMessageSizePrefetchFilled() throws Exception {
|
||||
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||
//turn off extension to make the test reliable
|
||||
((AbstractSubscription)getSubscription()).setUsePrefetchExtension(false);
|
||||
|
||||
final long size = sendMessages(prefetch);
|
||||
|
||||
assertTrue("Inflight message size should be greater than content length",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
|
||||
assertTrue("Inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch));
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == prefetch, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Destination inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == prefetch, WAIT_DURATION, SLEEP_DURATION));
|
||||
|
||||
final long inFlightSize = getSubscription().getInFlightMessageSize();
|
||||
sendMessages(10);
|
||||
|
||||
//Prefetch has been filled, so the size should not change with 10 more messages
|
||||
assertTrue("Inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch));
|
||||
assertTrue("Inflight message size should not change", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize));
|
||||
assertTrue("Destination inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == prefetch, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == prefetch, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight message size should not change", Wait.waitFor(
|
||||
() -> getSubscription().getInFlightMessageSize() == inFlightSize, WAIT_DURATION, SLEEP_DURATION));
|
||||
receiveMessages(prefetch + 10);
|
||||
|
||||
assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
|
||||
assertTrue("Destination inflight message count should be 0",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight message size should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -198,16 +217,18 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=31000)
|
||||
@Test(timeout=60000)
|
||||
public void testInflightMessageSizePrefetchNotFilled() throws Exception {
|
||||
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||
|
||||
final long size = sendMessages(prefetch - 10);
|
||||
|
||||
assertTrue("Inflight message size should be greater than content length",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
|
||||
assertTrue("Inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch - 10));
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == prefetch - 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Destination inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == prefetch - 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
|
||||
//capture the inflight size and send 10 more messages
|
||||
final long inFlightSize = getSubscription().getInFlightMessageSize();
|
||||
|
@ -215,11 +236,16 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
|
||||
//Prefetch has NOT been filled, so the size should rise with 10 more messages
|
||||
assertTrue("Inflight message size should be greater than previous inlight size",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > inFlightSize));
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > inFlightSize, WAIT_DURATION, SLEEP_DURATION));
|
||||
|
||||
receiveMessages(prefetch);
|
||||
|
||||
assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
|
||||
assertTrue("Destination inflight message count should be 0",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight message size should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
}
|
||||
|
||||
|
||||
|
@ -229,7 +255,7 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
* @throws javax.jms.JMSException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test(timeout=31000)
|
||||
@Test(timeout=60000)
|
||||
public void testInflightMessageSizeRollback() throws Exception {
|
||||
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||
Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
|
||||
|
@ -237,9 +263,11 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
final long size = sendMessages(10);
|
||||
|
||||
assertTrue("Inflight message size should be greater than the content length sent",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
|
||||
assertTrue("Inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedCounter() == 10));
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Destination inflight message count should equal number of messages sent",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
|
||||
long inFlightSize = getSubscription().getInFlightMessageSize();
|
||||
|
||||
|
@ -248,14 +276,17 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
}
|
||||
session.rollback();
|
||||
|
||||
assertTrue("Destination inflight message count should not change on rollback",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should not change on rollback",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 10, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight message size should not change on rollback",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize));
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize, WAIT_DURATION, SLEEP_DURATION));
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
@Test(timeout=60000)
|
||||
public void testInflightMessageSizeConsumerExpiration() throws Exception {
|
||||
Assume.assumeTrue(useTopicSubscriptionInflightStats);
|
||||
Assume.assumeFalse(optimizeAcknowledge);
|
||||
|
||||
int ttl = 500;
|
||||
int messageCount = 10;
|
||||
|
@ -267,9 +298,13 @@ public abstract class AbstractInflightMessageSizeTest {
|
|||
//verify in flight size and count is now 0
|
||||
assertNull(consumer.receive(10));
|
||||
assertTrue("Expired count is wrong", Wait.waitFor(() -> brokerService.getDestination(getActiveMQDestination())
|
||||
.getDestinationStatistics().getExpired().getCount() == messageCount));
|
||||
assertTrue("Inflight message count should be 0", Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0));
|
||||
assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
|
||||
.getDestinationStatistics().getExpired().getCount() == messageCount, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Destination inflight message count should be 0",
|
||||
Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight sub dispatched message count should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
assertTrue("Inflight message size should be 0",
|
||||
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
|
||||
}
|
||||
|
||||
protected long sendMessages(int count) throws JMSException {
|
||||
|
|
Loading…
Reference in New Issue