ARTEMIS-1930 require STOMP durable sub name to unsubscribe

This commit is contained in:
Justin Bertram 2018-06-18 15:19:48 -05:00 committed by Clebert Suconic
parent 4f0bb98667
commit 1ed7a616ee
3 changed files with 49 additions and 15 deletions

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
@ -255,14 +256,12 @@ public class StompSession implements SessionCallback {
SimpleString address = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination);
SimpleString selectorSimple = SimpleString.toSimpleString(selector);
boolean pubSub = false;
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean topic = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (topic) {
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
if (multicast) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
@ -276,8 +275,8 @@ public class StompSession implements SessionCallback {
session.createQueue(address, queueName, selectorSimple, true, false);
}
}
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, topic ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast);
subscriptions.put(consumerID, subscription);
session.start();
return () -> consumer.receiveCredits(receiveCredits);
@ -295,14 +294,15 @@ public class StompSession implements SessionCallback {
iterator.remove();
SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
Queue queue = manager.getServer().locateQueue(queueName);
if (sub.isMulticast() && queue != null && (durableSubscriptionName == null && !queue.isDurable())) {
session.deleteQueue(queueName);
}
result = true;
}
}
if (!result && durableSubscriptionName != null && clientID != null) {
if (durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);

View File

@ -29,18 +29,18 @@ public class StompSubscription {
private final SimpleString queueName;
// whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
private final boolean pubSub;
// whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
private final boolean multicast;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.pubSub = pubSub;
this.multicast = multicast;
}
// Public --------------------------------------------------------
@ -57,13 +57,13 @@ public class StompSubscription {
return queueName;
}
public boolean isPubSub() {
return pubSub;
public boolean isMulticast() {
return multicast;
}
@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]";
}
}

View File

@ -1343,6 +1343,40 @@ public class StompTest extends StompTestBase {
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}
@Test
public void testDurableUnSubscribeWithoutDurableSubName() throws Exception {
server.getActiveMQServer().getConfiguration().getWildcardConfiguration().setDelimiter('/');
server.getActiveMQServer().getAddressSettingsRepository().addMatch("/topic/#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.MULTICAST).setDefaultQueueRoutingType(RoutingType.MULTICAST));
conn.connect(defUser, defPass, "myclientid");
String subId = UUID.randomUUID().toString();
String durableSubName = UUID.randomUUID().toString();
String receipt = UUID.randomUUID().toString();
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/test.foo")
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableSubName)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
receipt = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE)
.addHeader(Stomp.Headers.Unsubscribe.ID, subId)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, receipt);
frame = conn.sendFrame(frame);
assertEquals(receipt, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
conn.disconnect();
// make sure the durable subscription queue is still there
assertTrue(Wait.waitFor(() -> server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + durableSubName)) != null, 2000, 100));
}
@Test
public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception {
conn.connect(defUser, defPass, "myclientid");