ARTEMIS-3283 Fixing Slow Consuemr Test and speeding it up

I am removing some combinations that are not needed here,
and I'm adding another extra test to valiate the rate per minutes
This commit is contained in:
Clebert Suconic 2021-05-25 11:07:17 -04:00
parent 9d98babe2d
commit f6df6083e3
2 changed files with 270 additions and 78 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.settings.impl;
import java.util.concurrent.TimeUnit;
public enum SlowConsumerThresholdMeasurementUnit {
MESSAGES_PER_SECOND(1), MESSAGES_PER_MINUTE(60), MESSAGES_PER_HOUR(3600), MESSAGES_PER_DAY(3600 * 24);
@ -40,6 +42,21 @@ public enum SlowConsumerThresholdMeasurementUnit {
}
}
public static TimeUnit unitOf(int measurementUnitInSeconds) {
switch (measurementUnitInSeconds) {
case 1:
return TimeUnit.SECONDS;
case 60:
return TimeUnit.MINUTES;
case 3600:
return TimeUnit.HOURS;
case 3600 * 24:
return TimeUnit.DAYS;
default:
return null;
}
}
public int getValue() {
return measurementUnitInSeconds;
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -46,55 +44,25 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_DAY;
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_MINUTE;
import static org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
@RunWith(value = Parameterized.class)
public class SlowConsumerTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
private int threshold;
private float consumerRate; // The rate actual consumers will run in the test.
private SlowConsumerThresholdMeasurementUnit unit;
private int threshold = 10;
private long checkPeriod = 1;
private boolean isNetty = false;
private boolean isPaging = false;
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "netty={0}, paging={1}, threshold={2}, threshold_units={3}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{
{true, false, 10, MESSAGES_PER_SECOND},
{false, false, 10, MESSAGES_PER_SECOND},
{true, true, 10, MESSAGES_PER_SECOND},
{false, true, 10, MESSAGES_PER_SECOND},
{true, false, 1, MESSAGES_PER_MINUTE},
{false, false, 1, MESSAGES_PER_MINUTE},
{true, true, 1, MESSAGES_PER_MINUTE},
{false, true, 1, MESSAGES_PER_MINUTE}
});
}
public SlowConsumerTest(boolean isNetty, boolean isPaging, int threshold, SlowConsumerThresholdMeasurementUnit unit) {
this.threshold = threshold;
this.unit = unit;
this.consumerRate = threshold / unit.getValue();
this.isNetty = isNetty;
this.isPaging = isPaging;
}
private boolean isNetty = true;
private ActiveMQServer server;
@ -112,19 +80,12 @@ public class SlowConsumerTest extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
addressSettings.setSlowConsumerThreshold(threshold);
addressSettings.setSlowConsumerThresholdMeasurementUnit(unit);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
if (isPaging) {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(10 * 1024);
addressSettings.setPageSizeBytes(1024);
} else {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
addressSettings.setPageSizeBytes(1024);
}
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(10 * 1024);
addressSettings.setPageSizeBytes(1024);
server.start();
@ -143,8 +104,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
assertPaging();
final int numMessages = 25;
for (int i = 0; i < numMessages; i++) {
@ -172,8 +131,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
assertPaging();
final int numMessages = 3 * threshold;
for (int i = 0; i < numMessages; i++) {
@ -205,8 +162,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
assertPaging();
final int numMessages = 3 * threshold + 1;
for (int i = 0; i < numMessages; i++) {
@ -225,15 +180,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
assertNotNull(consumer.receiveImmediate());
}
private void assertPaging() throws Exception {
Queue queue = server.locateQueue(QUEUE);
if (isPaging) {
Assert.assertTrue(queue.getPageSubscription().isPaging());
} else {
Assert.assertFalse(queue.getPageSubscription().isPaging());
}
}
@Test
public void testSlowConsumerNotification() throws Exception {
@ -245,15 +191,10 @@ public class SlowConsumerTest extends ActiveMQTestBase {
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThreshold(10);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
if (!isPaging) {
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(-1);
}
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
assertPaging();
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
@ -314,8 +255,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
producer.send(createTextMessage(session, "m" + i));
}
assertPaging();
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
@ -375,8 +314,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
t.start();
assertPaging();
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
@ -440,6 +377,241 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
}
@Test
public void testOneMinuteKilledInVM() throws Exception {
testMinuteKilled(false);
}
@Test
public void testOneMinuteKilled() throws Exception {
testMinuteKilled(true);
}
private void testMinuteKilled(boolean netty) throws Exception {
locator.close();
locator = createFactory(netty);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_MINUTE);
addressSettings.setSlowConsumerThreshold(60);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
int messages = 200;
for (int i = 0; i < messages; i++) {
producer.send(session.createMessage(true));
}
session.commit();
ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
FixedRateConsumer consumer = new FixedRateConsumer(40, MESSAGES_PER_MINUTE, receivedMessages, sf, QUEUE, 0);
consumer.start();
Queue queue = server.locateQueue(QUEUE);
Wait.assertEquals(1, queue::getConsumerCount);
try {
Wait.assertEquals(0, queue::getConsumerCount);
} finally {
consumer.stopRunning();
}
}
@Test
public void testDaysKilled() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_DAY);
addressSettings.setSlowConsumerThreshold(TimeUnit.DAYS.toSeconds(1)); // one mesasge per second
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
int messages = 200;
for (int i = 0; i < messages; i++) {
producer.send(session.createMessage(true));
}
session.commit();
ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
FixedRateConsumer consumer = new FixedRateConsumer(30, MESSAGES_PER_MINUTE, receivedMessages, sf, QUEUE, 0);
consumer.start();
Queue queue = server.locateQueue(QUEUE);
Wait.assertEquals(1, queue::getConsumerCount);
try {
Wait.assertEquals(0, queue::getConsumerCount);
} finally {
consumer.stopRunning();
}
}
@Test
public void testDaysKilledPaging() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_DAY);
addressSettings.setSlowConsumerThreshold(TimeUnit.DAYS.toSeconds(1)); // one mesasge per second
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(10 * 1024);
addressSettings.setPageSizeBytes(1024);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
int messages = 200;
Queue queue = server.locateQueue(QUEUE);
queue.getPagingStore().startPaging();
Assert.assertTrue(queue.getPagingStore().isPaging());
for (int i = 0; i < messages; i++) {
producer.send(session.createMessage(true));
}
session.commit();
ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
FixedRateConsumer consumer = new FixedRateConsumer(30, MESSAGES_PER_MINUTE, receivedMessages, sf, QUEUE, 0);
consumer.start();
Wait.assertEquals(1, queue::getConsumerCount);
try {
Wait.assertEquals(0, queue::getConsumerCount);
} finally {
consumer.stopRunning();
}
}
@Test
public void testDaysSurviving() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_DAY);
addressSettings.setSlowConsumerThreshold(TimeUnit.DAYS.toSeconds(1)); // one mesasge per second
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
int messages = 10;
for (int i = 0; i < messages; i++) {
producer.send(session.createMessage(true));
}
session.commit();
ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
FixedRateConsumer consumer = new FixedRateConsumer(70, MESSAGES_PER_MINUTE, receivedMessages, sf, QUEUE, 0);
consumer.start();
Queue queue = server.locateQueue(QUEUE);
Wait.assertEquals(1, queue::getConsumerCount);
try {
Wait.assertEquals(messages, queue::getMessagesAcknowledged);
Wait.assertEquals(messages, () -> receivedMessages.size());
} finally {
consumer.stopRunning();
}
Wait.assertEquals(0, queue::getConsumerCount);
}
@Test
public void testMinuteSurviving() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(2);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_MINUTE);
addressSettings.setSlowConsumerThreshold(60);
//addressSettings.sets
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
int messages = 10;
for (int i = 0; i < messages; i++) {
producer.send(session.createMessage(true));
}
session.commit();
ConcurrentHashSet<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
FixedRateConsumer consumer = new FixedRateConsumer(80, MESSAGES_PER_MINUTE, receivedMessages, sf, QUEUE, 0);
consumer.start();
Queue queue = server.locateQueue(QUEUE);
try {
Wait.assertEquals(messages, queue::getMessagesAcknowledged);
Assert.assertEquals(1, queue.getConsumerCount());
Wait.assertEquals(messages, () -> receivedMessages.size());
} finally {
consumer.stopRunning();
}
Wait.assertEquals(0, queue::getConsumerCount);
}
/**
* This test creates 3 consumers on one queue. A producer sends
* messages at a rate of 2 messages per second. Each consumer
@ -462,14 +634,14 @@ public class SlowConsumerTest extends ActiveMQTestBase {
final int messages = 10 * threshold;
FixedRateProducer producer = new FixedRateProducer(threshold * 2, unit, sf1, QUEUE, messages);
FixedRateProducer producer = new FixedRateProducer(threshold * 2, MESSAGES_PER_SECOND, sf1, QUEUE, messages);
final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf2, QUEUE, 1));
consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf3, QUEUE, 2));
consumers.add(new FixedRateConsumer(threshold, unit, receivedMessages, sf4, QUEUE, 3));
consumers.add(new FixedRateConsumer(threshold, MESSAGES_PER_SECOND, receivedMessages, sf2, QUEUE, 1));
consumers.add(new FixedRateConsumer(threshold, MESSAGES_PER_SECOND, receivedMessages, sf3, QUEUE, 2));
consumers.add(new FixedRateConsumer(threshold, MESSAGES_PER_SECOND, receivedMessages, sf4, QUEUE, 3));
try {
producer.start();
@ -480,9 +652,9 @@ public class SlowConsumerTest extends ActiveMQTestBase {
producer.join(10000);
assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
Assert.assertEquals(3, queue.getConsumerCount());
Wait.assertEquals(messages, receivedMessages::size);
} finally {
producer.stopRunning();
@ -562,6 +734,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
if (m != null) {
receivedMessages.add(m);
m.acknowledge();
session.commit();
logger.debug(" consumer " + id + " acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
}
}
@ -592,7 +765,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate, SlowConsumerThresholdMeasurementUnit unit) throws ActiveMQException {
this.sf = sf;
this.queue = queue;
this.sleepTime = (int) (1000f / (unit.getValue() / rate));
this.sleepTime = (int) (SlowConsumerThresholdMeasurementUnit.unitOf(unit.getValue()).toMillis(1) / rate);
logger.debug(this.getClass() + " has sleepTime = " + sleepTime + " which is " + TimeUnit.MILLISECONDS.toSeconds(sleepTime) + " seconds");
}
protected void prepareWork() throws ActiveMQException {
@ -613,6 +787,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
while (working) {
try {
doWork(count);
logger.debug(this.getClass().getName() + " sleeping " + sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// expected, nothing to be done