NO-JIRA making MirroredSubscriptionTest more challenging

The test is now setting the mirror to sync
it will block until the first subscription is consumed, kill the servers and restart them
check all the counters

and then start another 4 consumers and at the end check all the counters.

Mirror is now sync making the test more useful and challenging.
This commit is contained in:
Clebert Suconic 2023-07-21 18:17:06 -04:00
parent c50d97d60b
commit 04f29e0162
2 changed files with 27 additions and 4 deletions
tests/smoke-tests/src
main/resources/servers/mirrored-subscriptions/broker1
test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection

View File

@ -110,7 +110,7 @@ under the License.
<broker-connections>
<amqp-connection uri="tcp://localhost:61617" name="mirror" retry-interval="100">
<mirror/>
<mirror sync="true"/>
</amqp-connection>
</broker-connections>

View File

@ -56,6 +56,10 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
public void beforeClass() throws Exception {
cleanupData(SERVER_NAME_A);
cleanupData(SERVER_NAME_B);
startServers();
}
private void startServers() throws Exception {
processB = startServer(SERVER_NAME_B, 1, 0);
processA = startServer(SERVER_NAME_A, 0, 0);
@ -64,10 +68,9 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
}
@Test
public void testSend() throws Throwable {
public void testConsumeAll() throws Throwable {
int COMMIT_INTERVAL = 100;
int NUMBER_OF_MESSAGES = 1000;
int NUMBER_OF_MESSAGES = 300;
int CLIENTS = 5;
String mainURI = "tcp://localhost:61616";
String secondURI = "tcp://localhost:61617";
@ -111,6 +114,7 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
for (int i = 0; i < CLIENTS; i++) {
final int clientID = i;
CountDownLatch threadDone = new CountDownLatch(1);
executorService.execute(() -> {
try (Connection connection = cf.createConnection()) {
connection.setClientID("client" + clientID);
@ -132,8 +136,27 @@ public class MirroredSubscriptionTest extends SmokeTestBase {
errors.incrementAndGet();
} finally {
done.countDown();
threadDone.countDown();
}
});
if (clientID == 0) {
// The first execution will block until finished, we will then kill all the servers and make sure
// all the counters are preserved.
Assert.assertTrue(threadDone.await(300, TimeUnit.SECONDS));
processA.destroyForcibly();
processB.destroyForcibly();
Wait.assertFalse(processA::isAlive);
Wait.assertFalse(processB::isAlive);
startServers();
Wait.assertEquals(0, () -> getMessageCount(mainURI, "client0.subscription0"));
Wait.assertEquals(0, () -> getMessageCount(secondURI, "client0.subscription0"));
for (int checkID = 1; checkID < CLIENTS; checkID++) {
int checkFinal = checkID;
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, "client" + checkFinal + ".subscription" + checkFinal), 2000, 100);
Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(secondURI, "client" + checkFinal + ".subscription" + checkFinal), 2000, 100);
}
}
}
Assert.assertTrue(done.await(300, TimeUnit.SECONDS));