ARTEMIS-3423 Addressing suggestiongs from Robbie Gemmel on PR #3697

This commit is contained in:
Clebert Suconic 2021-08-18 12:53:22 -04:00
parent 995ee5688b
commit 656114045a
1 changed files with 31 additions and 3 deletions

View File

@ -32,9 +32,11 @@ import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.wildfly.common.Assert;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@ -142,7 +144,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSharedDurableConsumerWithSelector() throws JMSException { public void testSharedDurableConsumerWithSelectorChange() throws Exception {
SimpleString qName = amqpUseCoreSubscriptionNaming ? new SimpleString("SharedConsumer") : new SimpleString("SharedConsumer:global"); SimpleString qName = amqpUseCoreSubscriptionNaming ? new SimpleString("SharedConsumer") : new SimpleString("SharedConsumer:global");
Connection connection = createConnection(true); Connection connection = createConnection(true);
try { try {
@ -150,12 +152,18 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
Topic topic = session.createTopic(getTopicName()); Topic topic = session.createTopic(getTopicName());
MessageConsumer consumer = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b"); MessageConsumer consumer = session.createSharedDurableConsumer(topic, "SharedConsumer", "a='1'");
MessageProducer producer = session.createProducer(session.createTopic(getTopicName()));
Message message = session.createMessage();
message.setStringProperty("a", "1");
producer.send(message);
QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
Wait.assertEquals(1, queue::getMessageCount);
assertEquals(-1, queue.getMaxConsumers()); assertEquals(-1, queue.getMaxConsumers());
consumer.close(); consumer.close();
MessageConsumer consumer2 = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b and b=c"); MessageConsumer consumer2 = session.createSharedDurableConsumer(topic, "SharedConsumer", "a=b and b=c");
queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
Wait.assertEquals(0, queue::getMessageCount);
assertEquals(-1, queue.getMaxConsumers()); assertEquals(-1, queue.getMaxConsumers());
} finally { } finally {
connection.close(); connection.close();
@ -163,7 +171,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testUnSharedDurableConsumerWithSelector() throws JMSException { public void testDurableConsumerWithSelectorChange() throws Exception {
SimpleString qName = new SimpleString("foo.SharedConsumer"); SimpleString qName = new SimpleString("foo.SharedConsumer");
Connection connection = createConnection("foo", true); Connection connection = createConnection("foo", true);
try { try {
@ -172,12 +180,32 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
Topic topic = session.createTopic(getTopicName()); Topic topic = session.createTopic(getTopicName());
MessageConsumer consumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b", false); MessageConsumer consumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b", false);
MessageProducer producer = session.createProducer(session.createTopic(getTopicName()));
Message message = session.createMessage();
message.setStringProperty("a", "1");
message.setStringProperty("b", "1");
producer.send(message);
QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); QueueImpl queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(1, queue.getMaxConsumers()); assertEquals(1, queue.getMaxConsumers());
Wait.assertEquals(1, queue::getMessageCount);
consumer.close(); consumer.close();
MessageConsumer consumer2 = session.createDurableConsumer(topic, "SharedConsumer", "a=b and b=c", false); MessageConsumer consumer2 = session.createDurableConsumer(topic, "SharedConsumer", "a=b and b=c", false);
queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable(); queue = (QueueImpl) server.getPostOffice().getBinding(qName).getBindable();
assertEquals(1, queue.getMaxConsumers()); assertEquals(1, queue.getMaxConsumers());
Wait.assertEquals(0, queue::getMessageCount);
message = session.createMessage();
message.setStringProperty("a", "2");
message.setStringProperty("b", "2");
message.setStringProperty("c", "2");
producer.send(message);
Wait.assertEquals(1, queue::getMessageCount);
connection.start();
Assert.assertNotNull(consumer2.receive(5000));
} finally { } finally {
connection.close(); connection.close();
} }