diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 76c021983b..a772529dd3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3935,19 +3935,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void run() { float queueRate = getRate(); + long queueMessages = getMessageCount(); + if (logger.isDebugEnabled()) { - logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); + logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } if (consumers.size() == 0) { logger.debug("There are no consumers, no need to check slow consumer's rate"); return; - } else if (queueRate < (threshold * consumers.size())) { - if (logger.isDebugEnabled()) { - logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); + } else { + float queueThreshold = threshold * consumers.size(); + + if (queueRate < queueThreshold && queueMessages < queueThreshold) { + if (logger.isDebugEnabled()) { + logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); + } + return; } - return; } for (ConsumerHolder consumerHolder : consumers) { @@ -3955,11 +3961,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumer instanceof ServerConsumerImpl) { ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; float consumerRate = serverConsumer.getRate(); - if (queueRate < threshold) { - if (logger.isDebugEnabled()) { - logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); - } - } else if (consumerRate < threshold) { + if (consumerRate < threshold) { RemotingConnection connection = null; ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); RemotingService remotingService = server.getRemotingService(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index 0e4319790d..8a83845bfa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -59,7 +59,8 @@ public class SlowConsumerTest extends ActiveMQTestBase { private static final Logger logger = Logger.getLogger(SlowConsumerTest.class); - int threshold = 10; + private int threshold = 10; + private long checkPeriod = 1; private boolean isNetty = false; private boolean isPaging = false; @@ -89,7 +90,7 @@ public class SlowConsumerTest extends ActiveMQTestBase { server = createServer(true, isNetty); AddressSettings addressSettings = new AddressSettings(); - addressSettings.setSlowConsumerCheckPeriod(1); + addressSettings.setSlowConsumerCheckPeriod(checkPeriod); addressSettings.setSlowConsumerThreshold(threshold); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); @@ -142,6 +143,67 @@ public class SlowConsumerTest extends ActiveMQTestBase { } } + @Test + public void testSlowConsumerKilledAfterBurst() throws Exception { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + assertPaging(); + + final int numMessages = 3 * threshold; + + for (int i = 0; i < numMessages; i++) { + producer.send(createTextMessage(session, "m" + i)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + for (int i = 0; i < threshold; i++) { + consumer.receiveImmediate().individualAcknowledge(); + } + + Thread.sleep(3 * checkPeriod * 1000); + + try { + consumer.receiveImmediate(); + fail(); + } catch (ActiveMQObjectClosedException e) { + assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED); + } + } + + @Test + public void testSlowConsumerSparedAfterBurst() throws Exception { + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + assertPaging(); + + final int numMessages = 3 * threshold + 1; + + for (int i = 0; i < numMessages; i++) { + producer.send(createTextMessage(session, "m" + i)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + + for (int i = 0; i < 3 * threshold; i++) { + consumer.receiveImmediate().individualAcknowledge(); + } + + Thread.sleep(3 * checkPeriod * 1000); + + assertNotNull(consumer.receiveImmediate()); + } + private void assertPaging() throws Exception { Queue queue = server.locateQueue(QUEUE); if (isPaging) {