ARTEMIS-1011 adjust slow-consumer detection logic

Adjust slow-consumer detection logic to use the number of messages in
the queue and not just the number of messages added since the last
check. This means the getRate() method now returns the rate of messages
which it *could* have dispatched since the last check rather than the
rate at which it received messages. This is a more reliable metric to
ensure the slow-consumer detection logic doesn't flag a consumer as
slow unfairly. Although the reliability will come at a performance cost
since getMessageCount() must lock the queue.
This commit is contained in:
Justin Bertram 2017-03-09 08:11:00 -06:00 committed by Clebert Suconic
parent 4fabbc8041
commit 19ebbfb5f0
3 changed files with 70 additions and 13 deletions

View File

@ -232,8 +232,6 @@ public class QueueImpl implements Queue {
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
private ScheduledFuture slowConsumerReaperFuture;
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
@ -2816,13 +2814,11 @@ public class QueueImpl implements Queue {
@Override
public float getRate() {
long locaMessageAdded = getMessagesAdded();
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
if (timeSlice == 0) {
messagesAddedSnapshot.getAndSet(locaMessageAdded);
return 0.0f;
}
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
}
// Inner classes
@ -3131,17 +3127,19 @@ public class QueueImpl implements Queue {
@Override
public void run() {
float queueRate = getRate();
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
Set<Consumer> consumersSet = getConsumers();
if (consumersSet.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
} else if (queueRate < (threshold * consumersSet.size())) {
}
float queueRate = getRate();
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
if (queueRate < (threshold * consumersSet.size())) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}

View File

@ -18,8 +18,23 @@ By default the server will not detect slow consumers. If slow consumer
detection is desired then see [queue attributes chapter](queue-attributes.md)
for more details.
The calculation to determine whether or not a consumer is slow only
inspects the number of messages a particular consumer has
The calculation to determine whether or not a consumer is slow inspects two notable
metrics:
1. The queue's message count.
2. The number of messages a consumer has acknowledged.
The queue's message count is inspected to ensure that the queue actually has had enough
messages to actually satisfy the consumer's threshold. For example, it would not be
fair to mark a consumer as "slow" if the queue received no messages. This is also notable
because in order to get an accurate message count the queue must be locked which can
negatively impact performance in high-throughput use-cases. Therefore slow-consumer
detection is only recommended on queues where it is absolutely necessary and in those
cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not
running so often as to negatively impact performance.
Finally, the algorithm inspects the number of messages a particular consumer has
*acknowledged*. It doesn't take into account whether or not flow control
has been enabled on the consumer, whether or not the consumer is
streaming a large message, etc. Keep this in mind when configuring slow

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
@ -243,6 +244,49 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
}
@Test
public void testSlowConsumerWithBurst() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(true, true));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 20;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
assertPaging();
final Queue queue = server.locateQueue(QUEUE);
queue.getRate();
logger.info("Creating consumer...");
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
forceGC();
return queue.getConsumerCount() == 0;
}
}, 3000, 100);
try {
consumer.receive(500);
fail("Consumer should have been killed since it's slow!");
} catch (ActiveMQObjectClosedException e) {
// ignore
} catch (Exception e) {
fail("Wrong exception thrown");
}
}
@Test
public void testFastThenSlowConsumerSpared() throws Exception {
locator.setAckBatchSize(0);