ARTEMIS-5156 Making LargeMessageFrozenTest more reliable

This commit is contained in:
Clebert Suconic 2024-11-14 11:52:20 -05:00 committed by clebertsuconic
parent f3341a5337
commit 215cc5752f
1 changed files with 85 additions and 99 deletions

View File

@ -87,33 +87,13 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
public void testFreeze(String protocol) throws Exception {
startProxy();
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
int NUMBER_OF_MESSAGES = 10;
ConnectionFactory proxiedFactory = createProxiedFactory(protocol);
ConnectionFactory regularfactory = createRegularCF(protocol);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());
String body;
{
StringBuffer buffer = new StringBuffer();
@ -123,17 +103,28 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
body = buffer.toString();
}
int NUMBER_OF_MESSAGES = 10;
try (Connection connection = regularfactory.createConnection()) {
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
}
session.commit();
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
}
session.commit();
Connection connection = proxiedFactory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
boolean failed = false;
for (int repeat = 0; repeat < 5; repeat++) {
@ -152,7 +143,7 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
assertTrue(failed);
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure")));
connection = factory.createConnection();
connection = proxiedFactory.createConnection();
connection.start();
runAfter(connection::close);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
@ -167,7 +158,6 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
}
Wait.assertEquals(0, () -> {
System.gc();
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}
@ -189,26 +179,12 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
public void testRemoveConsumer(String protocol) throws Exception {
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:44444?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:44444?amqp.idleTimeout=300&jms.prefetchPolicy.all=10");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:44444");
}
int NUMBER_OF_MESSAGES = 10;
ConnectionFactory regularCF = createRegularCF(protocol);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Connection connection = factory.createConnection();
Connection connection = regularCF.createConnection();
runAfter(connection::close);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getName());
@ -217,13 +193,11 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 300 * 1024) {
buffer.append("Not so big, but big!!");
buffer.append("BLA BLA BLA... BLAH BLAH BLAH ... ");
}
body = buffer.toString();
}
int NUMBER_OF_MESSAGES = 10;
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage(body));
@ -256,7 +230,7 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
}
server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure")));
connection = factory.createConnection();
connection = regularCF.createConnection();
runAfter(connection::close);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
@ -294,33 +268,11 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
public void testFreezeAutoAck(String protocol) throws Exception {
startProxy();
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
ConnectionFactory proxiedFactory = createProxiedFactory(protocol);
ConnectionFactory regularCF = createRegularCF(protocol);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Connection connection = factory.createConnection();
runAfter(connection::close);
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());
assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());
String body;
{
StringBuffer buffer = new StringBuffer();
@ -330,22 +282,31 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
body = buffer.toString();
}
int NUMBER_OF_MESSAGES = 40;
try (Connection connection = regularCF.createConnection()) {
runAfter(connection::close);
try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
MessageProducer producer = sessionProducer.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(sessionConsumer.createTextMessage(body));
int NUMBER_OF_MESSAGES = 40;
try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = sessionProducer.createQueue(getName());
MessageProducer producer = sessionProducer.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(sessionProducer.createTextMessage(body));
}
sessionProducer.commit();
}
sessionProducer.commit();
}
MessageConsumer consumer = sessionConsumer.createConsumer(queue);
connection.start();
boolean failed = false;
try (Connection connection = proxiedFactory.createConnection()) {
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());
MessageConsumer consumer = sessionConsumer.createConsumer(queue);
connection.start();
assertEquals(1, proxy.getInboundHandlers().size());
assertEquals(1, proxy.getOutbounddHandlers().size());
try {
for (int i = 0; i < 10; i++) {
consumer.receive(5000);
}
@ -363,21 +324,22 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
assertTrue(failed);
connection = factory.createConnection();
connection.start();
runAfter(connection::close);
sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = sessionConsumer.createQueue(getName());
consumer = sessionConsumer.createConsumer(queue);
try (Connection connection = regularCF.createConnection()) {
connection.start();
runAfter(connection::close);
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionConsumer.createQueue(getName());
MessageConsumer consumer = sessionConsumer.createConsumer(queue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
assertEquals(body, message.getText());
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
assertEquals(body, message.getText());
}
assertNull(consumer.receiveNoWait());
}
assertNull(consumer.receiveNoWait());
assertEquals(0L, serverQueue.getMessageCount());
Wait.assertEquals(0, () -> {
@ -385,4 +347,28 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase {
return server.getConfiguration().getLargeMessagesLocation().listFiles().length;
});
}
private static ConnectionFactory createRegularCF(String protocol) {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
}
private static ConnectionFactory createProxiedFactory(String protocol) {
ConnectionFactory factory;
switch (protocol.toUpperCase(Locale.ROOT)) {
case "CORE":
ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod());
assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL());
assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize());
factory = artemisfactory;
break;
case "AMQP":
JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2");
factory = qpidFactory;
break;
default:
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333");
}
return factory;
}
}