This closes #653 ARTEMIS-548 Stomp durable sub unsubscrbe

This commit is contained in:
Andy Taylor 2016-07-25 16:23:06 +01:00
commit ef3df355ea
10 changed files with 198 additions and 55 deletions

View File

@ -139,8 +139,11 @@ public interface Stomp {
String SELECTOR = "selector";
@Deprecated
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
String NO_LOCAL = "no-local";
public interface AckModeValues {
@ -159,7 +162,10 @@ public interface Stomp {
String ID = "id";
@Deprecated
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
}
public interface Connect {

View File

@ -655,9 +655,9 @@ public final class StompConnection implements RemotingConnection {
}
}
public void unsubscribe(String subscriptionID, String durableSubscriberName) throws ActiveMQStompException {
public void unsubscribe(String subscriptionID, String durableSubscriptionName) throws ActiveMQStompException {
try {
manager.unsubscribe(this, subscriptionID, durableSubscriberName);
manager.unsubscribe(this, subscriptionID, durableSubscriptionName);
}
catch (ActiveMQStompException e) {
throw e;

View File

@ -385,15 +385,14 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
stompSession.addSubscription(consumerID, subscriptionID, clientID, durableSubscriptionName, destination, selector, ack);
stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack);
}
public void unsubscribe(StompConnection connection,
String subscriptionID,
String durableSubscriberName) throws Exception {
StompSession stompSession = getSession(connection);
boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName);
boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName, connection.getClientID());
if (!unsubscribed) {
throw new ActiveMQStompException(connection, "Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
}

View File

@ -306,8 +306,10 @@ public class StompSession implements SessionCallback {
session.start();
}
public boolean unsubscribe(String id, String durableSubscriptionName) throws Exception {
public boolean unsubscribe(String id, String durableSubscriptionName, String clientID) throws Exception {
boolean result = false;
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, StompSubscription> entry = iterator.next();
long consumerID = entry.getKey();
@ -315,21 +317,25 @@ public class StompSession implements SessionCallback {
if (id != null && id.equals(sub.getID())) {
iterator.remove();
session.closeConsumer(consumerID);
SimpleString queueName;
if (durableSubscriptionName != null && durableSubscriptionName.trim().length() != 0) {
queueName = SimpleString.toSimpleString(id + "." + durableSubscriptionName);
}
else {
queueName = SimpleString.toSimpleString(id);
}
SimpleString queueName = SimpleString.toSimpleString(id);
QueueQueryResult query = session.executeQueueQuery(queueName);
if (query.isExists()) {
session.deleteQueue(queueName);
}
return true;
result = true;
}
}
return false;
if (!result && durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
QueueQueryResult query = session.executeQueueQuery(queueName);
if (query.isExists()) {
session.deleteQueue(queueName);
}
result = true;
}
return result;
}
boolean containsSubscription(String subscriptionID) {

View File

@ -251,6 +251,9 @@ public abstract class VersionedStompFrameHandler {
String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = request.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
boolean noLocal = false;
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {

View File

@ -92,7 +92,10 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
StompFrame response = null;
String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
}
String subscriptionID = null;
if (id != null) {
@ -108,7 +111,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
}
try {
connection.unsubscribe(subscriptionID, durableSubscriberName);
connection.unsubscribe(subscriptionID, durableSubscriptionName);
}
catch (ActiveMQStompException e) {
return e.getFrame();

View File

@ -153,19 +153,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
StompFrame response = null;
//unsubscribe in 1.1 only needs id header
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
}
String subscriptionID = null;
if (id != null) {
subscriptionID = id;
}
else {
else if (durableSubscriptionName == null) {
response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
return response;
}
try {
connection.unsubscribe(subscriptionID, durableSubscriberName);
connection.unsubscribe(subscriptionID, durableSubscriptionName);
}
catch (ActiveMQStompException e) {
response = e.getFrame();

View File

@ -1432,6 +1432,60 @@ public class StompTest extends StompTestBase {
sendFrame(frame);
}
@Test
public void testDurableUnSubscribe() throws Exception {
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(1000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
getTopicPrefix() +
getTopicName() +
"\n" +
"receipt: 12\n" +
"durable-subscriber-name: " +
getName() +
"\n" +
"\n\n" +
Stomp.NULL;
sendFrame(subscribeFrame);
// wait for SUBSCRIBE's receipt
frame = receiveFrame(1000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
waitForFrameToTakeEffect();
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
reconnect(100);
frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(1000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
getTopicPrefix() +
getTopicName() +
"\n" +
"durable-subscriber-name: " +
getName() +
"\n" +
"\n\n" +
Stomp.NULL;
sendFrame(unsubscribeFrame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
waitForFrameToTakeEffect();
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}
@Test
public void testSubscribeToTopicWithNoLocal() throws Exception {

View File

@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
@ -48,6 +49,7 @@ import org.junit.Test;
public class StompV11Test extends StompV11TestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
public static final String CLIENT_ID = "myclientid";
private StompClientConnection connV11;
@ -1333,7 +1335,7 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testTwoSubscribers() throws Exception {
connV11.connect(defUser, defPass, "myclientid");
connV11.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV11, "sub1", "auto", null);
@ -1535,7 +1537,7 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testDurableSubscriberWithReconnection() throws Exception {
connV11.connect(defUser, defPass, "myclientid");
connV11.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV11, "sub1", "auto", getName());
@ -1553,7 +1555,7 @@ public class StompV11Test extends StompV11TestBase {
connV11.destroy();
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11.connect(defUser, defPass, "myclientid");
connV11.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV11, "sub1", "auto", getName());
@ -1569,6 +1571,30 @@ public class StompV11Test extends StompV11TestBase {
connV11.disconnect();
}
@Test
public void testDurableUnSubscribe() throws Exception {
connV11.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV11, null, "auto", getName());
connV11.disconnect();
connV11.destroy();
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11.connect(defUser, defPass, CLIENT_ID);
this.unsubscribe(connV11, getName(), false, true);
long start = System.currentTimeMillis();
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) {
Thread.sleep(100);
}
assertNull(server.getActiveMQServer().locateQueue(queueName));
connV11.disconnect();
}
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -2364,8 +2390,10 @@ public class StompV11Test extends StompV11TestBase {
boolean receipt,
boolean noLocal) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
if (subId != null) {
subFrame.addHeader("id", subId);
}
if (ack != null) {
subFrame.addHeader("ack", ack);
}
@ -2386,18 +2414,14 @@ public class StompV11Test extends StompV11TestBase {
}
}
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
subFrame.addHeader("id", subId);
conn.sendFrame(subFrame);
private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean durable) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
if (durable) {
subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId);
}
else {
subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId);
}
private void unsubscribe(StompClientConnection conn,
String subId,
boolean receipt) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
subFrame.addHeader("id", subId);
if (receipt) {
subFrame.addHeader("receipt", "4321");
@ -2412,6 +2436,16 @@ public class StompV11Test extends StompV11TestBase {
}
}
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
unsubscribe(conn, subId, false, false);
}
private void unsubscribe(StompClientConnection conn,
String subId,
boolean receipt) throws IOException, InterruptedException {
unsubscribe(conn, subId, receipt, false);
}
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
connV11.connect(defUser, defPass);

View File

@ -31,6 +31,9 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
@ -38,7 +41,6 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
import org.apache.activemq.artemis.tests.integration.stomp.v11.StompV11TestBase;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -50,6 +52,7 @@ import org.junit.Test;
public class StompV12Test extends StompV11TestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
public static final String CLIENT_ID = "myclientid";
private StompClientConnectionV12 connV12;
@ -1325,7 +1328,7 @@ public class StompV12Test extends StompV11TestBase {
@Test
public void testTwoSubscribers() throws Exception {
connV12.connect(defUser, defPass, "myclientid");
connV12.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV12, "sub1", "auto", null);
@ -1529,7 +1532,7 @@ public class StompV12Test extends StompV11TestBase {
@Test
public void testDurableSubscriberWithReconnection() throws Exception {
connV12.connect(defUser, defPass, "myclientid");
connV12.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV12, "sub1", "auto", getName());
@ -1547,7 +1550,7 @@ public class StompV12Test extends StompV11TestBase {
connV12.destroy();
connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
connV12.connect(defUser, defPass, "myclientid");
connV12.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV12, "sub1", "auto", getName());
@ -1563,6 +1566,30 @@ public class StompV12Test extends StompV11TestBase {
connV12.disconnect();
}
@Test
public void testDurableUnSubscribe() throws Exception {
connV12.connect(defUser, defPass, CLIENT_ID);
this.subscribeTopic(connV12, null, "auto", getName());
connV12.disconnect();
connV12.destroy();
connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
connV12.connect(defUser, defPass, CLIENT_ID);
this.unsubscribe(connV12, getName(), false, true);
long start = System.currentTimeMillis();
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) {
Thread.sleep(100);
}
assertNull(server.getActiveMQServer().locateQueue(queueName));
connV12.disconnect();
}
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@ -2403,8 +2430,10 @@ public class StompV12Test extends StompV11TestBase {
boolean receipt,
boolean noLocal) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
if (subId != null) {
subFrame.addHeader("id", subId);
}
if (ack != null) {
subFrame.addHeader("ack", ack);
}
@ -2425,18 +2454,14 @@ public class StompV12Test extends StompV11TestBase {
}
}
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
subFrame.addHeader("id", subId);
conn.sendFrame(subFrame);
private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean durable) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
if (durable) {
subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId);
}
else {
subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId);
}
private void unsubscribe(StompClientConnection conn,
String subId,
boolean receipt) throws IOException, InterruptedException {
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
subFrame.addHeader("id", subId);
if (receipt) {
subFrame.addHeader("receipt", "4321");
@ -2446,11 +2471,21 @@ public class StompV12Test extends StompV11TestBase {
if (receipt) {
System.out.println("response: " + f);
Assert.assertEquals("RECEIPT", f.getCommand());
Assert.assertEquals("4321", f.getHeader("receipt-id"));
assertEquals("RECEIPT", f.getCommand());
assertEquals("4321", f.getHeader("receipt-id"));
}
}
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
unsubscribe(conn, subId, false, false);
}
private void unsubscribe(StompClientConnection conn,
String subId,
boolean receipt) throws IOException, InterruptedException {
unsubscribe(conn, subId, receipt, false);
}
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
connV12.connect(defUser, defPass);