This commit is contained in:
Timothy Bish 2018-02-22 16:28:56 -05:00
commit 8d343ade10
7 changed files with 241 additions and 55 deletions

View File

@ -152,10 +152,20 @@ public interface Stomp {
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
/**
* Backwards compatibility for STOMP clients that were using 5.x
*/
String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = "activemq.subscriptionName";
String SUBSCRIPTION_TYPE = "subscription-type";
String NO_LOCAL = "no-local";
/**
* Backwards compatibility for STOMP clients that were using 5.x
*/
String ACTIVEMQ_NO_LOCAL = "activemq.noLocal";
public interface AckModeValues {
String AUTO = "auto";
@ -176,6 +186,11 @@ public interface Stomp {
String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
/**
* Backwards compatibility for STOMP clients that were using 5.x
*/
String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = "activemq.subscriptionName";
}
interface Connect {

View File

@ -16,11 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -30,10 +31,9 @@ import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public abstract class VersionedStompFrameHandler {
protected StompConnection connection;
@ -266,10 +266,15 @@ public abstract class VersionedStompFrameHandler {
if (durableSubscriptionName == null) {
durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
if (durableSubscriptionName == null) {
durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
}
RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
} else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp.v10;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@ -30,8 +32,6 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener {
public StompFrameHandlerV10(StompConnection connection,
@ -52,27 +52,36 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
connection.setClientID(clientID);
if (connection.validateUser(login, passcode, connection)) {
connection.setValid(true);
try {
connection.setClientID(clientID);
if (connection.validateUser(login, passcode, connection)) {
connection.setValid(true);
response = new StompFrameV10(Stomp.Responses.CONNECTED);
// Create session after validating user - this will cache the session in the
// protocol manager
connection.getSession();
if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString());
response = new StompFrameV10(Stomp.Responses.CONNECTED);
if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
response.addHeader(Stomp.Headers.Connected.VERSION, StompVersions.V1_0.toString());
}
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
if (requestID != null) {
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
} else {
// not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
response.setBody(responseText);
response.setNeedsDisconnect(true);
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
}
response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
if (requestID != null) {
response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
} else {
//not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
String responseText = "Security Error occurred: User name [" + login + "] or password is invalid";
response.setBody(responseText);
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
} catch (ActiveMQStompException e) {
response = e.getFrame();
}
return response;
}
@ -91,6 +100,9 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
}
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
}
String subscriptionID = null;
if (id != null) {

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp.v11;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -37,8 +39,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements FrameEventListener {
protected static final char ESC_CHAR = '\\';
@ -72,6 +72,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
if (connection.validateUser(login, passcode, connection)) {
connection.setValid(true);
// Create session after validating user - this will cache the session in the
// protocol manager
connection.getSession();
response = this.createStompFrame(Stomp.Responses.CONNECTED);
// version
@ -154,6 +158,9 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
}
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
}
String subscriptionID = null;
if (id != null) {

View File

@ -49,12 +49,24 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@ -65,6 +77,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
public class StompPluginTest extends StompTestBase {
@ -73,6 +86,11 @@ public class StompPluginTest extends StompTestBase {
private StompClientConnectionV12 conn;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{"ws+v12.stomp"}, {"tcp+v12.stomp"}});
}
@Override
@Before
public void setUp() throws Exception {
@ -96,57 +114,71 @@ public class StompPluginTest extends StompTestBase {
private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls);
private final AtomicBoolean stompBeforeCreateSession = new AtomicBoolean();
private final AtomicBoolean stompBeforeRemoveSession = new AtomicBoolean();
@Override
protected JMSServerManager createServer() throws Exception {
JMSServerManager server = super.createServer();
server.getActiveMQServer().registerBrokerPlugin(verifier);
server.getActiveMQServer().registerBrokerPlugin(new ActiveMQServerPlugin() {
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues,
OperationContext context, Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if (connection instanceof StompConnection) {
stompBeforeCreateSession.set(true);
}
}
@Override
public void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
if (session.getRemotingConnection() instanceof StompConnection) {
stompBeforeRemoveSession.set(true);
}
}
});
return server;
}
@Test
public void testSendAndReceive() throws Exception {
// subscribe
//StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
try {
URI uri = new URI("ws+v12.stomp://localhost:61613");
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub");
URI uri = new URI(scheme + "://localhost:61613");
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass);
subscribe(newConn, "a-sub");
send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
ClientStompFrame frame = newConn.receiveFrame();
send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
ClientStompFrame frame = newConn.receiveFrame();
System.out.println("received " + frame);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
System.out.println("received " + frame);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
verifier.validatePluginMethodsAtLeast(1, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
verifier.validatePluginMethodsAtLeast(1, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
// unsub
unsubscribe(newConn, "a-sub");
// unsub
unsubscribe(newConn, "a-sub");
newConn.disconnect();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, BEFORE_REMOVE_BINDING, AFTER_REMOVE_BINDING);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING);
} catch (Throwable e) {
fail(e.getMessage());
}
newConn.disconnect();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, BEFORE_REMOVE_BINDING, AFTER_REMOVE_BINDING);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING);
}
@Test
public void testStompAutoCreateAddress() throws Exception {
URI uri = new URI("ws+v12.stomp://localhost:61613");
URI uri = new URI(scheme + "://localhost:61613");
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass);
@ -161,4 +193,22 @@ public class StompPluginTest extends StompTestBase {
}
@Test
public void testConnect() throws Exception {
URI uri = new URI(scheme + "://localhost:61613");
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass);
//Make sure session is created on connect
assertTrue(stompBeforeCreateSession.get());
newConn.disconnect();
Thread.sleep(500);
//Make sure session is removed on disconnect
assertTrue(stompBeforeRemoveSession.get());
}
}

