ARTEMIS-3862 Adding Topic Durable Subscription to RemoveSubscriptionRaceTest

This commit just contains test additions
This commit is contained in:
Clebert Suconic 2022-06-21 13:24:30 -04:00
parent 78587f0a82
commit 8d29742d40
1 changed files with 31 additions and 6 deletions

View File

@ -62,25 +62,35 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
@Test @Test
public void testCreateSubscriptionCoreNoFiles() throws Exception { public void testCreateSubscriptionCoreNoFiles() throws Exception {
internalTest("core", false, 5, 1000); internalTest("core", false, 5, 1000, false);
} }
@Test @Test
public void testCreateSubscriptionAMQPNoFiles() throws Exception { public void testCreateSubscriptionAMQPNoFiles() throws Exception {
internalTest("amqp", false, 5, 1000); internalTest("amqp", false, 5, 1000, false);
} }
@Test @Test
public void testCreateSubscriptionCoreRealFiles() throws Exception { public void testCreateSubscriptionCoreRealFiles() throws Exception {
internalTest("core", true, 2, 200); internalTest("core", true, 2, 200, false);
} }
@Test @Test
public void testCreateSubscriptionAMQPRealFiles() throws Exception { public void testCreateSubscriptionAMQPRealFiles() throws Exception {
internalTest("amqp", true, 2, 200); internalTest("amqp", true, 2, 200, false);
} }
public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages) throws Exception { @Test
public void testCreateSubscriptionCoreRealFilesDurable() throws Exception {
internalTest("core", true, 2, 200, true);
}
@Test
public void testCreateSubscriptionAMQPRealFilesDurable() throws Exception {
internalTest("amqp", true, 2, 200, true);
}
public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages, boolean durableSub) throws Exception {
server = createServer(realFiles, true); server = createServer(realFiles, true);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST)); server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST)); server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST));
@ -99,19 +109,32 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
CyclicBarrier flagStart = new CyclicBarrier(threads + 1); CyclicBarrier flagStart = new CyclicBarrier(threads + 1);
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
final int threadNumber = i;
executorService.execute(() -> { executorService.execute(() -> {
try { try {
flagStart.await(10, TimeUnit.SECONDS); flagStart.await(10, TimeUnit.SECONDS);
for (int n = 0; n < numberOfMessages && running.get(); n++) { for (int n = 0; n < numberOfMessages && running.get(); n++) {
Connection connection = factory.createConnection(); Connection connection = factory.createConnection();
if (durableSub) {
connection.setClientID("t" + threadNumber);
}
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(SUB_NAME); Topic topic = session.createTopic(SUB_NAME);
MessageConsumer consumer = session.createConsumer(topic); MessageConsumer consumer;
if (durableSub) {
consumer = session.createDurableSubscriber(topic, "t" + threadNumber);
} else {
consumer = session.createConsumer(topic);
}
Message message = consumer.receiveNoWait(); Message message = consumer.receiveNoWait();
if (message != null) { if (message != null) {
message.acknowledge(); message.acknowledge();
} }
consumer.close();
if (durableSub) {
session.unsubscribe("t" + threadNumber);
}
connection.close(); connection.close();
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -150,6 +173,8 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
Wait.assertEquals(0, this::countAddMessage, 5000, 100); Wait.assertEquals(0, this::countAddMessage, 5000, 100);
Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100); Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100);
Assert.assertEquals(0, errors.get());
} }
int countAddMessage() throws Exception { int countAddMessage() throws Exception {