NO-JIRA Adding a test to verify TX in OpenWire Large Message Handling

This commit is contained in:
Clebert Suconic 2023-02-08 09:27:29 -05:00
parent cfb585eaf6
commit 1caa406bbf
1 changed files with 83 additions and 0 deletions

View File

@ -25,7 +25,13 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -40,9 +46,13 @@ import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OpenWireLargeMessageTest extends BasicOpenWireTest {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public OpenWireLargeMessageTest() {
super();
}
@ -178,6 +188,79 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
}
@Test
public void testSendReceiveLargeMessageTX() throws Exception {
int NUMBER_OF_MESSAGES = 400;
int TX_SIZE = 100;
ExecutorService executorService = Executors.newFixedThreadPool(1);
runAfter(executorService::shutdownNow);
AtomicInteger errors = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
// Create 1MB Message
String largeString;
{
String randomString = "This is a random String " + RandomUtil.randomString();
StringBuffer largeBuffer = new StringBuffer();
while (largeBuffer.length() < 1024 * 1024) {
largeBuffer.append(randomString);
}
largeString = largeBuffer.toString();
}
executorService.execute(() -> {
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(lmAddress.toString());
MessageConsumer consumer = session.createConsumer(queue);
for (int received = 0; received < NUMBER_OF_MESSAGES; received++) {
TextMessage m = (TextMessage) consumer.receive(5000);
assertEquals(largeString, m.getText());
if (received > 0 && received % TX_SIZE == 0) {
logger.info("Received {} messages", received);
session.commit();
}
}
session.commit();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latch.countDown();
}
});
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(lmAddress.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int sent = 0; sent < NUMBER_OF_MESSAGES; sent++) {
TextMessage message = session.createTextMessage(largeString);
producer.send(message);
if (sent > 0 && sent % TX_SIZE == 0) {
logger.info("Sent {} messages", sent);
session.commit();
}
}
session.commit();
}
latch.await(1, TimeUnit.MINUTES);
Assert.assertEquals(0, errors.get());
}
@Override
protected void extraServerConfig(Configuration serverConfig) {