ARTEMIS-741 fix subscription queue leak on STOMP

This commit is contained in:
jbertram 2016-09-21 19:19:59 -05:00 committed by Clebert Suconic
parent c86e41d4fa
commit 2dcf8de0de
3 changed files with 115 additions and 26 deletions

View File

@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
@ -276,7 +275,8 @@ public class StompSession implements SessionCallback {
String destination,
String selector,
String ack) throws Exception {
SimpleString queue = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination);
boolean pubSub = false;
int receiveCredits = consumerCredits;
if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
receiveCredits = -1;
@ -284,27 +284,27 @@ public class StompSession implements SessionCallback {
if (destination.startsWith("jms.topic")) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
}
queue = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
QueueQueryResult query = session.executeQueueQuery(queue);
if (!query.isExists()) {
session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector), false, true);
queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) == null) {
session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), false, true);
}
}
else {
queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector), true, false);
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), true, false);
}
((ServerSessionImpl) session).createConsumer(consumerID, queue, null, false, false, receiveCredits);
session.createConsumer(consumerID, queueName, null, false, false, receiveCredits);
}
else {
((ServerSessionImpl) session).createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false, false, receiveCredits);
session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, receiveCredits);
}
StompSubscription subscription = new StompSubscription(subscriptionID, ack);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
subscriptions.put(consumerID, subscription);
session.start();
@ -320,10 +320,9 @@ public class StompSession implements SessionCallback {
StompSubscription sub = entry.getValue();
if (id != null && id.equals(sub.getID())) {
iterator.remove();
SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
SimpleString queueName = SimpleString.toSimpleString(id);
QueueQueryResult query = session.executeQueueQuery(queueName);
if (query.isExists()) {
if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);
}
result = true;
@ -332,8 +331,7 @@ public class StompSession implements SessionCallback {
if (!result && durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
QueueQueryResult query = session.executeQueueQuery(queueName);
if (query.isExists()) {
if (manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);
}
result = true;

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
import org.apache.activemq.artemis.api.core.SimpleString;
public class StompSubscription {
// Constants -----------------------------------------------------
@ -25,13 +27,20 @@ public class StompSubscription {
private final String ack;
private final SimpleString queueName;
// whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
private final boolean pubSub;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public StompSubscription(String subID, String ack) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.pubSub = pubSub;
}
// Public --------------------------------------------------------
@ -44,17 +53,17 @@ public class StompSubscription {
return subID;
}
@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + "]";
public SimpleString getQueueName() {
return queueName;
}
// Package protected ---------------------------------------------
public boolean isPubSub() {
return pubSub;
}
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
}
}

View File

@ -1282,6 +1282,7 @@ public class StompTest extends StompTestBase {
@Test
public void testSubscribeToTopic() throws Exception {
final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
@ -1301,6 +1302,19 @@ public class StompTest extends StompTestBase {
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
return true;
}
else {
return false;
}
}
}, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
sendMessage(getName(), topic);
frame = receiveFrame(10000);
@ -1326,6 +1340,74 @@ public class StompTest extends StompTestBase {
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
@Test
public void testSubscribeToQueue() throws Exception {
final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(100000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:" +
getQueuePrefix() +
getQueueName() +
"\n" +
"receipt: 12\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
// wait for SUBSCRIBE's receipt
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
return true;
}
else {
return false;
}
}
}, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
sendMessage(getName(), queue);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
frame = "UNSUBSCRIBE\n" + "destination:" +
getQueuePrefix() +
getQueueName() +
"\n" +
"receipt: 1234\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
// wait for UNSUBSCRIBE's receipt
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
sendMessage(getName(), queue);
frame = receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}