ARTEMIS-5148 Simplifying and making ClusteredLargeMessageInterruptTest more reliable

This commit is contained in:
Clebert Suconic 2024-11-11 19:47:46 -05:00 committed by clebertsuconic
parent fd3df362b1
commit 447b72ae5b
1 changed files with 61 additions and 102 deletions

View File

@ -110,19 +110,13 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
Process serverProcess2;
public ConnectionFactory createConnectionFactory(int broker, String protocol) {
int portUsed = 61616 + broker * 100;
if (protocol.equals("CORE")) {
switch (broker) {
// I need the connections stable in the selected server
case 0:
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
case 1:
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61716?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
default:
logger.warn("undefined argument {}", broker);
throw new IllegalArgumentException("undefined");
}
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:" + portUsed + "?ha=false&useTopologyForLoadBalancing=false&callTimeout=1000");
} else {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + (61616 + broker * 100) + "?ha=false");
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + portUsed);
}
}
@ -145,52 +139,27 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
@Test
public void testLargeMessageAMQPTX() throws Throwable {
testInterrupt("AMQP", true, false);
}
@Test
public void testLargeMessageAMQPTXKill() throws Throwable {
testInterrupt("AMQP", true, true);
testInterrupt("AMQP", true);
}
@Test
public void testInterruptAMQPNonTX() throws Throwable {
testInterrupt("AMQP", false, false);
}
@Test
public void testInterruptAMQPNonTXKill() throws Throwable {
testInterrupt("AMQP", false, true);
testInterrupt("AMQP", false);
}
@Test
public void testInterruptCORETX() throws Throwable {
testInterrupt("CORE", true, false);
}
@Test
public void testInterruptCORETXKill() throws Throwable {
testInterrupt("CORE", true, true);
testInterrupt("CORE", true);
}
@Test
public void testInterruptOPENWIRETX() throws Throwable {
testInterrupt("OPENWIRE", true, false);
}
@Test
public void testInterruptOPENWIRETXKill() throws Throwable {
testInterrupt("OPENWIRE", true, true);
testInterrupt("OPENWIRE", true);
}
@Test
public void testInterruptCORENonTX() throws Throwable {
testInterrupt("CORE", false, false);
}
@Test
public void testInterruptCORENonTXKill() throws Throwable {
testInterrupt("CORE", false, true);
testInterrupt("CORE", false);
}
private CountDownLatch startSendingThreads(Executor executor, String protocol, int broker, int threads, boolean tx, String queueName) {
@ -242,8 +211,7 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
for (int i = 0; i < threads; i++) {
executor.execute(() -> {
int numberOfMessages = 0;
try {
Connection connection = factory.createConnection();
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
@ -265,6 +233,7 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
logger.info("Done sending");
done.countDown();
@ -279,7 +248,7 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
// this test has sleeps as the test will send while still active
// we keep sending all the time.. so the testInterruptLM acts like a controller telling the threads when to stop
private void testInterrupt(String protocol, boolean tx, boolean useKill) throws Throwable {
private void testInterrupt(String protocol, boolean tx) throws Throwable {
final int SENDING_THREADS = 10;
final int CONSUMING_THREADS = 10;
final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
@ -289,55 +258,50 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 0, CONSUMING_THREADS, tx, queueName);
Thread.sleep(2000);
killProcess(serverProcess, useKill);
runningSend = false;
runningConsumer = false;
assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
logger.info("All receivers and senders are done!!!");
serverProcess = startServer0();
Thread.sleep(2000);
sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
killProcess(serverProcess2, useKill);
assertTrue(serverProcess2.waitFor(10, TimeUnit.SECONDS));
runningSend = false;
runningConsumer = false;
assertTrue(sendDone.await(1, TimeUnit.MINUTES));
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
serverProcess2 = startServer1();
sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
Thread.sleep(2000);
runningSend = false;
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
File lmFolder2 = new File(getServerLocation(SERVER_NAME_1) + "/data/large-messages");
Wait.waitFor(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0 && lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0);
{
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
runningConsumer = false;
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
// let it producing for a while
Thread.sleep(2000);
runningSend = false;
assertTrue(sendDone.await(1, TimeUnit.MINUTES));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer0();
runningConsumer = false;
assertTrue(receiverDone.await(1, TimeUnit.MINUTES));
long timeout = System.currentTimeMillis() + 60_000;
ConnectionFactory factory = createConnectionFactory(1, protocol);
// This will flush all messages, making sure everything is consumed.
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
connection.start();
while (System.currentTimeMillis() < timeout) {
TextMessage message = (TextMessage)consumer.receive(100);
if (message == null) {
if (lmFolder.listFiles().length == 0 && lmFolder2.listFiles().length == 0) {
break;
}
} else {
assertTrue(message.getText().startsWith(largebody));
}
}
}
}
logger.info("All receivers and senders are done!!!");
// no need to use wait here, the previous check should have checked that already
assertEquals(0, lmFolder.listFiles().length);
assertEquals(0, lmFolder2.listFiles().length);
assertEquals(0, errors.get());
@ -353,12 +317,8 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
testInterruptFailOnBridge("CORE", false);
}
private void killProcess(Process process, boolean useKill) throws Exception {
if (useKill) {
Runtime.getRuntime().exec("kill -SIGINT " + process.pid());
} else {
process.destroyForcibly();
}
private void killProcess(Process process) throws Exception {
process.destroyForcibly();
}
@ -375,26 +335,25 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS + CONSUMING_THREADS);
runAfter(executorService::shutdownNow);
// only start the sender for a while
CountDownLatch sendDone = startSendingThreads(executorService, protocol, 0, SENDING_THREADS, tx, queueName);
Thread.sleep(2000);
runningSend = runningConsumer = false;
killProcess(serverProcess, false);
assertTrue(serverProcess.waitFor(10, TimeUnit.MINUTES));
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
assertTrue(sendDone.await(1, TimeUnit. MINUTES));
sendDone = startSendingThreads(executorService, protocol, 1, SENDING_THREADS, tx, queueName);
CountDownLatch receiverDone = startConsumingThreads(executorService, protocol, 1, CONSUMING_THREADS, tx, queueName);
killProcess(serverProcess, false);
assertTrue(serverProcess.waitFor(10, TimeUnit.SECONDS));
killProcess(serverProcess);
assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES));
serverProcess = startServer0();
Thread.sleep(5000);
runningSend = false;
assertTrue(sendDone.await(10, TimeUnit.SECONDS));
assertTrue(sendDone.await(1, TimeUnit.MINUTES));
QueueControl queueControl1 = getQueueControl(server1URI, builderServer1, queueName, queueName, RoutingType.ANYCAST, 5000);
QueueControl queueControl2 = getQueueControl(server2URI, builderServer2, queueName, queueName, RoutingType.ANYCAST, 5000);
@ -405,7 +364,7 @@ public class ClusteredLargeMessageInterruptTest extends SoakTestBase {
Wait.assertTrue(() -> queueControl1.getMessageCount() == 0 && queueControl2.getMessageCount() == 0);
runningConsumer = false;
assertTrue(receiverDone.await(10, TimeUnit.SECONDS));
assertTrue(receiverDone.await(1, TimeUnit.MINUTES));
Wait.assertEquals(0, () -> lmFolder.listFiles().length);