This closes #960

This commit is contained in:
Clebert Suconic 2017-01-16 11:37:12 -05:00
commit 40c86b2eed
2 changed files with 230 additions and 2 deletions

View File

@ -3132,7 +3132,20 @@ public class QueueImpl implements Queue {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
} }
for (Consumer consumer : getConsumers()) {
Set<Consumer> consumersSet = getConsumers();
if (consumersSet.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
} else if (queueRate < (threshold * consumersSet.size())) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}
return;
}
for (Consumer consumer : consumersSet) {
if (consumer instanceof ServerConsumerImpl) { if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate(); float consumerRate = serverConsumer.getRate();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -42,7 +43,10 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -52,6 +56,9 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class SlowConsumerTest extends ActiveMQTestBase { public class SlowConsumerTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
int threshold = 10;
private boolean isNetty = false; private boolean isNetty = false;
private boolean isPaging = false; private boolean isPaging = false;
@ -82,7 +89,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings(); AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(1); addressSettings.setSlowConsumerCheckPeriod(1);
addressSettings.setSlowConsumerThreshold(10); addressSettings.setSlowConsumerThreshold(threshold);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
if (isPaging) { if (isPaging) {
@ -347,4 +354,212 @@ public class SlowConsumerTest extends ActiveMQTestBase {
assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED); assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
} }
} }
/**
* This test creates 3 consumers on one queue. A producer sends
* messages at a rate of 2 mesages per second. Each consumer
* consumes messages at rate of 1 message per second. The slow
* consumer threshold is 1 message per second.
* Based on the above settings, at least one of the consumers
* will be removed during the test, but at least one of the
* consumers will remain and all messages will be received.
*/
@Test
public void testMultipleConsumersOneQueue() throws Exception {
locator.setAckBatchSize(0);
Queue queue = server.locateQueue(QUEUE);
ClientSessionFactory sf1 = createSessionFactory(locator);
ClientSessionFactory sf2 = createSessionFactory(locator);
ClientSessionFactory sf3 = createSessionFactory(locator);
ClientSessionFactory sf4 = createSessionFactory(locator);
final int messages = 10 * threshold;
FixedRateProducer producer = new FixedRateProducer(threshold * 2, sf1, QUEUE, messages);
final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf2, QUEUE, 1));
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf3, QUEUE, 2));
consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf4, QUEUE, 3));
try {
producer.start();
for (FixedRateConsumer consumer : consumers) {
consumer.start();
}
producer.join(10000);
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
Assert.assertEquals(3, queue.getConsumerCount());
} finally {
producer.stopRunning();
Assert.assertFalse(producer.failed);
for (FixedRateConsumer consumer : consumers) {
consumer.stopRunning();
Assert.assertFalse(consumer.failed);
}
logger.debug("***report messages received: " + receivedMessages.size());
logger.debug("***consumers left: " + consumers.size());
}
}
private class FixedRateProducer extends FixedRateClient {
int messages;
ClientProducer producer;
FixedRateProducer(int rate, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
super(sf, queue, rate);
this.messages = messages;
}
@Override
protected void prepareWork() throws ActiveMQException {
super.prepareWork();
this.producer = session.createProducer(queue);
}
@Override
protected void doWork(int count) throws Exception {
if (count < messages) {
ClientMessage m = createTextMessage(session, "msg" + count);
producer.send(m);
logger.debug("producer sent a message " + count);
} else {
this.working = false;
}
}
@Override
public String toString() {
return "Producer";
}
}
private class FixedRateConsumer extends FixedRateClient {
Set<FixedRateConsumer> consumers;
ClientConsumer consumer;
final Set<ClientMessage> receivedMessages;
int id;
FixedRateConsumer(int rate,
Set<ClientMessage> receivedMessages,
ClientSessionFactory sf,
SimpleString queue,
int id) throws ActiveMQException {
super(sf, queue, rate);
this.id = id;
this.receivedMessages = receivedMessages;
}
@Override
protected void prepareWork() throws ActiveMQException {
super.prepareWork();
this.consumer = session.createConsumer(queue);
this.session.start();
}
@Override
protected void doWork(int count) throws Exception {
ClientMessage m = this.consumer.receive(1000);
logger.debug("consumer " + id + " got m: " + m);
if (m != null) {
receivedMessages.add(m);
m.acknowledge();
logger.debug(" consumer " + id + " acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
}
}
@Override
protected void handleError(int count, Exception e) {
failed = true;
System.err.println("Got error receiving message " + count + " remove self " + this.id);
e.printStackTrace();
}
@Override
public String toString() {
return "Consumer " + id;
}
}
private abstract class FixedRateClient extends Thread {
protected ClientSessionFactory sf;
protected SimpleString queue;
protected ClientSession session;
protected final int sleepTime;
protected volatile boolean working;
boolean failed;
FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate) throws ActiveMQException {
this.sf = sf;
this.queue = queue;
this.sleepTime = 1000 / rate;
}
protected void prepareWork() throws ActiveMQException {
this.session = addClientSession(sf.createSession(true, true));
}
@Override
public void run() {
working = true;
try {
prepareWork();
} catch (ActiveMQException e) {
logger.debug("got error in prepareWork(), aborting...");
e.printStackTrace();
return;
}
int count = 0;
while (working) {
try {
doWork(count);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// expected, nothing to be done
} catch (Exception e) {
failed = true;
handleError(count, e);
working = false;
} finally {
count++;
}
}
}
protected abstract void doWork(int count) throws Exception;
protected void handleError(int count, Exception e) {
}
public void stopRunning() {
working = false;
try {
session.close();
this.interrupt();
join(5000);
if (isAlive()) {
fail("Interrupt is not working on Working Thread");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
handleError(0, e);
}
}
}
} }