From edf688e706cdafcb13f09e446a48b3f35a8f9236 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 24 Aug 2021 17:22:52 -0400 Subject: [PATCH] NO-JIRA Adding Large Message into MessagesExpiredPagingTest --- .../paging/MessagesExpiredPagingTest.java | 55 +++++++++++++------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java index 80fd501f48..db12264a8c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.tests.integration.paging; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -116,30 +116,48 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase { } } + @Test + public void testSendReceiveCORELarge() throws Exception { + testSendReceive("CORE", 50, 20, 10, 500 * 1024); + } @Test public void testSendReceiveCORE() throws Exception { - testSendReceive("CORE"); + testSendReceive("CORE", 5000, 1000, 100, 0); } @Test public void testSendReceiveAMQP() throws Exception { - testSendReceive("AMQP"); + testSendReceive("AMQP", 5000, 1000, 100, 0); + } + + @Test + public void testSendReceiveAMQPLarge() throws Exception { + testSendReceive("AMQP", 50, 20, 10, 500 * 1024); } @Test public void testSendReceiveOpenWire() throws Exception { - testSendReceive("OPENWIRE"); + testSendReceive("OPENWIRE", 5000, 1000, 100, 0); } - public void testSendReceive(String protocol) throws Exception { + public void testSendReceive(String protocol, int numberOfMessages, int numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception { ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + String extraBody; + { + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < bodySize; i++) { + buffer.append("*"); + } + extraBody = buffer.toString(); + } + Consumer[] consumers = new Consumer[NUMBER_OF_QUEUES]; for (int i = 0; i < NUMBER_OF_QUEUES; i++) { - consumers[i] = new Consumer(factory, "q" + i); + consumers[i] = new Consumer(factory, "q" + i, bodySize); consumers[i].start(); } @@ -149,7 +167,7 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase { MessageProducer producer = session.createProducer(jmsTopic); for (int i = 0; i < 10; i++) { - producer.send(session.createTextMessage("hello")); + producer.send(session.createTextMessage("hello" + extraBody)); } // just validating basic queue consumption working @@ -161,31 +179,31 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase { c.consumedDelta.set(0); } producer.setTimeToLive(10); - for (int i = 0; i < 5000; i++) { - if (i > 0 && i % 100 == 0) { + for (int i = 0; i < numberOfMessages; i++) { + if (i > 0 && i % pagingInterval == 0) { for (Consumer c : consumers) { Wait.waitFor(() -> !queues[0].getPagingStore().isPaging(), 1000, 100, false); producer.setTimeToLive(TimeUnit.HOURS.toMillis(1)); - producer.send(session.createTextMessage("hello")); + producer.send(session.createTextMessage("hello" + extraBody)); Wait.assertTrue(() -> c.consumedDelta.get() > 0); producer.setTimeToLive(10); c.consumedDelta.set(0); } queues[0].getPagingStore().forceAnotherPage(); } - producer.send(session.createTextMessage("hello")); + producer.send(session.createTextMessage("hello" + extraBody)); } producer.setTimeToLive(300); - for (int i = 0; i < 5000; i++) { - if (i > 0 && i % 100 == 0) { + for (int i = 0; i < numberOfMessageSecondWave; i++) { + if (i > 0 && i % pagingInterval == 0) { queues[0].getPagingStore().forceAnotherPage(); } - producer.send(session.createTextMessage("hello")); + producer.send(session.createTextMessage("hello" + extraBody)); } producer.setTimeToLive(TimeUnit.HOURS.toMillis(1)); - producer.send(session.createTextMessage("hello again")); // something not expiring + producer.send(session.createTextMessage("hello again" + extraBody)); // something not expiring for (Consumer c : consumers) { Wait.assertTrue(() -> c.consumedDelta.get() > 0); @@ -206,15 +224,17 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase { private class Consumer extends Thread { final ConnectionFactory factory; + final int minimalSize; AtomicInteger consumedDelta = new AtomicInteger(0); AtomicInteger consumed = new AtomicInteger(0); AtomicInteger errors = new AtomicInteger(0); final String queueName; - Consumer(ConnectionFactory factory, String queueName) { + Consumer(ConnectionFactory factory, String queueName, int minimalSize) { this.factory = factory; this.queueName = queueName; + this.minimalSize = minimalSize; } @Override @@ -227,8 +247,9 @@ public class MessagesExpiredPagingTest extends ActiveMQTestBase { MessageConsumer consumer = session.createConsumer(jmsQueue); connection.start(); while (running.get()) { - Message message = consumer.receive(500); + TextMessage message = (TextMessage)consumer.receive(500); if (message != null) { + Assert.assertTrue(message.getText().length() > minimalSize); consumed.incrementAndGet(); consumedDelta.incrementAndGet(); Thread.sleep(2);