This commit is contained in:
Clebert Suconic 2019-05-31 12:21:29 -04:00
commit f0c023eec1
2 changed files with 76 additions and 12 deletions

View File

@ -3935,31 +3935,33 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void run() {
float queueRate = getRate();
long queueMessages = getMessageCount();
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
if (consumers.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
} else if (queueRate < (threshold * consumers.size())) {
} else {
float queueThreshold = threshold * consumers.size();
if (queueRate < queueThreshold && queueMessages < queueThreshold) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}
return;
}
}
for (ConsumerHolder consumerHolder : consumers) {
Consumer consumer = consumerHolder.consumer();
if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate();
if (queueRate < threshold) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}
} else if (consumerRate < threshold) {
if (consumerRate < threshold) {
RemotingConnection connection = null;
ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
RemotingService remotingService = server.getRemotingService();

View File

@ -59,7 +59,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
int threshold = 10;
private int threshold = 10;
private long checkPeriod = 1;
private boolean isNetty = false;
private boolean isPaging = false;
@ -89,7 +90,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
server = createServer(true, isNetty);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(1);
addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
addressSettings.setSlowConsumerThreshold(threshold);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
@ -142,6 +143,67 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
}
@Test
public void testSlowConsumerKilledAfterBurst() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
assertPaging();
final int numMessages = 3 * threshold;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
for (int i = 0; i < threshold; i++) {
consumer.receiveImmediate().individualAcknowledge();
}
Thread.sleep(3 * checkPeriod * 1000);
try {
consumer.receiveImmediate();
fail();
} catch (ActiveMQObjectClosedException e) {
assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
}
}
@Test
public void testSlowConsumerSparedAfterBurst() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
assertPaging();
final int numMessages = 3 * threshold + 1;
for (int i = 0; i < numMessages; i++) {
producer.send(createTextMessage(session, "m" + i));
}
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
for (int i = 0; i < 3 * threshold; i++) {
consumer.receiveImmediate().individualAcknowledge();
}
Thread.sleep(3 * checkPeriod * 1000);
assertNotNull(consumer.receiveImmediate());
}
private void assertPaging() throws Exception {
Queue queue = server.locateQueue(QUEUE);
if (isPaging) {