ARTEMIS-548 Stomp durable sub unsubscrbe
Implement ability for Stomp clients to unsubscribe durable subscriptions.
This commit is contained in:
parent
6f86e518bd
commit
c4a7ddf9d6
|
@ -139,8 +139,11 @@ public interface Stomp {
|
||||||
|
|
||||||
String SELECTOR = "selector";
|
String SELECTOR = "selector";
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
|
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
|
||||||
|
|
||||||
|
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
|
||||||
|
|
||||||
String NO_LOCAL = "no-local";
|
String NO_LOCAL = "no-local";
|
||||||
|
|
||||||
public interface AckModeValues {
|
public interface AckModeValues {
|
||||||
|
@ -159,7 +162,10 @@ public interface Stomp {
|
||||||
|
|
||||||
String ID = "id";
|
String ID = "id";
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
|
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
|
||||||
|
|
||||||
|
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface Connect {
|
public interface Connect {
|
||||||
|
|
|
@ -655,9 +655,9 @@ public final class StompConnection implements RemotingConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsubscribe(String subscriptionID, String durableSubscriberName) throws ActiveMQStompException {
|
public void unsubscribe(String subscriptionID, String durableSubscriptionName) throws ActiveMQStompException {
|
||||||
try {
|
try {
|
||||||
manager.unsubscribe(this, subscriptionID, durableSubscriberName);
|
manager.unsubscribe(this, subscriptionID, durableSubscriptionName);
|
||||||
}
|
}
|
||||||
catch (ActiveMQStompException e) {
|
catch (ActiveMQStompException e) {
|
||||||
throw e;
|
throw e;
|
||||||
|
|
|
@ -385,15 +385,14 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
|
||||||
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
|
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
|
||||||
}
|
}
|
||||||
long consumerID = server.getStorageManager().generateID();
|
long consumerID = server.getStorageManager().generateID();
|
||||||
String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
|
stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack);
|
||||||
stompSession.addSubscription(consumerID, subscriptionID, clientID, durableSubscriptionName, destination, selector, ack);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsubscribe(StompConnection connection,
|
public void unsubscribe(StompConnection connection,
|
||||||
String subscriptionID,
|
String subscriptionID,
|
||||||
String durableSubscriberName) throws Exception {
|
String durableSubscriberName) throws Exception {
|
||||||
StompSession stompSession = getSession(connection);
|
StompSession stompSession = getSession(connection);
|
||||||
boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName);
|
boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName, connection.getClientID());
|
||||||
if (!unsubscribed) {
|
if (!unsubscribed) {
|
||||||
throw new ActiveMQStompException(connection, "Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
|
throw new ActiveMQStompException(connection, "Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,8 +306,10 @@ public class StompSession implements SessionCallback {
|
||||||
session.start();
|
session.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean unsubscribe(String id, String durableSubscriptionName) throws Exception {
|
public boolean unsubscribe(String id, String durableSubscriptionName, String clientID) throws Exception {
|
||||||
|
boolean result = false;
|
||||||
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
|
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
|
||||||
|
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
Map.Entry<Long, StompSubscription> entry = iterator.next();
|
Map.Entry<Long, StompSubscription> entry = iterator.next();
|
||||||
long consumerID = entry.getKey();
|
long consumerID = entry.getKey();
|
||||||
|
@ -315,21 +317,25 @@ public class StompSession implements SessionCallback {
|
||||||
if (id != null && id.equals(sub.getID())) {
|
if (id != null && id.equals(sub.getID())) {
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
session.closeConsumer(consumerID);
|
session.closeConsumer(consumerID);
|
||||||
SimpleString queueName;
|
SimpleString queueName = SimpleString.toSimpleString(id);
|
||||||
if (durableSubscriptionName != null && durableSubscriptionName.trim().length() != 0) {
|
|
||||||
queueName = SimpleString.toSimpleString(id + "." + durableSubscriptionName);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
queueName = SimpleString.toSimpleString(id);
|
|
||||||
}
|
|
||||||
QueueQueryResult query = session.executeQueueQuery(queueName);
|
QueueQueryResult query = session.executeQueueQuery(queueName);
|
||||||
if (query.isExists()) {
|
if (query.isExists()) {
|
||||||
session.deleteQueue(queueName);
|
session.deleteQueue(queueName);
|
||||||
}
|
}
|
||||||
return true;
|
result = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
|
if (!result && durableSubscriptionName != null && clientID != null) {
|
||||||
|
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
|
||||||
|
QueueQueryResult query = session.executeQueueQuery(queueName);
|
||||||
|
if (query.isExists()) {
|
||||||
|
session.deleteQueue(queueName);
|
||||||
|
}
|
||||||
|
result = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean containsSubscription(String subscriptionID) {
|
boolean containsSubscription(String subscriptionID) {
|
||||||
|
|
|
@ -251,6 +251,9 @@ public abstract class VersionedStompFrameHandler {
|
||||||
String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
|
String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
|
||||||
String id = request.getHeader(Stomp.Headers.Subscribe.ID);
|
String id = request.getHeader(Stomp.Headers.Subscribe.ID);
|
||||||
String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
|
String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
|
||||||
|
if (durableSubscriptionName == null) {
|
||||||
|
durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
|
||||||
|
}
|
||||||
boolean noLocal = false;
|
boolean noLocal = false;
|
||||||
|
|
||||||
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
|
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
|
||||||
|
|
|
@ -92,7 +92,10 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
|
||||||
StompFrame response = null;
|
StompFrame response = null;
|
||||||
String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
|
String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
|
||||||
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
|
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
|
||||||
String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
|
String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
|
||||||
|
if (durableSubscriptionName == null) {
|
||||||
|
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
String subscriptionID = null;
|
String subscriptionID = null;
|
||||||
if (id != null) {
|
if (id != null) {
|
||||||
|
@ -108,7 +111,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connection.unsubscribe(subscriptionID, durableSubscriberName);
|
connection.unsubscribe(subscriptionID, durableSubscriptionName);
|
||||||
}
|
}
|
||||||
catch (ActiveMQStompException e) {
|
catch (ActiveMQStompException e) {
|
||||||
return e.getFrame();
|
return e.getFrame();
|
||||||
|
|
|
@ -153,19 +153,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
|
||||||
StompFrame response = null;
|
StompFrame response = null;
|
||||||
//unsubscribe in 1.1 only needs id header
|
//unsubscribe in 1.1 only needs id header
|
||||||
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
|
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
|
||||||
String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
|
String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
|
||||||
|
if (durableSubscriptionName == null) {
|
||||||
|
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
String subscriptionID = null;
|
String subscriptionID = null;
|
||||||
if (id != null) {
|
if (id != null) {
|
||||||
subscriptionID = id;
|
subscriptionID = id;
|
||||||
}
|
}
|
||||||
else {
|
else if (durableSubscriptionName == null) {
|
||||||
response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
|
response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
connection.unsubscribe(subscriptionID, durableSubscriberName);
|
connection.unsubscribe(subscriptionID, durableSubscriptionName);
|
||||||
}
|
}
|
||||||
catch (ActiveMQStompException e) {
|
catch (ActiveMQStompException e) {
|
||||||
response = e.getFrame();
|
response = e.getFrame();
|
||||||
|
|
|
@ -1432,6 +1432,60 @@ public class StompTest extends StompTestBase {
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDurableUnSubscribe() throws Exception {
|
||||||
|
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
frame = receiveFrame(1000);
|
||||||
|
Assert.assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
|
||||||
|
getTopicPrefix() +
|
||||||
|
getTopicName() +
|
||||||
|
"\n" +
|
||||||
|
"receipt: 12\n" +
|
||||||
|
"durable-subscriber-name: " +
|
||||||
|
getName() +
|
||||||
|
"\n" +
|
||||||
|
"\n\n" +
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(subscribeFrame);
|
||||||
|
// wait for SUBSCRIBE's receipt
|
||||||
|
frame = receiveFrame(1000);
|
||||||
|
Assert.assertTrue(frame.startsWith("RECEIPT"));
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
waitForFrameToTakeEffect();
|
||||||
|
|
||||||
|
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
|
||||||
|
|
||||||
|
reconnect(100);
|
||||||
|
frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
frame = receiveFrame(1000);
|
||||||
|
Assert.assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
|
||||||
|
getTopicPrefix() +
|
||||||
|
getTopicName() +
|
||||||
|
"\n" +
|
||||||
|
"durable-subscriber-name: " +
|
||||||
|
getName() +
|
||||||
|
"\n" +
|
||||||
|
"\n\n" +
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(unsubscribeFrame);
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
waitForFrameToTakeEffect();
|
||||||
|
|
||||||
|
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSubscribeToTopicWithNoLocal() throws Exception {
|
public void testSubscribeToTopicWithNoLocal() throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
||||||
|
@ -48,6 +49,7 @@ import org.junit.Test;
|
||||||
public class StompV11Test extends StompV11TestBase {
|
public class StompV11Test extends StompV11TestBase {
|
||||||
|
|
||||||
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||||
|
public static final String CLIENT_ID = "myclientid";
|
||||||
|
|
||||||
private StompClientConnection connV11;
|
private StompClientConnection connV11;
|
||||||
|
|
||||||
|
@ -1333,7 +1335,7 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTwoSubscribers() throws Exception {
|
public void testTwoSubscribers() throws Exception {
|
||||||
connV11.connect(defUser, defPass, "myclientid");
|
connV11.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(connV11, "sub1", "auto", null);
|
this.subscribeTopic(connV11, "sub1", "auto", null);
|
||||||
|
|
||||||
|
@ -1535,7 +1537,7 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDurableSubscriberWithReconnection() throws Exception {
|
public void testDurableSubscriberWithReconnection() throws Exception {
|
||||||
connV11.connect(defUser, defPass, "myclientid");
|
connV11.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(connV11, "sub1", "auto", getName());
|
this.subscribeTopic(connV11, "sub1", "auto", getName());
|
||||||
|
|
||||||
|
@ -1553,7 +1555,7 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
|
|
||||||
connV11.destroy();
|
connV11.destroy();
|
||||||
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
|
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
|
||||||
connV11.connect(defUser, defPass, "myclientid");
|
connV11.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(connV11, "sub1", "auto", getName());
|
this.subscribeTopic(connV11, "sub1", "auto", getName());
|
||||||
|
|
||||||
|
@ -1569,6 +1571,30 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
connV11.disconnect();
|
connV11.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDurableUnSubscribe() throws Exception {
|
||||||
|
connV11.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
|
this.subscribeTopic(connV11, null, "auto", getName());
|
||||||
|
|
||||||
|
connV11.disconnect();
|
||||||
|
connV11.destroy();
|
||||||
|
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
|
||||||
|
connV11.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
|
this.unsubscribe(connV11, getName(), false, true);
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
||||||
|
while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(server.getActiveMQServer().locateQueue(queueName));
|
||||||
|
|
||||||
|
connV11.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJMSXGroupIdCanBeSet() throws Exception {
|
public void testJMSXGroupIdCanBeSet() throws Exception {
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
@ -2364,8 +2390,10 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
boolean receipt,
|
boolean receipt,
|
||||||
boolean noLocal) throws IOException, InterruptedException {
|
boolean noLocal) throws IOException, InterruptedException {
|
||||||
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
|
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
|
||||||
subFrame.addHeader("id", subId);
|
|
||||||
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
|
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
|
||||||
|
if (subId != null) {
|
||||||
|
subFrame.addHeader("id", subId);
|
||||||
|
}
|
||||||
if (ack != null) {
|
if (ack != null) {
|
||||||
subFrame.addHeader("ack", ack);
|
subFrame.addHeader("ack", ack);
|
||||||
}
|
}
|
||||||
|
@ -2386,18 +2414,14 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
|
private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean durable) throws IOException, InterruptedException {
|
||||||
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
|
ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
|
||||||
subFrame.addHeader("id", subId);
|
if (durable) {
|
||||||
|
subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId);
|
||||||
conn.sendFrame(subFrame);
|
}
|
||||||
}
|
else {
|
||||||
|
subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId);
|
||||||
private void unsubscribe(StompClientConnection conn,
|
}
|
||||||
String subId,
|
|
||||||
boolean receipt) throws IOException, InterruptedException {
|
|
||||||
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
|
|
||||||
subFrame.addHeader("id", subId);
|
|
||||||
|
|
||||||
if (receipt) {
|
if (receipt) {
|
||||||
subFrame.addHeader("receipt", "4321");
|
subFrame.addHeader("receipt", "4321");
|
||||||
|
@ -2412,6 +2436,16 @@ public class StompV11Test extends StompV11TestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
|
||||||
|
unsubscribe(conn, subId, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unsubscribe(StompClientConnection conn,
|
||||||
|
String subId,
|
||||||
|
boolean receipt) throws IOException, InterruptedException {
|
||||||
|
unsubscribe(conn, subId, receipt, false);
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
||||||
connV11.connect(defUser, defPass);
|
connV11.connect(defUser, defPass);
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,9 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
|
||||||
|
@ -38,7 +41,6 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
|
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
|
||||||
import org.apache.activemq.artemis.tests.integration.stomp.v11.StompV11TestBase;
|
import org.apache.activemq.artemis.tests.integration.stomp.v11.StompV11TestBase;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -50,6 +52,7 @@ import org.junit.Test;
|
||||||
public class StompV12Test extends StompV11TestBase {
|
public class StompV12Test extends StompV11TestBase {
|
||||||
|
|
||||||
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||||
|
public static final String CLIENT_ID = "myclientid";
|
||||||
|
|
||||||
private StompClientConnectionV12 connV12;
|
private StompClientConnectionV12 connV12;
|
||||||
|
|
||||||
|
@ -1325,7 +1328,7 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTwoSubscribers() throws Exception {
|
public void testTwoSubscribers() throws Exception {
|
||||||
connV12.connect(defUser, defPass, "myclientid");
|
connV12.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(connV12, "sub1", "auto", null);
|
this.subscribeTopic(connV12, "sub1", "auto", null);
|
||||||
|
|
||||||
|
@ -1529,7 +1532,7 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDurableSubscriberWithReconnection() throws Exception {
|
public void testDurableSubscriberWithReconnection() throws Exception {
|
||||||
connV12.connect(defUser, defPass, "myclientid");
|
connV12.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(connV12, "sub1", "auto", getName());
|
this.subscribeTopic(connV12, "sub1", "auto", getName());
|
||||||
|
|
||||||
|
@ -1547,7 +1550,7 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
|
|
||||||
connV12.destroy();
|
connV12.destroy();
|
||||||
connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
|
connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
|
||||||
connV12.connect(defUser, defPass, "myclientid");
|
connV12.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
this.subscribeTopic(connV12, "sub1", "auto", getName());
|
this.subscribeTopic(connV12, "sub1", "auto", getName());
|
||||||
|
|
||||||
|
@ -1563,6 +1566,30 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
connV12.disconnect();
|
connV12.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDurableUnSubscribe() throws Exception {
|
||||||
|
connV12.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
|
this.subscribeTopic(connV12, null, "auto", getName());
|
||||||
|
|
||||||
|
connV12.disconnect();
|
||||||
|
connV12.destroy();
|
||||||
|
connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
|
||||||
|
connV12.connect(defUser, defPass, CLIENT_ID);
|
||||||
|
|
||||||
|
this.unsubscribe(connV12, getName(), false, true);
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
|
||||||
|
while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis() - start) < 5000) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(server.getActiveMQServer().locateQueue(queueName));
|
||||||
|
|
||||||
|
connV12.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJMSXGroupIdCanBeSet() throws Exception {
|
public void testJMSXGroupIdCanBeSet() throws Exception {
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
@ -2403,8 +2430,10 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
boolean receipt,
|
boolean receipt,
|
||||||
boolean noLocal) throws IOException, InterruptedException {
|
boolean noLocal) throws IOException, InterruptedException {
|
||||||
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
|
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
|
||||||
subFrame.addHeader("id", subId);
|
|
||||||
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
|
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
|
||||||
|
if (subId != null) {
|
||||||
|
subFrame.addHeader("id", subId);
|
||||||
|
}
|
||||||
if (ack != null) {
|
if (ack != null) {
|
||||||
subFrame.addHeader("ack", ack);
|
subFrame.addHeader("ack", ack);
|
||||||
}
|
}
|
||||||
|
@ -2425,18 +2454,14 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
|
private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean durable) throws IOException, InterruptedException {
|
||||||
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
|
ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
|
||||||
subFrame.addHeader("id", subId);
|
if (durable) {
|
||||||
|
subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId);
|
||||||
conn.sendFrame(subFrame);
|
}
|
||||||
}
|
else {
|
||||||
|
subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId);
|
||||||
private void unsubscribe(StompClientConnection conn,
|
}
|
||||||
String subId,
|
|
||||||
boolean receipt) throws IOException, InterruptedException {
|
|
||||||
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
|
|
||||||
subFrame.addHeader("id", subId);
|
|
||||||
|
|
||||||
if (receipt) {
|
if (receipt) {
|
||||||
subFrame.addHeader("receipt", "4321");
|
subFrame.addHeader("receipt", "4321");
|
||||||
|
@ -2446,11 +2471,21 @@ public class StompV12Test extends StompV11TestBase {
|
||||||
|
|
||||||
if (receipt) {
|
if (receipt) {
|
||||||
System.out.println("response: " + f);
|
System.out.println("response: " + f);
|
||||||
Assert.assertEquals("RECEIPT", f.getCommand());
|
assertEquals("RECEIPT", f.getCommand());
|
||||||
Assert.assertEquals("4321", f.getHeader("receipt-id"));
|
assertEquals("4321", f.getHeader("receipt-id"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException {
|
||||||
|
unsubscribe(conn, subId, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unsubscribe(StompClientConnection conn,
|
||||||
|
String subId,
|
||||||
|
boolean receipt) throws IOException, InterruptedException {
|
||||||
|
unsubscribe(conn, subId, receipt, false);
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
|
||||||
connV12.connect(defUser, defPass);
|
connV12.connect(defUser, defPass);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue