This commit is contained in:
Justin Bertram 2017-03-13 10:33:13 -05:00
commit c54dfd3055
2 changed files with 31 additions and 10 deletions

View File

@ -94,6 +94,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean shared = false; private boolean shared = false;
private boolean global = false; private boolean global = false;
private boolean isVolatile = false; private boolean isVolatile = false;
private String tempQueueName;
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
super(); super();
@ -223,6 +224,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// if dynamic we have to create the node (queue) and set the address on the target, the // if dynamic we have to create the node (queue) and set the address on the target, the
// node is temporary and will be deleted on closing of the session // node is temporary and will be deleted on closing of the session
queue = java.util.UUID.randomUUID().toString(); queue = java.util.UUID.randomUUID().toString();
tempQueueName = queue;
try { try {
sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
// protonSession.getServerSession().createQueue(queue, queue, null, true, false); // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
@ -342,6 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} else { } else {
queue = java.util.UUID.randomUUID().toString(); queue = java.util.UUID.randomUUID().toString();
tempQueueName = queue;
try { try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
} catch (Exception e) { } catch (Exception e) {
@ -445,16 +448,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.isExists() && source.getDynamic()) { if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName); sessionSPI.deleteQueue(queueName);
} else { } else {
String clientId = getClientId(); if (source.getDurable() == TerminusDurability.NONE && tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
String pubId = sender.getName(); sessionSPI.removeTemporaryQueue(tempQueueName);
if (pubId.contains("|")) { } else {
pubId = pubId.split("\\|")[0]; String clientId = getClientId();
} String pubId = sender.getName();
String queue = createQueueName(clientId, pubId, shared, global, isVolatile); if (pubId.contains("|")) {
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); pubId = pubId.split("\\|")[0];
//only delete if it isn't volatile and has no consumers }
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
sessionSPI.deleteQueue(queue); result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
}
} }
} }
} else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {

View File

@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After; import org.junit.After;
@ -106,6 +107,19 @@ public class ProtonPubSubTest extends ProtonTestBase {
} }
} }
@Test
public void testNonDurablePubSubQueueDeleted() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sub = session.createSubscriber(topic);
Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress));
assertEquals(2, bindingsForAddress.getBindings().size());
sub.close();
Thread.sleep(1000);
assertEquals(1, bindingsForAddress.getBindings().size());
}
@Test @Test
public void testNonDurableMultiplePubSub() throws Exception { public void testNonDurableMultiplePubSub() throws Exception {
int numMessages = 100; int numMessages = 100;