ARTEMIS-5166 Improving ReconnectTest

I'm trying to make it faster and less likely to fail over non issues
This commit is contained in:
Clebert Suconic 2024-11-19 15:52:03 -05:00 committed by clebertsuconic
parent e9c06bd9f8
commit 6f779a7908
1 changed files with 42 additions and 46 deletions

View File

@ -67,7 +67,7 @@ public class ReconnectTest extends ActiveMQTestBase {
} }
public void internalTestReconnect(final boolean isNetty) throws Exception { public void internalTestReconnect(final boolean isNetty) throws Exception {
final int pingPeriod = 1000; final int pingPeriod = 100;
ActiveMQServer server = createServer(false, isNetty); ActiveMQServer server = createServer(false, isNetty);
@ -111,10 +111,7 @@ public class ReconnectTest extends ActiveMQTestBase {
assertTrue(latch.await(5, TimeUnit.SECONDS)); assertTrue(latch.await(5, TimeUnit.SECONDS));
// Some time to let possible loops to occur Wait.assertEquals(1, count::get, 5000, 100);
Thread.sleep(500);
assertEquals(1, count.get());
locator.close(); locator.close();
} finally { } finally {
@ -148,42 +145,39 @@ public class ReconnectTest extends ActiveMQTestBase {
ClientSessionInternal session = null; ClientSessionInternal session = null;
try { try {
for (int i = 0; i < 100; i++) { ServerLocator locator = createFactory(isNetty);
ServerLocator locator = createFactory(isNetty); locator.setClientFailureCheckPeriod(pingPeriod);
locator.setClientFailureCheckPeriod(pingPeriod); locator.setRetryInterval(1);
locator.setRetryInterval(1); locator.setRetryIntervalMultiplier(1d);
locator.setRetryIntervalMultiplier(1d); locator.setReconnectAttempts(-1);
locator.setReconnectAttempts(-1); locator.setConfirmationWindowSize(-1);
locator.setConfirmationWindowSize(-1); ClientSessionFactory factory = createSessionFactory(locator);
ClientSessionFactory factory = createSessionFactory(locator);
session = (ClientSessionInternal) factory.createSession(); session = (ClientSessionInternal) factory.createSession();
session.addMetaData("meta1", "meta1"); session.addMetaData("meta1", "meta1");
ServerSession[] sessions = countMetadata(server, "meta1", 1); Wait.assertEquals(1, () -> getSessionsWithMetadata(server, "meta1").length);
assertEquals(1, sessions.length);
final AtomicInteger count = new AtomicInteger(0); ServerSession[] sessions = getSessionsWithMetadata(server, "meta1");
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
session.addFailoverListener(eventType -> { session.addFailoverListener(eventType -> {
if (eventType == FailoverEventType.FAILOVER_COMPLETED) { if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
latch.countDown(); latch.countDown();
} }
}); });
sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!")); sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!"));
assertTrue(latch.await(5, TimeUnit.SECONDS)); assertTrue(latch.await(5, TimeUnit.SECONDS));
sessions = countMetadata(server, "meta1", 1); Wait.assertEquals(1, () -> getSessionsWithMetadata(server, "meta1").length);
assertEquals(1, sessions.length); locator.close();
locator.close(); Wait.assertEquals(0, () -> getSessionsWithMetadata(server, "meta1").length);
}
} finally { } finally {
try { try {
session.close(); session.close();
@ -195,19 +189,12 @@ public class ReconnectTest extends ActiveMQTestBase {
} }
private ServerSession[] countMetadata(ActiveMQServer server, String parameter, int expected) throws Exception { private ServerSession[] getSessionsWithMetadata(ActiveMQServer server, String parameter) throws Exception {
List<ServerSession> sessionList = new LinkedList<>(); List<ServerSession> sessionList = new LinkedList<>();
for (int i = 0; i < 10 && sessionList.size() != expected; i++) { for (ServerSession sess : server.getSessions()) {
sessionList.clear(); if (sess.getMetaData(parameter) != null) {
for (ServerSession sess : server.getSessions()) { sessionList.add(sess);
if (sess.getMetaData(parameter) != null) {
sessionList.add(sess);
}
}
if (sessionList.size() != expected) {
Thread.sleep(100);
} }
} }
@ -353,7 +340,7 @@ public class ReconnectTest extends ActiveMQTestBase {
final long retryInterval = 50; final long retryInterval = 50;
final double retryMultiplier = 1d; final double retryMultiplier = 1d;
final int reconnectAttempts = 1; final int reconnectAttempts = 1;
ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
sf.addFailoverListener(eventType -> { sf.addFailoverListener(eventType -> {
@ -397,8 +384,11 @@ public class ReconnectTest extends ActiveMQTestBase {
final int reconnectAttempts = 10; final int reconnectAttempts = 10;
ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
runAfter(sf::close);
ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true); ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true);
runAfter(session::close);
SimpleString queueName1 = SimpleString.of("my_queue_one"); SimpleString queueName1 = SimpleString.of("my_queue_one");
SimpleString addressName1 = SimpleString.of("my_address_one"); SimpleString addressName1 = SimpleString.of("my_address_one");
@ -412,16 +402,22 @@ public class ReconnectTest extends ActiveMQTestBase {
Wait.assertEquals(1, () -> getConsumerCount(server, session)); Wait.assertEquals(1, () -> getConsumerCount(server, session));
Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); Wait.assertTrue(() -> matchConsumers(server, session, clientConsumer2), 5000, 100);
ServerConsumer serverConsumer = serverConsumers.iterator().next();
assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
session.close(); session.close();
sf.close(); sf.close();
server.stop(); server.stop();
} }
boolean matchConsumers(ActiveMQServer server, ClientSessionInternal session, ClientConsumer clientConsumer) {
Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
if (serverConsumers.size() != 1) {
return false;
}
ServerConsumer serverConsumer = serverConsumers.iterator().next();
return clientConsumer.getConsumerContext().getId() == serverConsumer.getID();
}
private int getConsumerCount(ActiveMQServer server, ClientSessionInternal session) { private int getConsumerCount(ActiveMQServer server, ClientSessionInternal session) {
ServerSession serverSession = server.getSessionByID(session.getName()); ServerSession serverSession = server.getSessionByID(session.getName());
if (serverSession == null) { if (serverSession == null) {