This closes #1086
This commit is contained in:
commit
f1891f1627
|
@ -233,6 +233,8 @@ public class QueueImpl implements Queue {
|
||||||
|
|
||||||
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
|
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
|
||||||
|
|
||||||
|
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
|
||||||
|
|
||||||
private ScheduledFuture slowConsumerReaperFuture;
|
private ScheduledFuture slowConsumerReaperFuture;
|
||||||
|
|
||||||
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
|
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
|
||||||
|
@ -2815,11 +2817,13 @@ public class QueueImpl implements Queue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public float getRate() {
|
public float getRate() {
|
||||||
|
long locaMessageAdded = getMessagesAdded();
|
||||||
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
|
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
|
||||||
if (timeSlice == 0) {
|
if (timeSlice == 0) {
|
||||||
|
messagesAddedSnapshot.getAndSet(locaMessageAdded);
|
||||||
return 0.0f;
|
return 0.0f;
|
||||||
}
|
}
|
||||||
return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
|
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inner classes
|
// Inner classes
|
||||||
|
@ -3135,19 +3139,17 @@ public class QueueImpl implements Queue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
Set<Consumer> consumersSet = getConsumers();
|
|
||||||
|
|
||||||
if (consumersSet.size() == 0) {
|
|
||||||
logger.debug("There are no consumers, no need to check slow consumer's rate");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
float queueRate = getRate();
|
float queueRate = getRate();
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
|
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queueRate < (threshold * consumersSet.size())) {
|
Set<Consumer> consumersSet = getConsumers();
|
||||||
|
|
||||||
|
if (consumersSet.size() == 0) {
|
||||||
|
logger.debug("There are no consumers, no need to check slow consumer's rate");
|
||||||
|
return;
|
||||||
|
} else if (queueRate < (threshold * consumersSet.size())) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
|
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,23 +18,8 @@ By default the server will not detect slow consumers. If slow consumer
|
||||||
detection is desired then see [queue attributes chapter](queue-attributes.md)
|
detection is desired then see [queue attributes chapter](queue-attributes.md)
|
||||||
for more details.
|
for more details.
|
||||||
|
|
||||||
The calculation to determine whether or not a consumer is slow inspects two notable
|
The calculation to determine whether or not a consumer is slow only
|
||||||
metrics:
|
inspects the number of messages a particular consumer has
|
||||||
|
|
||||||
1. The queue's message count.
|
|
||||||
|
|
||||||
2. The number of messages a consumer has acknowledged.
|
|
||||||
|
|
||||||
The queue's message count is inspected to ensure that the queue actually has had enough
|
|
||||||
messages to actually satisfy the consumer's threshold. For example, it would not be
|
|
||||||
fair to mark a consumer as "slow" if the queue received no messages. This is also notable
|
|
||||||
because in order to get an accurate message count the queue must be locked which can
|
|
||||||
negatively impact performance in high-throughput use-cases. Therefore slow-consumer
|
|
||||||
detection is only recommended on queues where it is absolutely necessary and in those
|
|
||||||
cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not
|
|
||||||
running so often as to negatively impact performance.
|
|
||||||
|
|
||||||
Finally, the algorithm inspects the number of messages a particular consumer has
|
|
||||||
*acknowledged*. It doesn't take into account whether or not flow control
|
*acknowledged*. It doesn't take into account whether or not flow control
|
||||||
has been enabled on the consumer, whether or not the consumer is
|
has been enabled on the consumer, whether or not the consumer is
|
||||||
streaming a large message, etc. Keep this in mind when configuring slow
|
streaming a large message, etc. Keep this in mind when configuring slow
|
||||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.TimeUtils;
|
import org.apache.activemq.artemis.utils.TimeUtils;
|
||||||
|
@ -244,45 +243,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSlowConsumerWithBurst() throws Exception {
|
|
||||||
ClientSessionFactory sf = createSessionFactory(locator);
|
|
||||||
|
|
||||||
ClientSession session = addClientSession(sf.createSession(true, true));
|
|
||||||
|
|
||||||
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
|
|
||||||
|
|
||||||
final int numMessages = 20;
|
|
||||||
|
|
||||||
for (int i = 0; i < numMessages; i++) {
|
|
||||||
producer.send(createTextMessage(session, "m" + i));
|
|
||||||
}
|
|
||||||
|
|
||||||
assertPaging();
|
|
||||||
|
|
||||||
final Queue queue = server.locateQueue(QUEUE);
|
|
||||||
|
|
||||||
queue.getRate();
|
|
||||||
|
|
||||||
logger.info("Creating consumer...");
|
|
||||||
|
|
||||||
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
|
|
||||||
session.start();
|
|
||||||
|
|
||||||
Wait.waitFor(consumer::isClosed, 3000, 100);
|
|
||||||
|
|
||||||
Assert.assertTrue(consumer.isClosed());
|
|
||||||
|
|
||||||
try {
|
|
||||||
consumer.receive(500);
|
|
||||||
fail("Consumer should have been killed since it's slow!");
|
|
||||||
} catch (ActiveMQObjectClosedException e) {
|
|
||||||
// ignore
|
|
||||||
} catch (Exception e) {
|
|
||||||
fail("Wrong exception thrown");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFastThenSlowConsumerSpared() throws Exception {
|
public void testFastThenSlowConsumerSpared() throws Exception {
|
||||||
locator.setAckBatchSize(0);
|
locator.setAckBatchSize(0);
|
||||||
|
|
Loading…
Reference in New Issue