From b5b6e4bea662a3eb7ab28f9b9ebb4cdb04801805 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Mon, 13 Mar 2017 13:52:28 +0000 Subject: [PATCH] ARTEMIS-1034 - non-durable subscription queue not ended on link close https://issues.apache.org/jira/browse/ARTEMIS-1034 --- .../proton/ProtonServerSenderContext.java | 27 ++++++++++++------- .../integration/amqp/ProtonPubSubTest.java | 14 ++++++++++ 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 55ad5508b7..962110e255 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -94,6 +94,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean shared = false; private boolean global = false; private boolean isVolatile = false; + private String tempQueueName; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { super(); @@ -223,6 +224,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // if dynamic we have to create the node (queue) and set the address on the target, the // node is temporary and will be deleted on closing of the session queue = java.util.UUID.randomUUID().toString(); + tempQueueName = queue; try { sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); // protonSession.getServerSession().createQueue(queue, queue, null, true, false); @@ -342,6 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { queue = java.util.UUID.randomUUID().toString(); + tempQueueName = queue; try { sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); } catch (Exception e) { @@ -445,16 +448,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); } else { - String clientId = getClientId(); - String pubId = sender.getName(); - if (pubId.contains("|")) { - pubId = pubId.split("\\|")[0]; - } - String queue = createQueueName(clientId, pubId, shared, global, isVolatile); - result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); - //only delete if it isn't volatile and has no consumers - if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { - sessionSPI.deleteQueue(queue); + if (source.getDurable() == TerminusDurability.NONE && tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { + sessionSPI.removeTemporaryQueue(tempQueueName); + } else { + String clientId = getClientId(); + String pubId = sender.getName(); + if (pubId.contains("|")) { + pubId = pubId.split("\\|")[0]; + } + String queue = createQueueName(clientId, pubId, shared, global, isVolatile); + result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); + //only delete if it isn't volatile and has no consumers + if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { + sessionSPI.deleteQueue(queue); + } } } } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java index 2ae9b8dc06..42f30acc3b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java @@ -31,6 +31,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; @@ -106,6 +107,19 @@ public class ProtonPubSubTest extends ProtonTestBase { } } + @Test + public void testNonDurablePubSubQueueDeleted() throws Exception { + int numMessages = 100; + Topic topic = createTopic(pubAddress); + TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer sub = session.createSubscriber(topic); + Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress)); + assertEquals(2, bindingsForAddress.getBindings().size()); + sub.close(); + Thread.sleep(1000); + assertEquals(1, bindingsForAddress.getBindings().size()); + } + @Test public void testNonDurableMultiplePubSub() throws Exception { int numMessages = 100;