NO-JIRA Adding Large Message into MessagesExpiredPagingTest

This commit is contained in:
Clebert Suconic 2021-08-24 17:22:52 -04:00
parent e05221eba4
commit edf688e706
1 changed files with 38 additions and 17 deletions

View File

@ -18,10 +18,10 @@ package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -116,30 +116,48 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase {
} }
} }
@Test
public void testSendReceiveCORELarge() throws Exception {
testSendReceive("CORE", 50, 20, 10, 500 * 1024);
}
@Test @Test
public void testSendReceiveCORE() throws Exception { public void testSendReceiveCORE() throws Exception {
testSendReceive("CORE"); testSendReceive("CORE", 5000, 1000, 100, 0);
} }
@Test @Test
public void testSendReceiveAMQP() throws Exception { public void testSendReceiveAMQP() throws Exception {
testSendReceive("AMQP"); testSendReceive("AMQP", 5000, 1000, 100, 0);
}
@Test
public void testSendReceiveAMQPLarge() throws Exception {
testSendReceive("AMQP", 50, 20, 10, 500 * 1024);
} }
@Test @Test
public void testSendReceiveOpenWire() throws Exception { public void testSendReceiveOpenWire() throws Exception {
testSendReceive("OPENWIRE"); testSendReceive("OPENWIRE", 5000, 1000, 100, 0);
} }
public void testSendReceive(String protocol) throws Exception { public void testSendReceive(String protocol, int numberOfMessages, int numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
String extraBody;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
extraBody = buffer.toString();
}
Consumer[] consumers = new Consumer[NUMBER_OF_QUEUES]; Consumer[] consumers = new Consumer[NUMBER_OF_QUEUES];
for (int i = 0; i < NUMBER_OF_QUEUES; i++) { for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
consumers[i] = new Consumer(factory, "q" + i); consumers[i] = new Consumer(factory, "q" + i, bodySize);
consumers[i].start(); consumers[i].start();
} }
@ -149,7 +167,7 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase {
MessageProducer producer = session.createProducer(jmsTopic); MessageProducer producer = session.createProducer(jmsTopic);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("hello")); producer.send(session.createTextMessage("hello" + extraBody));
} }
// just validating basic queue consumption working // just validating basic queue consumption working
@ -161,31 +179,31 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase {
c.consumedDelta.set(0); c.consumedDelta.set(0);
} }
producer.setTimeToLive(10); producer.setTimeToLive(10);
for (int i = 0; i < 5000; i++) { for (int i = 0; i < numberOfMessages; i++) {
if (i > 0 && i % 100 == 0) { if (i > 0 && i % pagingInterval == 0) {
for (Consumer c : consumers) { for (Consumer c : consumers) {
Wait.waitFor(() -> !queues[0].getPagingStore().isPaging(), 1000, 100, false); Wait.waitFor(() -> !queues[0].getPagingStore().isPaging(), 1000, 100, false);
producer.setTimeToLive(TimeUnit.HOURS.toMillis(1)); producer.setTimeToLive(TimeUnit.HOURS.toMillis(1));
producer.send(session.createTextMessage("hello")); producer.send(session.createTextMessage("hello" + extraBody));
Wait.assertTrue(() -> c.consumedDelta.get() > 0); Wait.assertTrue(() -> c.consumedDelta.get() > 0);
producer.setTimeToLive(10); producer.setTimeToLive(10);
c.consumedDelta.set(0); c.consumedDelta.set(0);
} }
queues[0].getPagingStore().forceAnotherPage(); queues[0].getPagingStore().forceAnotherPage();
} }
producer.send(session.createTextMessage("hello")); producer.send(session.createTextMessage("hello" + extraBody));
} }
producer.setTimeToLive(300); producer.setTimeToLive(300);
for (int i = 0; i < 5000; i++) { for (int i = 0; i < numberOfMessageSecondWave; i++) {
if (i > 0 && i % 100 == 0) { if (i > 0 && i % pagingInterval == 0) {
queues[0].getPagingStore().forceAnotherPage(); queues[0].getPagingStore().forceAnotherPage();
} }
producer.send(session.createTextMessage("hello")); producer.send(session.createTextMessage("hello" + extraBody));
} }
producer.setTimeToLive(TimeUnit.HOURS.toMillis(1)); producer.setTimeToLive(TimeUnit.HOURS.toMillis(1));
producer.send(session.createTextMessage("hello again")); // something not expiring producer.send(session.createTextMessage("hello again" + extraBody)); // something not expiring
for (Consumer c : consumers) { for (Consumer c : consumers) {
Wait.assertTrue(() -> c.consumedDelta.get() > 0); Wait.assertTrue(() -> c.consumedDelta.get() > 0);
@ -206,15 +224,17 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase {
private class Consumer extends Thread { private class Consumer extends Thread {
final ConnectionFactory factory; final ConnectionFactory factory;
final int minimalSize;
AtomicInteger consumedDelta = new AtomicInteger(0); AtomicInteger consumedDelta = new AtomicInteger(0);
AtomicInteger consumed = new AtomicInteger(0); AtomicInteger consumed = new AtomicInteger(0);
AtomicInteger errors = new AtomicInteger(0); AtomicInteger errors = new AtomicInteger(0);
final String queueName; final String queueName;
Consumer(ConnectionFactory factory, String queueName) { Consumer(ConnectionFactory factory, String queueName, int minimalSize) {
this.factory = factory; this.factory = factory;
this.queueName = queueName; this.queueName = queueName;
this.minimalSize = minimalSize;
} }
@Override @Override
@ -227,8 +247,9 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase {
MessageConsumer consumer = session.createConsumer(jmsQueue); MessageConsumer consumer = session.createConsumer(jmsQueue);
connection.start(); connection.start();
while (running.get()) { while (running.get()) {
Message message = consumer.receive(500); TextMessage message = (TextMessage)consumer.receive(500);
if (message != null) { if (message != null) {
Assert.assertTrue(message.getText().length() > minimalSize);
consumed.incrementAndGet(); consumed.incrementAndGet();
consumedDelta.incrementAndGet(); consumedDelta.incrementAndGet();
Thread.sleep(2); Thread.sleep(2);