diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HeuristicXATest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HeuristicXATest.java index 8f089c3e26..e32c014221 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HeuristicXATest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HeuristicXATest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -138,19 +139,7 @@ public class HeuristicXATest extends ActiveMQTestBase { } if (isCommit) { - Assert.assertEquals(1, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); - - session = sf.createSession(false, false, false); - - session.start(); - ClientConsumer consumer = session.createConsumer(ADDRESS); - msg = consumer.receive(1000); - Assert.assertNotNull(msg); - msg.acknowledge(); - Assert.assertEquals(body, msg.getBodyBuffer().readString()); - - session.commit(); - session.close(); + assertMessageInQueueThenReceiveAndCheckContent(server, sf); } Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); @@ -213,19 +202,7 @@ public class HeuristicXATest extends ActiveMQTestBase { Assert.assertEquals(0, preparedTransactions.length); if (isCommit) { - Assert.assertEquals(1, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); - - session = sf.createSession(false, false, false); - - session.start(); - ClientConsumer consumer = session.createConsumer(ADDRESS); - msg = consumer.receive(1000); - Assert.assertNotNull(msg); - msg.acknowledge(); - Assert.assertEquals(body, msg.getBodyBuffer().readString()); - - session.commit(); - session.close(); + assertMessageInQueueThenReceiveAndCheckContent(server, sf); } Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); @@ -303,19 +280,7 @@ public class HeuristicXATest extends ActiveMQTestBase { Assert.assertEquals(0, preparedTransactions.length); if (heuristicCommit) { - Assert.assertEquals(1, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); - - session = sf.createSession(false, false, false); - - session.start(); - ClientConsumer consumer = session.createConsumer(ADDRESS); - msg = consumer.receive(1000); - Assert.assertNotNull(msg); - msg.acknowledge(); - Assert.assertEquals(body, msg.getBodyBuffer().readString()); - - session.commit(); - session.close(); + assertMessageInQueueThenReceiveAndCheckContent(server, sf); } Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); @@ -345,6 +310,22 @@ public class HeuristicXATest extends ActiveMQTestBase { session.close(); } + private void assertMessageInQueueThenReceiveAndCheckContent(ActiveMQServer server, ClientSessionFactory sf) throws Exception { + Wait.assertEquals(1, () -> getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())), 5 * 1000, 100); + + ClientSession session = sf.createSession(false, false, false); + + session.start(); + ClientConsumer consumer = session.createConsumer(ADDRESS); + ClientMessage msg = consumer.receive(1000); + Assert.assertNotNull(msg); + msg.acknowledge(); + Assert.assertEquals(body, msg.getBodyBuffer().readString()); + + session.commit(); + session.close(); + } + @Test public void testForgetHeuristicCommitAndRestart() throws Exception { doForgetHeuristicCompletedTxAndRestart(true);