ARTEMIS-2882 better support for JMS topics + FQQN

Support FQQN with JMS topics when sending to or consuming from a
specific subscription. This applies to JMS over core, OpenWire,
and AMQP.
This commit is contained in:
Justin Bertram 2020-08-20 10:00:56 -05:00
parent a2bf85f529
commit 582a430213
5 changed files with 125 additions and 4 deletions

View File

@ -383,7 +383,6 @@ public class ActiveMQSession implements QueueSession, TopicSession {
void checkDestination(ActiveMQDestination destination) throws JMSException {
SimpleString address = destination.getSimpleAddress();
// TODO: What to do with FQQN
if (!destination.isCreated()) {
try {
ClientSession.AddressQuery addressQuery = session.addressQuery(address);
@ -419,6 +418,19 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Destination " + address + " does not exist, address exists but autoCreateQueues=" + addressQuery.isAutoCreateQueues());
}
}
} else if (CompositeAddress.isFullyQualified(address)) { // it could be a topic using FQQN
ClientSession.QueueQuery queueQuery = session.queueQuery(address);
if (!queueQuery.isExists()) {
if (addressQuery.isAutoCreateQueues()) {
if (destination.isTemporary()) {
createTemporaryQueue(destination, RoutingType.MULTICAST, address, null, addressQuery);
} else {
createQueue(destination, RoutingType.MULTICAST, address, null, true, true, addressQuery);
}
} else {
throw new InvalidDestinationException("Destination " + address + " does not exist, address exists but autoCreateQueues=" + addressQuery.isAutoCreateQueues());
}
}
}
} catch (ActiveMQQueueExistsException thatsOK) {
// nothing to be done
@ -837,7 +849,22 @@ public class ActiveMQSession implements QueueSession, TopicSession {
queueName = new SimpleString(UUID.randomUUID().toString());
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
} else {
if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) {
if (response.isAutoCreateQueues()) {
try {
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response);
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
}
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
}
queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
}
consumer = createClientConsumer(dest, queueName, null);
autoDeleteQueueName = queueName;

View File

@ -524,7 +524,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
if (queueName != null) {
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false);
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true);
if (!result.isExists()) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
} else {

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
@ -213,7 +214,17 @@ public class AMQConsumer {
session.getCoreSession().createQueue(new QueueConfiguration(queueName).setAddress(address).setFilterString(selector).setInternal(internalAddress));
}
} else {
queueName = new SimpleString(UUID.randomUUID().toString());
/*
* The consumer may be using FQQN in which case the queue might already exist.
*/
if (CompositeAddress.isFullyQualified(physicalName)) {
queueName = CompositeAddress.extractQueueName(SimpleString.toSimpleString(physicalName));
if (session.getCoreServer().locateQueue(queueName) != null) {
return queueName;
}
} else {
queueName = new SimpleString(UUID.randomUUID().toString());
}
session.getCoreSession().createQueue(new QueueConfiguration(queueName).setAddress(address).setFilterString(selector).setDurable(false).setTemporary(true).setInternal(internalAddress));
}

View File

@ -68,6 +68,7 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
@Test
public void testFQQNTopicWhenQueueDoesNotExist() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false));
Exception e = null;
String queueName = "testQueue";
@ -87,6 +88,47 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
}
@Test
public void testTopicFQQNSendAndConsumeAutoCreate() throws Exception {
internalTopicFQQNSendAndConsume(true);
}
@Test
public void testTopicFQQNSendAndConsumeManualCreate() throws Exception {
internalTopicFQQNSendAndConsume(false);
}
private void internalTopicFQQNSendAndConsume(boolean autocreate) throws Exception {
if (autocreate) {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true));
} else {
server.createQueue(new QueueConfiguration(anycastQ1).setAddress(multicastAddress).setDurable(false));
}
try (Connection connection = createConnection(false)) {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(CompositeAddress.toFullyQualified(multicastAddress, anycastQ1).toString());
MessageConsumer consumer1 = session.createConsumer(topic);
MessageConsumer consumer2 = session.createConsumer(topic);
MessageConsumer consumer3 = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
//only 1 consumer receives the message as they're all connected to the same FQQN
Message m = consumer1.receive(2000);
assertNotNull(m);
m = consumer2.receiveNoWait();
assertNull(m);
m = consumer3.receiveNoWait();
assertNull(m);
}
}
@Test
public void testConsumeQueueToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {

View File

@ -123,6 +123,47 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
}
}
@Test
public void testTopicFQQNSendAndConsumeAutoCreate() throws Exception {
internalTopicFQQNSendAndConsume(true);
}
@Test
public void testTopicFQQNSendAndConsumeManualCreate() throws Exception {
internalTopicFQQNSendAndConsume(false);
}
private void internalTopicFQQNSendAndConsume(boolean autocreate) throws Exception {
if (autocreate) {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true));
} else {
server.createQueue(new QueueConfiguration(anycastQ1).setAddress(multicastAddress).setDurable(false));
}
try (Connection connection = factory.createConnection()) {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(CompositeAddress.toFullyQualified(multicastAddress, anycastQ1).toString());
MessageConsumer consumer1 = session.createConsumer(topic);
MessageConsumer consumer2 = session.createConsumer(topic);
MessageConsumer consumer3 = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
//only 1 consumer receives the message as they're all connected to the same FQQN
Message m = consumer1.receive(2000);
assertNotNull(m);
m = consumer2.receiveNoWait();
assertNull(m);
m = consumer3.receiveNoWait();
assertNull(m);
}
}
@Test
public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception {