AMQ-7496 - Properly decrement inflight message size on message

expiration

Also clean up some of the handling of inflight metrics in Prefetch
subscription
This commit is contained in:
Christopher L. Shannon (cshannon) 2020-06-10 09:10:09 -04:00
parent c8d4ebfe7a
commit cc0bcdd5dc
2 changed files with 76 additions and 67 deletions

View File

@ -174,7 +174,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
pending.remove(); pending.remove();
createMessageDispatch(node, node.getMessage()); createMessageDispatch(node, node.getMessage());
dispatched.add(node); dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
onDispatch(node, node.getMessage()); onDispatch(node, node.getMessage());
} }
return; return;
@ -224,7 +223,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Don't remove the nodes until we are committed. // Don't remove the nodes until we are committed.
if (!context.isInTransaction()) { if (!context.isInTransaction()) {
getSubscriptionStatistics().getDequeues().increment(); getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
removeList.add(node); removeList.add(node);
contractPrefetchExtension(1); contractPrefetchExtension(1);
} else { } else {
@ -240,7 +238,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
for (final MessageReference node : removeList) { for (final MessageReference node : removeList) {
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); decrementPrefetchCounter(node);
} }
// this only happens after a reconnect - get an ack which is not // this only happens after a reconnect - get an ack which is not
// valid // valid
@ -256,9 +254,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Don't remove the nodes until we are committed - immediateAck option // Don't remove the nodes until we are committed - immediateAck option
if (!context.isInTransaction()) { if (!context.isInTransaction()) {
getSubscriptionStatistics().getDequeues().increment(); getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); decrementPrefetchCounter(node);
contractPrefetchExtension(1); contractPrefetchExtension(1);
} else { } else {
registerRemoveSync(context, node); registerRemoveSync(context, node);
@ -306,7 +303,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
regionDestination.messageExpired(context, this, node); regionDestination.messageExpired(context, this, node);
} }
iter.remove(); iter.remove();
nodeDest.getDestinationStatistics().getInflight().decrement(); decrementPrefetchCounter(node);
if (ack.getLastMessageId().equals(messageId)) { if (ack.getLastMessageId().equals(messageId)) {
contractPrefetchExtension(1); contractPrefetchExtension(1);
@ -364,8 +361,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (inAckRange) { if (inAckRange) {
sendToDLQ(context, node, ack.getPoisonCause()); sendToDLQ(context, node, ack.getPoisonCause());
Destination nodeDest = (Destination) node.getRegionDestination(); Destination nodeDest = (Destination) node.getRegionDestination();
nodeDest.getDestinationStatistics()
.getInflight().decrement();
removeList.add(node); removeList.add(node);
getSubscriptionStatistics().getDequeues().increment(); getSubscriptionStatistics().getDequeues().increment();
index++; index++;
@ -380,7 +375,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
for (final MessageReference node : removeList) { for (final MessageReference node : removeList) {
dispatched.remove(node); dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); decrementPrefetchCounter(node);
} }
if (!callDispatchMatched) { if (!callDispatchMatched) {
throw new JMSException( throw new JMSException(
@ -416,8 +411,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (dispatched.remove(node)) { if (dispatched.remove(node)) {
// if consumer is removed, dispatched will be empty and inflight will // if consumer is removed, dispatched will be empty and inflight will
// already have been adjusted // already have been adjusted
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); decrementPrefetchCounter(node);
nodeDest.getDestinationStatistics().getInflight().decrement();
} }
} }
contractPrefetchExtension(1); contractPrefetchExtension(1);
@ -708,9 +702,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
MessageDispatch md = createMessageDispatch(node, message); MessageDispatch md = createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) { if (node != QueueMessageReference.NULL_MESSAGE) {
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node); dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); getSubscriptionStatistics().getDispatched().increment();
} }
if (getPrefetchSize() == 0) { if (getPrefetchSize() == 0) {
while (true) { while (true) {
@ -737,7 +730,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (nodeDest != null) { if (nodeDest != null) {
if (node != QueueMessageReference.NULL_MESSAGE) { if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment(); nodeDest.getDestinationStatistics().getDispatched().increment();
nodeDest.getDestinationStatistics().getInflight().increment(); incrementPrefetchCounter(node);
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
} }
} }
@ -759,7 +752,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (nodeDest != null) { if (nodeDest != null) {
if (node != QueueMessageReference.NULL_MESSAGE) { if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment(); nodeDest.getDestinationStatistics().getDispatched().increment();
nodeDest.getDestinationStatistics().getInflight().increment(); incrementPrefetchCounter(node);
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
} }
} }
@ -861,4 +854,14 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
LOG.trace("Caught exception during dispatch after prefetch change.", e); LOG.trace("Caught exception during dispatch after prefetch change.", e);
} }
} }
private void incrementPrefetchCounter(final MessageReference node) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().increment();
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
private void decrementPrefetchCounter(final MessageReference node) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.statistics; package org.apache.activemq.statistics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.Arrays; import java.util.Arrays;
@ -36,6 +37,7 @@ import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -151,21 +153,14 @@ public abstract class AbstractInflightMessageSizeTest {
final long size = sendMessages(10); final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be greater than the content length sent",
@Override Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
public boolean isSatisified() throws Exception { assertTrue("Inflight message count should equal number of messages sent",
return getSubscription().getInFlightMessageSize() > size; Wait.waitFor(() -> getSubscription().getDispatchedCounter() == 10));
}
}));
receiveMessages(10); receiveMessages(10);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
} }
/** /**
@ -176,30 +171,26 @@ public abstract class AbstractInflightMessageSizeTest {
@Test(timeout=31000) @Test(timeout=31000)
public void testInflightMessageSizePrefetchFilled() throws Exception { public void testInflightMessageSizePrefetchFilled() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats); Assume.assumeTrue(useTopicSubscriptionInflightStats);
//turn off extension to make the test reliable
((AbstractSubscription)getSubscription()).setUsePrefetchExtension(false);
final long size = sendMessages(prefetch); final long size = sendMessages(prefetch);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be greater than content length",
@Override Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
public boolean isSatisified() throws Exception { assertTrue("Inflight message count should equal number of messages sent",
return getSubscription().getInFlightMessageSize() > size; Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch));
}
}));
final long inFlightSize = getSubscription().getInFlightMessageSize(); final long inFlightSize = getSubscription().getInFlightMessageSize();
sendMessages(10); sendMessages(10);
//Prefetch has been filled, so the size should not change with 10 more messages //Prefetch has been filled, so the size should not change with 10 more messages
assertEquals("Inflight message size should not change", inFlightSize, getSubscription().getInFlightMessageSize()); assertTrue("Inflight message count should equal number of messages sent",
Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch));
assertTrue("Inflight message size should not change", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize));
receiveMessages(prefetch + 10); receiveMessages(prefetch + 10);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
} }
/** /**
@ -213,33 +204,22 @@ public abstract class AbstractInflightMessageSizeTest {
final long size = sendMessages(prefetch - 10); final long size = sendMessages(prefetch - 10);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be greater than content length",
@Override Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
public boolean isSatisified() throws Exception { assertTrue("Inflight message count should equal number of messages sent",
return getSubscription().getInFlightMessageSize() > size; Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch - 10));
}
}));
//capture the inflight size and send 10 more messages //capture the inflight size and send 10 more messages
final long inFlightSize = getSubscription().getInFlightMessageSize(); final long inFlightSize = getSubscription().getInFlightMessageSize();
sendMessages(10); sendMessages(10);
//Prefetch has NOT been filled, so the size should rise with 10 more messages //Prefetch has NOT been filled, so the size should rise with 10 more messages
assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be greater than previous inlight size",
@Override Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > inFlightSize));
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() > inFlightSize;
}
}));
receiveMessages(prefetch); receiveMessages(prefetch);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
} }
@ -256,12 +236,10 @@ public abstract class AbstractInflightMessageSizeTest {
final long size = sendMessages(10); final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() { assertTrue("Inflight message size should be greater than the content length sent",
@Override Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
public boolean isSatisified() throws Exception { assertTrue("Inflight message count should equal number of messages sent",
return getSubscription().getInFlightMessageSize() > size; Wait.waitFor(() -> getSubscription().getDispatchedCounter() == 10));
}
}));
long inFlightSize = getSubscription().getInFlightMessageSize(); long inFlightSize = getSubscription().getInFlightMessageSize();
@ -270,7 +248,32 @@ public abstract class AbstractInflightMessageSizeTest {
} }
session.rollback(); session.rollback();
assertEquals("Inflight message size should not change on rollback", inFlightSize, getSubscription().getInFlightMessageSize()); assertTrue("Inflight message size should not change on rollback",
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize));
}
@Test(timeout=30000)
public void testInflightMessageSizeConsumerExpiration() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
Assume.assumeFalse(optimizeAcknowledge);
int ttl = 500;
int messageCount = 10;
//Send 10 messages with a TTL of 500 ms which is long enough to be paged in and then wait for TTL to pass
sendMessages(10, ttl);
Thread.sleep(ttl * 2);
//Make sure we can't receive and all 10 messages were expired
//verify in flight size and count is now 0
assertNull(consumer.receive(10));
assertTrue("Expired count is wrong", Wait.waitFor(() -> brokerService.getDestination(getActiveMQDestination())
.getDestinationStatistics().getExpired().getCount() == messageCount));
assertTrue("Inflight message count should be 0", Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0));
assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
}
protected long sendMessages(int count) throws JMSException {
return sendMessages(count, null);
} }
/** /**
@ -279,8 +282,11 @@ public abstract class AbstractInflightMessageSizeTest {
* @param count * @param count
* @throws JMSException * @throws JMSException
*/ */
protected long sendMessages(int count) throws JMSException { protected long sendMessages(int count, Integer ttl) throws JMSException {
MessageProducer producer = session.createProducer(dest); MessageProducer producer = session.createProducer(dest);
if (ttl != null) {
producer.setTimeToLive(ttl);
}
long totalSize = 0; long totalSize = 0;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
Random r = new Random(); Random r = new Random();