diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index b1285985a1..7451288e44 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -165,7 +165,7 @@ public class GlobalPagingTest extends PagingTest { session.start(); - assertEquals(numberOfMessages * 2, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount); // The consumer has to be created after the getMessageCount(queue) assertion // otherwise delivery could alter the messagecount and give us a false failure @@ -182,7 +182,7 @@ public class GlobalPagingTest extends PagingTest { } session.commit(); - assertEquals(0, getMessageCount(queue)); + Wait.assertEquals(0, queue::getMessageCount); } protected void sendFewMessages(int numberOfMessages, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 40826394ed..bfa1158f0f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -453,7 +453,7 @@ public class PagingTest extends ActiveMQTestBase { session.start(); - assertEquals(numberOfMessages * 2, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount); // The consumer has to be created after the getMessageCount(queue) assertion // otherwise delivery could alter the messagecount and give us a false failure @@ -476,7 +476,7 @@ public class PagingTest extends ActiveMQTestBase { locator.close(); - assertEquals(0, getMessageCount(queue)); + Wait.assertEquals(0, queue::getMessageCount); waitForNotPaging(queue); @@ -520,7 +520,7 @@ public class PagingTest extends ActiveMQTestBase { Wait.assertEquals(0, purgeQueue::getMessageCount); - Assert.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore().getAddressSize()); + Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize); MessageConsumer consumer = session.createConsumer(jmsQueue); @@ -583,7 +583,7 @@ public class PagingTest extends ActiveMQTestBase { server.stop(); server.start(); - Assert.assertEquals(0, server.getPagingManager().getTransactions().size()); + Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size()); } // First page is complete but it wasn't deleted @@ -1133,7 +1133,7 @@ public class PagingTest extends ActiveMQTestBase { queue = server.locateQueue(PagingTest.ADDRESS); - assertEquals(0, getMessageCount(queue)); + Wait.assertEquals(0, queue::getMessageCount); timeout = System.currentTimeMillis() + 10000; while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) { @@ -1154,7 +1154,7 @@ public class PagingTest extends ActiveMQTestBase { server.start(); - final int numberOfMessages = 5000; + final int numberOfMessages = 500; final int numberOfTX = 10; @@ -1162,7 +1162,7 @@ public class PagingTest extends ActiveMQTestBase { locator = createInVMNonHALocator(); - locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false); sf = createSessionFactory(locator); @@ -1213,7 +1213,7 @@ public class PagingTest extends ActiveMQTestBase { Queue queue = server.locateQueue(ADDRESS); - assertEquals(numberOfMessages, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); LinkedList xids = new LinkedList<>(); @@ -1247,7 +1247,7 @@ public class PagingTest extends ActiveMQTestBase { sessionCheck.close(); - assertEquals(numberOfMessages, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); sf.close(); locator.close(); @@ -1270,7 +1270,7 @@ public class PagingTest extends ActiveMQTestBase { session.start(); - assertEquals(numberOfMessages, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); ClientMessage msg = consumer.receive(5000); if (msg != null) { @@ -1318,7 +1318,7 @@ public class PagingTest extends ActiveMQTestBase { locator.close(); - assertEquals(0, getMessageCount(queue)); + Wait.assertEquals(0, queue::getMessageCount); waitForNotPaging(queue); } @@ -1457,7 +1457,7 @@ public class PagingTest extends ActiveMQTestBase { Queue queue = server.locateQueue(ADDRESS); - assertEquals(numberOfMessages, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); int msgReceived = 0; ClientSession sessionConsumer = sf.createSession(false, false, false); @@ -1488,7 +1488,7 @@ public class PagingTest extends ActiveMQTestBase { locator.close(); - assertEquals(0, getMessageCount(queue)); + Wait.assertEquals(0, queue::getMessageCount); long timeout = System.currentTimeMillis() + 5000; while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) { @@ -1623,7 +1623,7 @@ public class PagingTest extends ActiveMQTestBase { Queue queue = server.locateQueue(ADDRESS); - assertEquals(numberOfMessages, getMessageCount(queue)); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); int msgReceived = 0; ClientSession sessionConsumer = sf.createSession(false, false, false); @@ -1654,7 +1654,7 @@ public class PagingTest extends ActiveMQTestBase { locator.close(); - assertEquals(0, getMessageCount(queue)); + Wait.assertEquals(0, queue::getMessageCount); long timeout = System.currentTimeMillis() + 5000; while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) { @@ -1707,10 +1707,6 @@ public class PagingTest extends ActiveMQTestBase { locator = createInVMNonHALocator(); sf = createSessionFactory(locator); - queue = server.locateQueue(ADDRESS); - - // assertEquals(numberOfMessages, getMessageCount(queue)); - msgReceived = 0; sessionConsumer = sf.createSession(false, false, false); sessionConsumer.start(); @@ -1825,7 +1821,7 @@ public class PagingTest extends ActiveMQTestBase { } - Assert.assertEquals(3, queue.getPageSubscription().getPagingStore().getCurrentWritingPage()); + Wait.assertEquals(3, queue.getPageSubscription().getPagingStore()::getCurrentWritingPage); ClientConsumer consumer = session.createConsumer(ADDRESS); session.start(); @@ -2427,7 +2423,7 @@ public class PagingTest extends ActiveMQTestBase { } } - assertEquals(0, server.getPagingManager().getTransactions().size()); + Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size()); } finally { running.set(false); @@ -2584,7 +2580,7 @@ public class PagingTest extends ActiveMQTestBase { Thread.sleep(500); } - assertEquals(0, server.getPagingManager().getTransactions().size()); + Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size()); } @@ -3496,9 +3492,9 @@ public class PagingTest extends ActiveMQTestBase { ClientSession session = sf.createSession(true, true, 0); session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); - Queue queue = server.locateQueue(ADDRESS); + final Queue queue1 = server.locateQueue(ADDRESS); - queue.getPageSubscription().getPagingStore().startPaging(); + queue1.getPageSubscription().getPagingStore().startPaging(); ClientProducer producer = session.createProducer(PagingTest.ADDRESS); @@ -3516,19 +3512,19 @@ public class PagingTest extends ActiveMQTestBase { producer.send(message); session.commit(); if (i < 19) { - queue.getPageSubscription().getPagingStore().forceAnotherPage(); + queue1.getPageSubscription().getPagingStore().forceAnotherPage(); } } - Assert.assertEquals(20, queue.getPageSubscription().getPagingStore().getCurrentWritingPage()); + Wait.assertEquals(20, ()->queue1.getPageSubscription().getPagingStore().getCurrentWritingPage()); // This will force a scenario where the pages are cleaned up. When restarting we need to check if the current page is complete // if it is complete we must move to another page avoiding races on cleanup // which could happen during a crash / restart long tx = server.getStorageManager().generateID(); for (int i = 1; i <= 20; i++) { - server.getStorageManager().storePageCompleteTransactional(tx, queue.getID(), new PagePositionImpl(i, 1)); + server.getStorageManager().storePageCompleteTransactional(tx, queue1.getID(), new PagePositionImpl(i, 1)); } server.getStorageManager().commit(tx); @@ -3543,7 +3539,7 @@ public class PagingTest extends ActiveMQTestBase { server.start(); - queue = server.locateQueue(ADDRESS); + Queue queue = server.locateQueue(ADDRESS); locator = createInVMNonHALocator(); sf = createSessionFactory(locator); @@ -3825,7 +3821,7 @@ public class PagingTest extends ActiveMQTestBase { Assert.assertNull(consumer.receiveImmediate()); - Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); + Wait.assertEquals(0, ()->server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); for (int i = 0; i < numberOfMessages; i++) { byte[] body = new byte[2048]; @@ -3881,7 +3877,7 @@ public class PagingTest extends ActiveMQTestBase { session.close(); - Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); + Wait.assertEquals(0, ()->server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); } @Test @@ -4397,7 +4393,7 @@ public class PagingTest extends ActiveMQTestBase { // It's async, so need to wait a bit for it happening assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging()); - assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); + Wait.assertEquals(1, ()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); } @Test @@ -4541,8 +4537,6 @@ public class PagingTest extends ActiveMQTestBase { assertNotNull(message); message.acknowledge(); - // assertEquals(msg, message.getIntProperty("propTest").intValue()); - System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest")); } @@ -5298,8 +5292,7 @@ public class PagingTest extends ActiveMQTestBase { producer.send(message); Queue q = (Queue) server.getPostOffice().getBinding(ADDRESS).getBindable(); - Wait.waitFor(() -> 3 == getMessageCount(q)); - Assert.assertEquals(3, getMessageCount(q)); + Wait.assertEquals(3, q::getMessageCount); // send a message with a dup ID that should fail b/c the address is full SimpleString dupID1 = new SimpleString("abcdefg"); @@ -5308,7 +5301,7 @@ public class PagingTest extends ActiveMQTestBase { validateExceptionOnSending(producer, message); - Assert.assertEquals(3, getMessageCount(q)); + Wait.assertEquals(3, q::getMessageCount); ClientConsumer consumer = session.createConsumer(ADDRESS); @@ -5321,11 +5314,11 @@ public class PagingTest extends ActiveMQTestBase { session.commit(); // to make sure it's on the server (roundtrip) consumer.close(); - Assert.assertEquals(2, getMessageCount(q)); + Wait.assertEquals(2, q::getMessageCount); producer.send(message); - Assert.assertEquals(3, getMessageCount(q)); + Wait.assertEquals(3, q::getMessageCount); consumer = session.createConsumer(ADDRESS); @@ -5743,7 +5736,7 @@ public class PagingTest extends ActiveMQTestBase { session.commit(); Queue queue = server.locateQueue(PagingTest.ADDRESS); - Assert.assertEquals(numberOfMessages, queue.getMessageCount()); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); store.forceAnotherPage(); @@ -5765,7 +5758,7 @@ public class PagingTest extends ActiveMQTestBase { store.getCursorProvider().cleanup(); - Assert.assertEquals(0, queue.getMessageCount()); + Wait.assertEquals(0, queue::getMessageCount); long timeout = System.currentTimeMillis() + 5000; while (store.isPaging() && timeout > System.currentTimeMillis()) { @@ -5778,7 +5771,7 @@ public class PagingTest extends ActiveMQTestBase { locator.close(); - Assert.assertEquals(1, store.getNumberOfPages()); + Wait.assertEquals(1, store::getNumberOfPages); } finally { try { @@ -6265,8 +6258,8 @@ public class PagingTest extends ActiveMQTestBase { session.close(); Queue queue = server.locateQueue(PagingTest.ADDRESS); - assertEquals(numberOfMessages, getMessageCount(queue)); - assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); + Wait.assertEquals(numberOfMessages, queue::getMessageCount); + Wait.assertEquals(1, ()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); sf.close();