This commit is contained in:
Clebert Suconic 2017-03-09 17:27:12 -05:00
commit 2b7807f7e0
3 changed files with 66 additions and 13 deletions

View File

@ -232,8 +232,6 @@ public class QueueImpl implements Queue {
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
private ScheduledFuture slowConsumerReaperFuture;
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
@ -2816,13 +2814,11 @@ public class QueueImpl implements Queue {
@Override
public float getRate() {
long locaMessageAdded = getMessagesAdded();
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
if (timeSlice == 0) {
messagesAddedSnapshot.getAndSet(locaMessageAdded);
return 0.0f;
}
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
}
// Inner classes
@ -3131,17 +3127,19 @@ public class QueueImpl implements Queue {
@Override
public void run() {
float queueRate = getRate();
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
Set<Consumer> consumersSet = getConsumers();
if (consumersSet.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
} else if (queueRate < (threshold * consumersSet.size())) {
}
float queueRate = getRate();
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
if (queueRate < (threshold * consumersSet.size())) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}

View File

@ -18,8 +18,23 @@ By default the server will not detect slow consumers. If slow consumer
detection is desired then see [queue attributes chapter](queue-attributes.md)
for more details.
The calculation to determine whether or not a consumer is slow only
inspects the number of messages a particular consumer has
The calculation to determine whether or not a consumer is slow inspects two notable
metrics:
1. The queue's message count.
2. The number of messages a consumer has acknowledged.
The queue's message count is inspected to ensure that the queue actually has had enough
messages to actually satisfy the consumer's threshold. For example, it would not be
fair to mark a consumer as "slow" if the queue received no messages. This is also notable
because in order to get an accurate message count the queue must be locked which can
negatively impact performance in high-throughput use-cases. Therefore slow-consumer
detection is only recommended on queues where it is absolutely necessary and in those
cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not
running so often as to negatively impact performance.
Finally, the algorithm inspects the number of messages a particular consumer has
*acknowledged*. It doesn't take into account whether or not flow control
has been enabled on the consumer, whether or not the consumer is
streaming a large message, etc. Keep this in mind when configuring slow

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
@ -243,6 +244,45 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
}
@Test
public void testSlowConsumerWithBurst() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(true, true));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
final int numMessages = 20;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
assertPaging();
final Queue queue = server.locateQueue(QUEUE);
queue.getRate();
logger.info("Creating consumer...");
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
Wait.waitFor(consumer::isClosed, 3000, 100);
Assert.assertTrue(consumer.isClosed());
try {
consumer.receive(500);
fail("Consumer should have been killed since it's slow!");
} catch (ActiveMQObjectClosedException e) {
// ignore
} catch (Exception e) {
fail("Wrong exception thrown");
}
}
@Test
public void testFastThenSlowConsumerSpared() throws Exception {
locator.setAckBatchSize(0);