This commit is contained in:
Clebert Suconic 2018-01-17 13:23:23 -05:00
commit addfef2196
2 changed files with 164 additions and 5 deletions

View File

@ -41,9 +41,11 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
@ -197,7 +199,21 @@ public class AMQSession implements SessionCallback {
try {
if (!queueBinding.isExists()) {
if (bindingQuery.isAutoCreateQueues()) {
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, isTemporary);
SimpleString queueNameToUse = queueName;
SimpleString addressToUse = queueName;
RoutingType routingTypeToUse = RoutingType.ANYCAST;
if (CompositeAddress.isFullyQualified(queueName.toString())) {
CompositeAddress compositeAddress = CompositeAddress.getQueueName(queueName.toString());
addressToUse = new SimpleString(compositeAddress.getAddress());
queueNameToUse = new SimpleString(compositeAddress.getQueueName());
if (bindingQuery.getAddressInfo() != null) {
routingTypeToUse = bindingQuery.getAddressInfo().getRoutingType();
} else {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString());
routingTypeToUse = as.getDefaultAddressRoutingType();
}
}
server.createQueue(addressToUse, routingTypeToUse, queueNameToUse, null, true, isTemporary);
connection.addKnownDestination(queueName);
} else {
hasQueue = false;

View File

@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -315,15 +314,159 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
}
@Test
@Ignore("need to figure auto bindings creation")
public void testVirtualTopicFQQNAutoCreate() throws Exception {
public void testVirtualTopicFQQNAutoCreateQueue() throws Exception {
Connection exConn = null;
SimpleString topic = new SimpleString("VirtualTopic.Orders");
SimpleString subscriptionQ = new SimpleString("Consumer.A");
// defaults are false via test setUp
this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.MULTICAST);
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
MessageProducer producer = session.createProducer(destination);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
// only one consumer should get the message
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
assertTrue((messageReceivedA == null || messageReceivedB == null));
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
assertEquals("This is a text message", text);
messageConsumerA.close();
messageConsumerB.close();
} finally {
if (exConn != null) {
exConn.close();
}
}
}
@Test
public void testVirtualTopicFQQNAutoCreateQAndAddress() throws Exception {
Connection exConn = null;
SimpleString topic = new SimpleString("VirtualTopic.Orders");
SimpleString subscriptionQ = new SimpleString("Consumer.A");
// defaults are false via test setUp
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
MessageProducer producer = session.createProducer(destination);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
// only one consumer should get the message
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
assertTrue((messageReceivedA == null || messageReceivedB == null));
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
assertEquals("This is a text message", text);
messageConsumerA.close();
messageConsumerB.close();
} finally {
if (exConn != null) {
exConn.close();
}
}
}
@Test
public void testVirtualTopicFQQNConsumerAutoCreateQAndAddress() throws Exception {
Connection exConn = null;
SimpleString topic = new SimpleString("VirtualTopic.Orders");
SimpleString subscriptionQ = new SimpleString("Consumer.A");
// defaults are false via test setUp
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
// only one consumer should get the message
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
assertTrue((messageReceivedA == null || messageReceivedB == null));
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
assertEquals("This is a text message", text);
messageConsumerA.close();
messageConsumerB.close();
} finally {
if (exConn != null) {
exConn.close();
}
}
}
@Test
public void testVirtualTopicFQQNAutoCreateQWithExistingAddressWithAnyCastDefault() throws Exception {
Connection exConn = null;
SimpleString topic = new SimpleString("VirtualTopic.Orders");
SimpleString subscriptionQ = new SimpleString("Consumer.A");
// defaults are false via test setUp
this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST));
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(false);
// set default to anycast which would fail if used in queue auto creation
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.ANYCAST);
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();