diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageFrozenTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageFrozenTest.java index b2de198560..72f2491a5f 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageFrozenTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageFrozenTest.java @@ -87,33 +87,13 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { public void testFreeze(String protocol) throws Exception { startProxy(); - ConnectionFactory factory; - switch (protocol.toUpperCase(Locale.ROOT)) { - case "CORE": - ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000"); - assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod()); - assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL()); - assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize()); - factory = artemisfactory; - break; - case "AMQP": - JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2"); - factory = qpidFactory; - break; - default: - factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333"); - } + int NUMBER_OF_MESSAGES = 10; + + ConnectionFactory proxiedFactory = createProxiedFactory(protocol); + ConnectionFactory regularfactory = createRegularCF(protocol); org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true)); - Connection connection = factory.createConnection(); - runAfter(connection::close); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue(getName()); - - assertEquals(1, proxy.getInboundHandlers().size()); - assertEquals(1, proxy.getOutbounddHandlers().size()); - String body; { StringBuffer buffer = new StringBuffer(); @@ -123,17 +103,28 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { body = buffer.toString(); } - int NUMBER_OF_MESSAGES = 10; + try (Connection connection = regularfactory.createConnection()) { + runAfter(connection::close); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getName()); + + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage(body)); + } + session.commit(); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { - producer.send(session.createTextMessage(body)); } - session.commit(); + + Connection connection = proxiedFactory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getName()); + + assertEquals(1, proxy.getInboundHandlers().size()); + assertEquals(1, proxy.getOutbounddHandlers().size()); MessageConsumer consumer = session.createConsumer(queue); connection.start(); - boolean failed = false; for (int repeat = 0; repeat < 5; repeat++) { @@ -152,7 +143,7 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { assertTrue(failed); server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure"))); - connection = factory.createConnection(); + connection = proxiedFactory.createConnection(); connection.start(); runAfter(connection::close); session = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -167,7 +158,6 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { } Wait.assertEquals(0, () -> { - System.gc(); return server.getConfiguration().getLargeMessagesLocation().listFiles().length; }); } @@ -189,26 +179,12 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { public void testRemoveConsumer(String protocol) throws Exception { - ConnectionFactory factory; - switch (protocol.toUpperCase(Locale.ROOT)) { - case "CORE": - ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:44444?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000"); - assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod()); - assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL()); - assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize()); - factory = artemisfactory; - break; - case "AMQP": - JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:44444?amqp.idleTimeout=300&jms.prefetchPolicy.all=10"); - factory = qpidFactory; - break; - default: - factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:44444"); - } + int NUMBER_OF_MESSAGES = 10; + ConnectionFactory regularCF = createRegularCF(protocol); org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true)); - Connection connection = factory.createConnection(); + Connection connection = regularCF.createConnection(); runAfter(connection::close); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue(getName()); @@ -217,13 +193,11 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { { StringBuffer buffer = new StringBuffer(); while (buffer.length() < 300 * 1024) { - buffer.append("Not so big, but big!!"); + buffer.append("BLA BLA BLA... BLAH BLAH BLAH ... "); } body = buffer.toString(); } - int NUMBER_OF_MESSAGES = 10; - MessageProducer producer = session.createProducer(queue); for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { producer.send(session.createTextMessage(body)); @@ -256,7 +230,7 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { } server.getRemotingService().getConnections().forEach(r -> r.fail(new ActiveMQException("forced failure"))); - connection = factory.createConnection(); + connection = regularCF.createConnection(); runAfter(connection::close); session = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -294,33 +268,11 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { public void testFreezeAutoAck(String protocol) throws Exception { startProxy(); - ConnectionFactory factory; - switch (protocol.toUpperCase(Locale.ROOT)) { - case "CORE": - ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000"); - assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod()); - assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL()); - assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize()); - factory = artemisfactory; - break; - case "AMQP": - JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2"); - factory = qpidFactory; - break; - default: - factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333"); - } + ConnectionFactory proxiedFactory = createProxiedFactory(protocol); + ConnectionFactory regularCF = createRegularCF(protocol); org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true)); - Connection connection = factory.createConnection(); - runAfter(connection::close); - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = sessionConsumer.createQueue(getName()); - - assertEquals(1, proxy.getInboundHandlers().size()); - assertEquals(1, proxy.getOutbounddHandlers().size()); - String body; { StringBuffer buffer = new StringBuffer(); @@ -330,22 +282,31 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { body = buffer.toString(); } - int NUMBER_OF_MESSAGES = 40; + try (Connection connection = regularCF.createConnection()) { + runAfter(connection::close); - try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) { - MessageProducer producer = sessionProducer.createProducer(queue); - for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { - producer.send(sessionConsumer.createTextMessage(body)); + int NUMBER_OF_MESSAGES = 40; + + try (Session sessionProducer = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) { + Queue queue = sessionProducer.createQueue(getName()); + MessageProducer producer = sessionProducer.createProducer(queue); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(sessionProducer.createTextMessage(body)); + } + sessionProducer.commit(); } - sessionProducer.commit(); } - MessageConsumer consumer = sessionConsumer.createConsumer(queue); - connection.start(); - boolean failed = false; + try (Connection connection = proxiedFactory.createConnection()) { + Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = sessionConsumer.createQueue(getName()); + MessageConsumer consumer = sessionConsumer.createConsumer(queue); + connection.start(); + + assertEquals(1, proxy.getInboundHandlers().size()); + assertEquals(1, proxy.getOutbounddHandlers().size()); - try { for (int i = 0; i < 10; i++) { consumer.receive(5000); } @@ -363,21 +324,22 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { assertTrue(failed); - connection = factory.createConnection(); - connection.start(); - runAfter(connection::close); - sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - queue = sessionConsumer.createQueue(getName()); - consumer = sessionConsumer.createConsumer(queue); + try (Connection connection = regularCF.createConnection()) { + connection.start(); + runAfter(connection::close); + Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = sessionConsumer.createQueue(getName()); + MessageConsumer consumer = sessionConsumer.createConsumer(queue); - for (int i = 0; i < numberOfMessages; i++) { - TextMessage message = (TextMessage) consumer.receive(5000); - assertNotNull(message); - assertEquals(body, message.getText()); + for (int i = 0; i < numberOfMessages; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + assertNotNull(message); + assertEquals(body, message.getText()); + } + + assertNull(consumer.receiveNoWait()); } - assertNull(consumer.receiveNoWait()); - assertEquals(0L, serverQueue.getMessageCount()); Wait.assertEquals(0, () -> { @@ -385,4 +347,28 @@ public class LargeMessageFrozenTest extends ActiveMQTestBase { return server.getConfiguration().getLargeMessagesLocation().listFiles().length; }); } + + private static ConnectionFactory createRegularCF(String protocol) { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + } + + private static ConnectionFactory createProxiedFactory(String protocol) { + ConnectionFactory factory; + switch (protocol.toUpperCase(Locale.ROOT)) { + case "CORE": + ActiveMQConnectionFactory artemisfactory = new ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000"); + assertEquals(100, artemisfactory.getServerLocator().getClientFailureCheckPeriod()); + assertEquals(1000, artemisfactory.getServerLocator().getConnectionTTL()); + assertEquals(1000, artemisfactory.getServerLocator().getConsumerWindowSize()); + factory = artemisfactory; + break; + case "AMQP": + JmsConnectionFactory qpidFactory = new JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=1000&jms.prefetchPolicy.all=2"); + factory = qpidFactory; + break; + default: + factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:33333"); + } + return factory; + } }