[ARTEMIS-1241] check for FQQN to parse out address and queue for auto creation
This commit is contained in:
parent
9e781be44e
commit
ea266cd74b
|
@ -41,9 +41,11 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
|
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||||
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
@ -197,7 +199,21 @@ public class AMQSession implements SessionCallback {
|
||||||
try {
|
try {
|
||||||
if (!queueBinding.isExists()) {
|
if (!queueBinding.isExists()) {
|
||||||
if (bindingQuery.isAutoCreateQueues()) {
|
if (bindingQuery.isAutoCreateQueues()) {
|
||||||
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, isTemporary);
|
SimpleString queueNameToUse = queueName;
|
||||||
|
SimpleString addressToUse = queueName;
|
||||||
|
RoutingType routingTypeToUse = RoutingType.ANYCAST;
|
||||||
|
if (CompositeAddress.isFullyQualified(queueName.toString())) {
|
||||||
|
CompositeAddress compositeAddress = CompositeAddress.getQueueName(queueName.toString());
|
||||||
|
addressToUse = new SimpleString(compositeAddress.getAddress());
|
||||||
|
queueNameToUse = new SimpleString(compositeAddress.getQueueName());
|
||||||
|
if (bindingQuery.getAddressInfo() != null) {
|
||||||
|
routingTypeToUse = bindingQuery.getAddressInfo().getRoutingType();
|
||||||
|
} else {
|
||||||
|
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString());
|
||||||
|
routingTypeToUse = as.getDefaultAddressRoutingType();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
server.createQueue(addressToUse, routingTypeToUse, queueNameToUse, null, true, isTemporary);
|
||||||
connection.addKnownDestination(queueName);
|
connection.addKnownDestination(queueName);
|
||||||
} else {
|
} else {
|
||||||
hasQueue = false;
|
hasQueue = false;
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
@ -315,15 +314,159 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore("need to figure auto bindings creation")
|
public void testVirtualTopicFQQNAutoCreateQueue() throws Exception {
|
||||||
public void testVirtualTopicFQQNAutoCreate() throws Exception {
|
|
||||||
Connection exConn = null;
|
Connection exConn = null;
|
||||||
|
|
||||||
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||||
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
||||||
|
|
||||||
|
// defaults are false via test setUp
|
||||||
|
this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
|
||||||
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.MULTICAST);
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||||
|
exFact.setWatchTopicAdvisories(false);
|
||||||
|
exConn = exFact.createConnection();
|
||||||
|
exConn.start();
|
||||||
|
|
||||||
|
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(topic.toString());
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
|
||||||
|
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
|
||||||
|
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
// only one consumer should get the message
|
||||||
|
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
|
||||||
|
|
||||||
|
assertTrue((messageReceivedA == null || messageReceivedB == null));
|
||||||
|
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
|
||||||
|
assertEquals("This is a text message", text);
|
||||||
|
|
||||||
|
messageConsumerA.close();
|
||||||
|
messageConsumerB.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (exConn != null) {
|
||||||
|
exConn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVirtualTopicFQQNAutoCreateQAndAddress() throws Exception {
|
||||||
|
Connection exConn = null;
|
||||||
|
|
||||||
|
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||||
|
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
||||||
|
|
||||||
|
// defaults are false via test setUp
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||||
|
exFact.setWatchTopicAdvisories(false);
|
||||||
|
exConn = exFact.createConnection();
|
||||||
|
exConn.start();
|
||||||
|
|
||||||
|
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(topic.toString());
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
|
||||||
|
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
|
||||||
|
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
// only one consumer should get the message
|
||||||
|
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
|
||||||
|
|
||||||
|
assertTrue((messageReceivedA == null || messageReceivedB == null));
|
||||||
|
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
|
||||||
|
assertEquals("This is a text message", text);
|
||||||
|
|
||||||
|
messageConsumerA.close();
|
||||||
|
messageConsumerB.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (exConn != null) {
|
||||||
|
exConn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVirtualTopicFQQNConsumerAutoCreateQAndAddress() throws Exception {
|
||||||
|
Connection exConn = null;
|
||||||
|
|
||||||
|
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||||
|
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
||||||
|
|
||||||
|
// defaults are false via test setUp
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||||
|
exFact.setWatchTopicAdvisories(false);
|
||||||
|
exConn = exFact.createConnection();
|
||||||
|
exConn.start();
|
||||||
|
|
||||||
|
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTopic(topic.toString());
|
||||||
|
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
|
||||||
|
|
||||||
|
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
|
||||||
|
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
TextMessage message = session.createTextMessage("This is a text message");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
// only one consumer should get the message
|
||||||
|
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
|
||||||
|
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
|
||||||
|
|
||||||
|
assertTrue((messageReceivedA == null || messageReceivedB == null));
|
||||||
|
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
|
||||||
|
assertEquals("This is a text message", text);
|
||||||
|
|
||||||
|
messageConsumerA.close();
|
||||||
|
messageConsumerB.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (exConn != null) {
|
||||||
|
exConn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVirtualTopicFQQNAutoCreateQWithExistingAddressWithAnyCastDefault() throws Exception {
|
||||||
|
Connection exConn = null;
|
||||||
|
|
||||||
|
SimpleString topic = new SimpleString("VirtualTopic.Orders");
|
||||||
|
SimpleString subscriptionQ = new SimpleString("Consumer.A");
|
||||||
|
|
||||||
|
// defaults are false via test setUp
|
||||||
|
this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(false);
|
||||||
|
|
||||||
|
// set default to anycast which would fail if used in queue auto creation
|
||||||
|
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.ANYCAST);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||||
|
|
Loading…
Reference in New Issue