diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java index f1000ccf01..fa02ee3f27 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java @@ -152,10 +152,20 @@ public interface Stomp { String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name"; + /** + * Backwards compatibility for STOMP clients that were using 5.x + */ + String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = "activemq.subscriptionName"; + String SUBSCRIPTION_TYPE = "subscription-type"; String NO_LOCAL = "no-local"; + /** + * Backwards compatibility for STOMP clients that were using 5.x + */ + String ACTIVEMQ_NO_LOCAL = "activemq.noLocal"; + public interface AckModeValues { String AUTO = "auto"; @@ -176,6 +186,11 @@ public interface Stomp { String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name"; String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name"; + + /** + * Backwards compatibility for STOMP clients that were using 5.x + */ + String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = "activemq.subscriptionName"; } interface Connect { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 2259a1712c..3cb5ab87d3 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.protocol.stomp; +import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; + import java.nio.charset.StandardCharsets; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -30,10 +31,9 @@ import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.ExecutorFactory; -import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; - public abstract class VersionedStompFrameHandler { protected StompConnection connection; @@ -266,10 +266,15 @@ public abstract class VersionedStompFrameHandler { if (durableSubscriptionName == null) { durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); } + if (durableSubscriptionName == null) { + durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME); + } RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION)); boolean noLocal = false; if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); + } else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) { + noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); } return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java index e632c2be56..a6785b71e2 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.stomp.v10; +import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; + import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -30,8 +32,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.utils.ExecutorFactory; -import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; - public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener { public StompFrameHandlerV10(StompConnection connection, @@ -52,27 +52,36 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID); String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID); - connection.setClientID(clientID); - if (connection.validateUser(login, passcode, connection)) { - connection.setValid(true); + try { + connection.setClientID(clientID); + if (connection.validateUser(login, passcode, connection)) { + connection.setValid(true); - response = new StompFrameV10(Stomp.Responses.CONNECTED); + // Create session after validating user - this will cache the session in the + // protocol manager + connection.getSession(); - if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) { - response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString()); + response = new StompFrameV10(Stomp.Responses.CONNECTED); + + if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) { + response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString()); + } + + response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString()); + + if (requestID != null) { + response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID); + } + } else { + // not valid + response = new StompFrameV10(Stomp.Responses.ERROR); + String responseText = "Security Error occurred: User name [" + login + "] or password is invalid"; + response.setBody(responseText); + response.setNeedsDisconnect(true); + response.addHeader(Stomp.Headers.Error.MESSAGE, responseText); } - - response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString()); - - if (requestID != null) { - response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID); - } - } else { - //not valid - response = new StompFrameV10(Stomp.Responses.ERROR); - String responseText = "Security Error occurred: User name [" + login + "] or password is invalid"; - response.setBody(responseText); - response.addHeader(Stomp.Headers.Error.MESSAGE, responseText); + } catch (ActiveMQStompException e) { + response = e.getFrame(); } return response; } @@ -91,6 +100,9 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements if (durableSubscriptionName == null) { durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME); } + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME); + } String subscriptionID = null; if (id != null) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index 89918983b3..67fb34b1e6 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.stomp.v11; +import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; + import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -37,8 +39,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.utils.ExecutorFactory; -import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; - public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements FrameEventListener { protected static final char ESC_CHAR = '\\'; @@ -72,6 +72,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements if (connection.validateUser(login, passcode, connection)) { connection.setValid(true); + // Create session after validating user - this will cache the session in the + // protocol manager + connection.getSession(); + response = this.createStompFrame(Stomp.Responses.CONNECTED); // version @@ -154,6 +158,9 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements if (durableSubscriptionName == null) { durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME); } + if (durableSubscriptionName == null) { + durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME); + } String subscriptionID = null; if (id != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java index f527e794c1..e426e3aa94 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java @@ -49,12 +49,24 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; @@ -65,6 +77,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runners.Parameterized; public class StompPluginTest extends StompTestBase { @@ -73,6 +86,11 @@ public class StompPluginTest extends StompTestBase { private StompClientConnectionV12 conn; + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new Object[][]{{"ws+v12.stomp"}, {"tcp+v12.stomp"}}); + } + @Override @Before public void setUp() throws Exception { @@ -96,57 +114,71 @@ public class StompPluginTest extends StompTestBase { private final Map methodCalls = new HashMap<>(); private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + private final AtomicBoolean stompBeforeCreateSession = new AtomicBoolean(); + private final AtomicBoolean stompBeforeRemoveSession = new AtomicBoolean(); @Override protected JMSServerManager createServer() throws Exception { JMSServerManager server = super.createServer(); server.getActiveMQServer().registerBrokerPlugin(verifier); + server.getActiveMQServer().registerBrokerPlugin(new ActiveMQServerPlugin() { + + @Override + public void beforeCreateSession(String name, String username, int minLargeMessageSize, + RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, + boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, + OperationContext context, Map prefixes) throws ActiveMQException { + + if (connection instanceof StompConnection) { + stompBeforeCreateSession.set(true); + } + } + + @Override + public void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException { + if (session.getRemotingConnection() instanceof StompConnection) { + stompBeforeRemoveSession.set(true); + } + } + }); return server; } @Test public void testSendAndReceive() throws Exception { - // subscribe - //StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); - try { - URI uri = new URI("ws+v12.stomp://localhost:61613"); - StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri); - newConn.connect(defUser, defPass); - subscribe(newConn, "a-sub"); + URI uri = new URI(scheme + "://localhost:61613"); + StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri); + newConn.connect(defUser, defPass); + subscribe(newConn, "a-sub"); - send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!"); - ClientStompFrame frame = newConn.receiveFrame(); + send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!"); + ClientStompFrame frame = newConn.receiveFrame(); - System.out.println("received " + frame); - Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + System.out.println("received " + frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); - verifier.validatePluginMethodsAtLeast(1, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, - AFTER_DELIVER); + verifier.validatePluginMethodsAtLeast(1, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); - // unsub - unsubscribe(newConn, "a-sub"); + // unsub + unsubscribe(newConn, "a-sub"); - newConn.disconnect(); - - verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, BEFORE_REMOVE_BINDING, AFTER_REMOVE_BINDING); - verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, - AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, - AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, - MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, - AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING); - - } catch (Throwable e) { - fail(e.getMessage()); - } + newConn.disconnect(); + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, BEFORE_REMOVE_BINDING, AFTER_REMOVE_BINDING); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING); } @Test public void testStompAutoCreateAddress() throws Exception { - URI uri = new URI("ws+v12.stomp://localhost:61613"); + URI uri = new URI(scheme + "://localhost:61613"); StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri); newConn.connect(defUser, defPass); @@ -161,4 +193,22 @@ public class StompPluginTest extends StompTestBase { } + @Test + public void testConnect() throws Exception { + + URI uri = new URI(scheme + "://localhost:61613"); + StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri); + newConn.connect(defUser, defPass); + + //Make sure session is created on connect + assertTrue(stompBeforeCreateSession.get()); + + newConn.disconnect(); + + Thread.sleep(500); + + //Make sure session is removed on disconnect + assertTrue(stompBeforeRemoveSession.get()); + + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 3d36d3fb90..e6be06d3e7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1215,6 +1215,35 @@ public class StompTest extends StompTestBase { conn.disconnect(); } + @Test + public void testDurableSubscriberWithReconnectionLegacy() throws Exception { + conn.connect(defUser, defPass, "myclientid"); + subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false); + + conn.disconnect(); + + Thread.sleep(500); + + // send the message when the durable subscriber is disconnected + sendJmsMessage(getName(), topic); + + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass, "myclientid"); + + subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false); + + ClientStompFrame frame = conn.receiveFrame(3000); + assertNotNull("Should have received a message from the durable subscription", frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + Assert.assertEquals(getName(), frame.getBody()); + + unsubscribeLegacyActiveMQ(conn, null, getTopicPrefix() + getTopicName(), true, true); + + conn.disconnect(); + } + @Test public void testDurableSubscriber() throws Exception { conn.connect(defUser, defPass, "myclientid"); @@ -1227,6 +1256,18 @@ public class StompTest extends StompTestBase { conn.disconnect(); } + @Test + public void testDurableSubscriberLegacySubscriptionHeader() throws Exception { + conn.connect(defUser, defPass, "myclientid"); + subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false); + ClientStompFrame response = subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false); + + // creating a subscriber with the same durable-subscriber-name must fail + Assert.assertEquals(Stomp.Responses.ERROR, response.getCommand()); + + conn.disconnect(); + } + @Test public void testDurableUnSubscribe() throws Exception { conn.connect(defUser, defPass, "myclientid"); @@ -1247,6 +1288,26 @@ public class StompTest extends StompTestBase { assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); } + @Test + public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception { + conn.connect(defUser, defPass, "myclientid"); + subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false); + conn.disconnect(); + Thread.sleep(500); + + assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); + + conn.destroy(); + conn = StompClientConnectionFactory.createClientConnection(uri); + + conn.connect(defUser, defPass, "myclientid"); + unsubscribeLegacyActiveMQ(conn, getName(), getTopicPrefix() + getTopicName(), false, true); + conn.disconnect(); + Thread.sleep(500); + + assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName()))); + } + @Test public void testSubscribeToTopicWithNoLocal() throws Exception { conn.connect(defUser, defPass); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index ec166dd388..2b1302ff9d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -441,10 +441,29 @@ public abstract class StompTestBase extends ActiveMQTestBase { return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false); } + public static ClientStompFrame subscribeTopic(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + boolean receipt, + boolean noLocal) throws IOException, InterruptedException { + return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal); + } + + public static ClientStompFrame subscribeTopicLegacyActiveMQ(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + boolean receipt, + boolean noLocal) throws IOException, InterruptedException { + return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal); + } + public static ClientStompFrame subscribeTopic(StompClientConnection conn, String subscriptionId, String ack, String durableId, + String durableIdHeader, boolean receipt, boolean noLocal) throws IOException, InterruptedException { ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) @@ -457,7 +476,7 @@ public abstract class StompTestBase extends ActiveMQTestBase { frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack); } if (durableId != null) { - frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId); + frame.addHeader(durableIdHeader, durableId); } String uuid = UUID.randomUUID().toString(); if (receipt) { @@ -491,14 +510,31 @@ public abstract class StompTestBase extends ActiveMQTestBase { return unsubscribe(conn, subscriptionId, null, receipt, false); } + public static ClientStompFrame unsubscribe(StompClientConnection conn, + String subscriptionId, + String destination, + boolean receipt, + boolean durable) throws IOException, InterruptedException { + return unsubscribe(conn, subscriptionId, Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, destination, receipt, durable); + } + + public static ClientStompFrame unsubscribeLegacyActiveMQ(StompClientConnection conn, + String subscriptionId, + String destination, + boolean receipt, + boolean durable) throws IOException, InterruptedException { + return unsubscribe(conn, subscriptionId, Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, destination, receipt, durable); + } + public static ClientStompFrame unsubscribe(StompClientConnection conn, String subscriptionId, + String subscriptionIdHeader, String destination, boolean receipt, boolean durable) throws IOException, InterruptedException { ClientStompFrame frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE); if (durable && subscriptionId != null) { - frame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subscriptionId); + frame.addHeader(subscriptionIdHeader, subscriptionId); } else if (!durable && subscriptionId != null) { frame.addHeader(Stomp.Headers.Unsubscribe.ID, subscriptionId); }