ARTEMIS-1794 STOMP clients using same addr w/diff routing types

This commit is contained in:
Justin Bertram 2018-04-10 10:41:15 -05:00 committed by Clebert Suconic
parent e937c9903f
commit d773e8f66b
4 changed files with 221 additions and 34 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -262,7 +263,7 @@ public final class StompConnection implements RemotingConnection {
// TODO this should take a type - send or receive so it knows whether to check the address or the queue
public void checkDestination(String destination) throws ActiveMQStompException {
if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) {
if (!manager.destinationExists(destination)) {
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
@ -272,28 +273,47 @@ public final class StompConnection implements RemotingConnection {
try {
SimpleString simpleQueue = SimpleString.toSimpleString(queue);
if (manager.getServer().getAddressInfo(simpleQueue) == null) {
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
boolean checkAnycast = false;
/**
* If the address doesn't exist then it is created if possible.
* If the address does exist but doesn't support the routing-type then the address is updated if possible.
*/
if (addressInfo == null) {
if (addressSettings.isAutoCreateAddresses()) {
session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
}
// only auto create the queue if the address is ANYCAST
if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
checkAnycast = true;
} else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
if (addressSettings.isAutoCreateAddresses()) {
EnumSet<RoutingType> routingTypes = EnumSet.noneOf(RoutingType.class);
for (RoutingType existingRoutingType : addressInfo.getRoutingTypes()) {
routingTypes.add(existingRoutingType);
}
routingTypes.add(effectiveAddressRoutingType);
manager.getServer().updateAddressInfo(simpleQueue, routingTypes);
}
checkAnycast = true;
}
// only auto create the queue if the address is ANYCAST
if (checkAnycast && effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
}
} catch (ActiveMQQueueExistsException e) {
// ignore
} catch (Exception e) {
ActiveMQStompProtocolLogger.LOGGER.debug("Exception while auto-creating destination", e);
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}
}
public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException {
AddressInfo addressInfo = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)));
AddressInfo addressInfo = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination));
// may be null here if, for example, the management address is being checked
if (addressInfo != null) {

View File

@ -279,8 +279,16 @@ public abstract class VersionedStompFrameHandler {
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
}
public String getDestination(StompFrame request) {
return request.getHeader(Headers.Subscribe.DESTINATION);
public String getDestination(StompFrame request) throws ActiveMQStompException {
return getDestination(request, Headers.Subscribe.DESTINATION);
}
public String getDestination(StompFrame request, String header) throws ActiveMQStompException {
String destination = request.getHeader(header);
if (destination == null) {
return null;
}
return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
}
public StompFrame postprocess(StompFrame request) {

View File

@ -94,32 +94,36 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
@Override
public StompFrame onUnsubscribe(StompFrame request) {
StompFrame response = null;
String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
}
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
}
String subscriptionID = null;
if (id != null) {
subscriptionID = id;
} else {
if (destination == null) {
ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
response = error.getFrame();
return response;
}
subscriptionID = "subscription/" + destination;
}
try {
String destination = getDestination(request, Stomp.Headers.Unsubscribe.DESTINATION);
String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
}
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
}
String subscriptionID = null;
if (id != null) {
subscriptionID = id;
} else {
if (destination == null) {
ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
response = error.getFrame();
return response;
}
subscriptionID = "subscription/" + destination;
}
connection.unsubscribe(subscriptionID, durableSubscriptionName);
} catch (ActiveMQStompException e) {
return e.getFrame();
response = e.getFrame();
} catch (Exception e) {
ActiveMQStompException error = BUNDLE.errorHandleSend(e).setHandler(this);
response = error.getFrame();
}
return response;
}

View File

@ -1573,6 +1573,159 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
/**
* This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
* operations in opposite order. In this test the anycast subscription is created first.
* @throws Exception
*/
@Test
public void testPrefixedAutoCreatedAnycastAndMulticastWithSameName() throws Exception {
int port = 61614;
URI uri = createStompClientUri(scheme, hostname, port);
final String ADDRESS = UUID.randomUUID().toString();
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start();
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass);
// since this queue doesn't exist the broker should create a new ANYCAST address & queue
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
frame = conn.sendFrame(frame);
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS)));
// sending a MULTICAST message should alter the address to support MULTICAST
frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 1", true);
assertFalse(frame.getCommand().equals("ERROR"));
addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
// however, no message should be routed to the ANYCAST queue
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
// sending a message to the ANYCAST queue, should be received
frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 2", true);
assertFalse(frame.getCommand().equals("ERROR"));
frame = conn.receiveFrame(1000);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 2", frame.getBody());
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
// now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription
uuid = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
frame = conn.sendFrame(frame);
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
// send a message which will be routed to the MULTICAST queue
frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 3", true);
assertFalse(frame.getCommand().equals("ERROR"));
// receive that message on the topic subscription
frame = conn.receiveFrame(1000);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 3", frame.getBody());
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
conn.disconnect();
}
/**
* This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
* operations in opposite order. In this test the multicast subscription is created first.
* @throws Exception
*/
@Test
public void testPrefixedAutoCreatedMulticastAndAnycastWithSameName() throws Exception {
int port = 61614;
URI uri = createStompClientUri(scheme, hostname, port);
final String ADDRESS = UUID.randomUUID().toString();
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start();
StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
conn.connect(defUser, defPass);
// since this queue doesn't exist the broker should create a new MULTICAST address
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
frame = conn.sendFrame(frame);
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
// sending an ANYCAST message should alter the address to support ANYCAST and create an ANYCAST queue
frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 1", true);
assertFalse(frame.getCommand().equals("ERROR"));
addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS)));
// however, no message should be routed to the MULTICAST queue
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
// sending a message to the MULTICAST queue, should be received
frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 2", true);
assertFalse(frame.getCommand().equals("ERROR"));
frame = conn.receiveFrame(2000);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 2", frame.getBody());
Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
frame = conn.receiveFrame(1000);
Assert.assertNull(frame);
frame = unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
assertFalse(frame.getCommand().equals("ERROR"));
// now subscribe to the address in an ANYCAST way
uuid = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
frame = conn.sendFrame(frame);
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
// receive that message on the ANYCAST queue
frame = conn.receiveFrame(1000);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
Assert.assertEquals("Hello World 1", frame.getBody());
Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
frame = conn.receiveFrame(2000);
Assert.assertNull(frame);
unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
conn.disconnect();
}
@Test
public void testDotPrefixedSendAndRecieveAnycast() throws Exception {
testPrefixedSendAndRecieve("jms.queue.", RoutingType.ANYCAST);
@ -1626,11 +1779,13 @@ public class StompTest extends StompTestBase {
@Test
public void testMulticastOperationsOnAnycastAddress() throws Exception {
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
testRoutingSemantics(RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName());
}
@Test
public void testAnycastOperationsOnMulticastAddress() throws Exception {
server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
testRoutingSemantics(RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName());
}