diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java index 286809231b..81161ac6e6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java @@ -67,7 +67,7 @@ public class ReconnectTest extends ActiveMQTestBase { } public void internalTestReconnect(final boolean isNetty) throws Exception { - final int pingPeriod = 1000; + final int pingPeriod = 100; ActiveMQServer server = createServer(false, isNetty); @@ -111,10 +111,7 @@ public class ReconnectTest extends ActiveMQTestBase { assertTrue(latch.await(5, TimeUnit.SECONDS)); - // Some time to let possible loops to occur - Thread.sleep(500); - - assertEquals(1, count.get()); + Wait.assertEquals(1, count::get, 5000, 100); locator.close(); } finally { @@ -148,42 +145,39 @@ public class ReconnectTest extends ActiveMQTestBase { ClientSessionInternal session = null; try { - for (int i = 0; i < 100; i++) { - ServerLocator locator = createFactory(isNetty); - locator.setClientFailureCheckPeriod(pingPeriod); - locator.setRetryInterval(1); - locator.setRetryIntervalMultiplier(1d); - locator.setReconnectAttempts(-1); - locator.setConfirmationWindowSize(-1); - ClientSessionFactory factory = createSessionFactory(locator); + ServerLocator locator = createFactory(isNetty); + locator.setClientFailureCheckPeriod(pingPeriod); + locator.setRetryInterval(1); + locator.setRetryIntervalMultiplier(1d); + locator.setReconnectAttempts(-1); + locator.setConfirmationWindowSize(-1); + ClientSessionFactory factory = createSessionFactory(locator); - session = (ClientSessionInternal) factory.createSession(); + session = (ClientSessionInternal) factory.createSession(); - session.addMetaData("meta1", "meta1"); + session.addMetaData("meta1", "meta1"); - ServerSession[] sessions = countMetadata(server, "meta1", 1); - assertEquals(1, sessions.length); + Wait.assertEquals(1, () -> getSessionsWithMetadata(server, "meta1").length); - final AtomicInteger count = new AtomicInteger(0); + ServerSession[] sessions = getSessionsWithMetadata(server, "meta1"); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); - session.addFailoverListener(eventType -> { - if (eventType == FailoverEventType.FAILOVER_COMPLETED) { - latch.countDown(); - } - }); + session.addFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + }); - sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!")); + sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!")); - assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(latch.await(5, TimeUnit.SECONDS)); - sessions = countMetadata(server, "meta1", 1); + Wait.assertEquals(1, () -> getSessionsWithMetadata(server, "meta1").length); - assertEquals(1, sessions.length); + locator.close(); - locator.close(); - } + Wait.assertEquals(0, () -> getSessionsWithMetadata(server, "meta1").length); } finally { try { session.close(); @@ -195,19 +189,12 @@ public class ReconnectTest extends ActiveMQTestBase { } - private ServerSession[] countMetadata(ActiveMQServer server, String parameter, int expected) throws Exception { + private ServerSession[] getSessionsWithMetadata(ActiveMQServer server, String parameter) throws Exception { List sessionList = new LinkedList<>(); - for (int i = 0; i < 10 && sessionList.size() != expected; i++) { - sessionList.clear(); - for (ServerSession sess : server.getSessions()) { - if (sess.getMetaData(parameter) != null) { - sessionList.add(sess); - } - } - - if (sessionList.size() != expected) { - Thread.sleep(100); + for (ServerSession sess : server.getSessions()) { + if (sess.getMetaData(parameter) != null) { + sessionList.add(sess); } } @@ -353,7 +340,7 @@ public class ReconnectTest extends ActiveMQTestBase { final long retryInterval = 50; final double retryMultiplier = 1d; final int reconnectAttempts = 1; - ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); + ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); final CountDownLatch latch = new CountDownLatch(1); sf.addFailoverListener(eventType -> { @@ -397,8 +384,11 @@ public class ReconnectTest extends ActiveMQTestBase { final int reconnectAttempts = 10; ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); + runAfter(sf::close); ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true); + runAfter(session::close); + SimpleString queueName1 = SimpleString.of("my_queue_one"); SimpleString addressName1 = SimpleString.of("my_address_one"); @@ -412,16 +402,22 @@ public class ReconnectTest extends ActiveMQTestBase { Wait.assertEquals(1, () -> getConsumerCount(server, session)); - Set serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); - ServerConsumer serverConsumer = serverConsumers.iterator().next(); - assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID()); - + Wait.assertTrue(() -> matchConsumers(server, session, clientConsumer2), 5000, 100); session.close(); sf.close(); server.stop(); } + boolean matchConsumers(ActiveMQServer server, ClientSessionInternal session, ClientConsumer clientConsumer) { + Set serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); + if (serverConsumers.size() != 1) { + return false; + } + ServerConsumer serverConsumer = serverConsumers.iterator().next(); + return clientConsumer.getConsumerContext().getId() == serverConsumer.getID(); + } + private int getConsumerCount(ActiveMQServer server, ClientSessionInternal session) { ServerSession serverSession = server.getSessionByID(session.getName()); if (serverSession == null) {