ARTEMIS-1034 - non-durable subscription queue not ended on link close

https://issues.apache.org/jira/browse/ARTEMIS-1034
This commit is contained in:
Andy Taylor 2017-03-13 13:52:28 +00:00 committed by Justin Bertram
parent e13e014c6d
commit b5b6e4bea6
2 changed files with 31 additions and 10 deletions

View File

@ -94,6 +94,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean shared = false;
private boolean global = false;
private boolean isVolatile = false;
private String tempQueueName;
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
super();
@ -223,6 +224,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// if dynamic we have to create the node (queue) and set the address on the target, the
// node is temporary and will be deleted on closing of the session
queue = java.util.UUID.randomUUID().toString();
tempQueueName = queue;
try {
sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
// protonSession.getServerSession().createQueue(queue, queue, null, true, false);
@ -342,6 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
queue = java.util.UUID.randomUUID().toString();
tempQueueName = queue;
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
} catch (Exception e) {
@ -445,16 +448,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName);
} else {
String clientId = getClientId();
String pubId = sender.getName();
if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0];
}
String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
if (source.getDurable() == TerminusDurability.NONE && tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
sessionSPI.removeTemporaryQueue(tempQueueName);
} else {
String clientId = getClientId();
String pubId = sender.getName();
if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0];
}
String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
}
}
}
} else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {

View File

@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
@ -106,6 +107,19 @@ public class ProtonPubSubTest extends ProtonTestBase {
}
}
@Test
public void testNonDurablePubSubQueueDeleted() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sub = session.createSubscriber(topic);
Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress));
assertEquals(2, bindingsForAddress.getBindings().size());
sub.close();
Thread.sleep(1000);
assertEquals(1, bindingsForAddress.getBindings().size());
}
@Test
public void testNonDurableMultiplePubSub() throws Exception {
int numMessages = 100;