View File

@ -1215,6 +1215,35 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
@Test
public void testDurableSubscriberWithReconnectionLegacy() throws Exception {
conn.connect(defUser, defPass, "myclientid");
subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
conn.disconnect();
Thread.sleep(500);
// send the message when the durable subscriber is disconnected
sendJmsMessage(getName(), topic);
conn.destroy();
conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, "myclientid");
subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
ClientStompFrame frame = conn.receiveFrame(3000);
assertNotNull("Should have received a message from the durable subscription", frame);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
Assert.assertEquals(getName(), frame.getBody());
unsubscribeLegacyActiveMQ(conn, null, getTopicPrefix() + getTopicName(), true, true);
conn.disconnect();
}
@Test
public void testDurableSubscriber() throws Exception {
conn.connect(defUser, defPass, "myclientid");
@ -1227,6 +1256,18 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
@Test
public void testDurableSubscriberLegacySubscriptionHeader() throws Exception {
conn.connect(defUser, defPass, "myclientid");
subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
ClientStompFrame response = subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
// creating a subscriber with the same durable-subscriber-name must fail
Assert.assertEquals(Stomp.Responses.ERROR, response.getCommand());
conn.disconnect();
}
@Test
public void testDurableUnSubscribe() throws Exception {
conn.connect(defUser, defPass, "myclientid");
@ -1247,6 +1288,26 @@ public class StompTest extends StompTestBase {
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}
@Test
public void testDurableUnSubscribeLegacySubscriptionHeader() throws Exception {
conn.connect(defUser, defPass, "myclientid");
subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
conn.disconnect();
Thread.sleep(500);
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
conn.destroy();
conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass, "myclientid");
unsubscribeLegacyActiveMQ(conn, getName(), getTopicPrefix() + getTopicName(), false, true);
conn.disconnect();
Thread.sleep(500);
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}
@Test
public void testSubscribeToTopicWithNoLocal() throws Exception {
conn.connect(defUser, defPass);

View File

@ -441,10 +441,29 @@ public abstract class StompTestBase extends ActiveMQTestBase {
return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false);
}
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
String subscriptionId,
String ack,
String durableId,
boolean receipt,
boolean noLocal) throws IOException, InterruptedException {
return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
}
public static ClientStompFrame subscribeTopicLegacyActiveMQ(StompClientConnection conn,
String subscriptionId,
String ack,
String durableId,
boolean receipt,
boolean noLocal) throws IOException, InterruptedException {
return subscribeTopic(conn, subscriptionId, ack, durableId, Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
}
public static ClientStompFrame subscribeTopic(StompClientConnection conn,
String subscriptionId,
String ack,
String durableId,
String durableIdHeader,
boolean receipt,
boolean noLocal) throws IOException, InterruptedException {
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
@ -457,7 +476,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack);
}
if (durableId != null) {
frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId);
frame.addHeader(durableIdHeader, durableId);
}
String uuid = UUID.randomUUID().toString();
if (receipt) {
@ -491,14 +510,31 @@ public abstract class StompTestBase extends ActiveMQTestBase {
return unsubscribe(conn, subscriptionId, null, receipt, false);
}
public static ClientStompFrame unsubscribe(StompClientConnection conn,
String subscriptionId,
String destination,
boolean receipt,
boolean durable) throws IOException, InterruptedException {
return unsubscribe(conn, subscriptionId, Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, destination, receipt, durable);
}
public static ClientStompFrame unsubscribeLegacyActiveMQ(StompClientConnection conn,
String subscriptionId,
String destination,
boolean receipt,
boolean durable) throws IOException, InterruptedException {
return unsubscribe(conn, subscriptionId, Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, destination, receipt, durable);
}
public static ClientStompFrame unsubscribe(StompClientConnection conn,
String subscriptionId,
String subscriptionIdHeader,
String destination,
boolean receipt,
boolean durable) throws IOException, InterruptedException {
ClientStompFrame frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
if (durable && subscriptionId != null) {
frame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subscriptionId);
frame.addHeader(subscriptionIdHeader, subscriptionId);
} else if (!durable && subscriptionId != null) {
frame.addHeader(Stomp.Headers.Unsubscribe.ID, subscriptionId);
}