diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java index c5a99b5f56..fa0e1a3442 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java @@ -44,6 +44,10 @@ import org.junit.Test; public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase { + private static final int PACE_TIME = 500; // Wait in millisecond for each worker + private static final int NUMBER_OF_WORKERS = 5; + private static final int NUMBER_OF_RESTARTS = 5; + private static final Logger logger = Logger.getLogger(PageCleanupWhileReplicaCatchupTest.class); volatile boolean running = true; @@ -82,18 +86,17 @@ public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase { @Test(timeout = 160_000) public void testPageCleanup() throws Throwable { - int numberOfWorkers = 20; - Worker[] workers = new Worker[numberOfWorkers]; + Worker[] workers = new Worker[NUMBER_OF_WORKERS]; - for (int i = 0; i < 20; i++) { + for (int i = 0; i < NUMBER_OF_WORKERS; i++) { liveServer.getServer().addAddressInfo(new AddressInfo("WORKER_" + i).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); liveServer.getServer().createQueue(new QueueConfiguration("WORKER_" + i).setRoutingType(RoutingType.ANYCAST).setDurable(true)); workers[i] = new Worker("WORKER_" + i); workers[i].start(); } - for (int i = 0; i < 25; i++) { + for (int i = 0; i < NUMBER_OF_RESTARTS; i++) { logger.debug("Starting replica " + i); backupServer.start(); Wait.assertTrue(backupServer.getServer()::isReplicaSync); @@ -106,26 +109,15 @@ public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase { worker.join(); } - Throwable toThrow = null; for (Worker worker : workers) { if (worker.throwable != null) { - worker.queue.getPagingStore().getCursorProvider().scheduleCleanup(); - Thread.sleep(2000); - worker.queue.getPagingStore().getCursorProvider().cleanup(); - - // This is more a debug statement in case there is an issue with the test - System.out.println("PagingStore(" + worker.queueName + ")::isPaging() = " + worker.queue.getPagingStore().isPaging() + " after test failure " + worker.throwable.getMessage()); - toThrow = worker.throwable; + throw new RuntimeException("Worker " + worker.queueName + " failed", worker.throwable); } } - if (toThrow != null) { - throw toThrow; - } - for (Worker worker : workers) { PagingStoreImpl storeImpl = (PagingStoreImpl)worker.queue.getPagingStore(); - Assert.assertTrue("Store impl " + worker.queueName + " had more files than expected on " + storeImpl.getFolder(), storeImpl.getNumberOfFiles() <= 1); + Wait.assertFalse(storeImpl::isPaging, 5000, 100); } } @@ -156,11 +148,10 @@ public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase { for (int i = 0; i < 10; i++) { producer.send(session.createTextMessage("hello " + i)); } - Wait.assertTrue(queue.getPagingStore()::isPaging); for (int i = 0; i < 10; i++) { Assert.assertNotNull(consumer.receive(5000)); } - Wait.assertFalse("Waiting for !Paging on " + queueName + " with folder " + queue.getPagingStore().getFolder(), queue.getPagingStore()::isPaging); + Thread.sleep(500); // this is just pacing producing and consuming } } } catch (Throwable e) {