ARTEMIS-2973 - JMS AMQP Shared global subscriber queue not deleted on unsubscribe

https://issues.apache.org/jira/browse/ARTEMIS-2973
This commit is contained in:
Andy Taylor 2020-11-02 12:02:11 +00:00 committed by Clebert Suconic
parent c0b12b14c8
commit dafef2b267
2 changed files with 95 additions and 0 deletions

View File

@ -843,6 +843,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
String clientId = getClientId();
String pubId = sender.getName();
global = hasRemoteDesiredCapability(sender, GLOBAL);
shared = hasRemoteDesiredCapability(sender, SHARED);
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
multicast = true;

View File

@ -27,6 +27,7 @@ import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@ -209,6 +210,99 @@ public class JMSTopicConsumerTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testDurableSharedSubscriptionUnsubscribe() throws Exception {
Connection connection = createConnection("myClientId");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer myDurSub = session.createSharedDurableConsumer(topic, "myDurSub");
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")) != null);
myDurSub.close();
session.unsubscribe("myDurSub");
session.close();
connection.close();
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")) == null);
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testDurableMultipleSharedSubscriptionUnsubscribe() throws Exception {
Connection connection = createConnection("myClientId");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer myDurSub = session.createSharedDurableConsumer(topic, "myDurSub");
MessageConsumer myDurSub2 = session2.createSharedDurableConsumer(topic, "myDurSub");
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")) != null);
myDurSub.close();
try {
session.unsubscribe("myDurSub");
Assert.fail("should throw exception on active durable subs");
} catch (JMSException e) {
//pass
}
myDurSub2.close();
session.unsubscribe("myDurSub");
session.close();
connection.close();
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")) == null);
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testDurableSharedGlobalSubscriptionUnsubscribe() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer myDurSub = session.createSharedDurableConsumer(topic, "myDurSub");
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myDurSub:global")) != null);
myDurSub.close();
session.unsubscribe("myDurSub");
session.close();
connection.close();
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myDurSub:global")) == null);
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testDurableMultipleSharedGlobalSubscriptionUnsubscribe() throws Exception {
Connection connection = createConnection();
Connection connection2 = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
MessageConsumer myDurSub = session.createSharedDurableConsumer(topic, "myDurSub");
MessageConsumer myDurSub2 = session2.createSharedDurableConsumer(topic, "myDurSub");
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myDurSub:global")) != null);
myDurSub.close();
session.unsubscribe("myDurSub");
session.close();
connection.close();
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myDurSub:global")) != null);
myDurSub2.close();
session2.unsubscribe("myDurSub");
session2.close();
connection2.close();
Assert.assertTrue(server.getPostOffice().getBinding(new SimpleString("myDurSub:global")) == null);
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testTemporarySubscriptionDeleted() throws Exception {
Connection connection = createConnection();