From 8d29742d406500bd86e50e57928e7e9ac8598440 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 21 Jun 2022 13:24:30 -0400 Subject: [PATCH] ARTEMIS-3862 Adding Topic Durable Subscription to RemoveSubscriptionRaceTest This commit just contains test additions --- .../client/RemoveSubscriptionRaceTest.java | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java index 0911d69e5b..6517219425 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java @@ -62,25 +62,35 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase { @Test public void testCreateSubscriptionCoreNoFiles() throws Exception { - internalTest("core", false, 5, 1000); + internalTest("core", false, 5, 1000, false); } @Test public void testCreateSubscriptionAMQPNoFiles() throws Exception { - internalTest("amqp", false, 5, 1000); + internalTest("amqp", false, 5, 1000, false); } @Test public void testCreateSubscriptionCoreRealFiles() throws Exception { - internalTest("core", true, 2, 200); + internalTest("core", true, 2, 200, false); } @Test public void testCreateSubscriptionAMQPRealFiles() throws Exception { - internalTest("amqp", true, 2, 200); + internalTest("amqp", true, 2, 200, false); } - public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages) throws Exception { + @Test + public void testCreateSubscriptionCoreRealFilesDurable() throws Exception { + internalTest("core", true, 2, 200, true); + } + + @Test + public void testCreateSubscriptionAMQPRealFilesDurable() throws Exception { + internalTest("amqp", true, 2, 200, true); + } + + public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages, boolean durableSub) throws Exception { server = createServer(realFiles, true); server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST)); server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST)); @@ -99,19 +109,32 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase { CyclicBarrier flagStart = new CyclicBarrier(threads + 1); for (int i = 0; i < threads; i++) { + final int threadNumber = i; executorService.execute(() -> { try { flagStart.await(10, TimeUnit.SECONDS); for (int n = 0; n < numberOfMessages && running.get(); n++) { Connection connection = factory.createConnection(); + if (durableSub) { + connection.setClientID("t" + threadNumber); + } connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(SUB_NAME); - MessageConsumer consumer = session.createConsumer(topic); + MessageConsumer consumer; + if (durableSub) { + consumer = session.createDurableSubscriber(topic, "t" + threadNumber); + } else { + consumer = session.createConsumer(topic); + } Message message = consumer.receiveNoWait(); if (message != null) { message.acknowledge(); } + consumer.close(); + if (durableSub) { + session.unsubscribe("t" + threadNumber); + } connection.close(); } } catch (Throwable e) { @@ -150,6 +173,8 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase { Wait.assertEquals(0, this::countAddMessage, 5000, 100); Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100); + + Assert.assertEquals(0, errors.get()); } int countAddMessage() throws Exception {