diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index bca7eaefda..b0eb678183 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -41,9 +41,11 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.command.ActiveMQDestination; @@ -197,7 +199,21 @@ public class AMQSession implements SessionCallback { try { if (!queueBinding.isExists()) { if (bindingQuery.isAutoCreateQueues()) { - server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, isTemporary); + SimpleString queueNameToUse = queueName; + SimpleString addressToUse = queueName; + RoutingType routingTypeToUse = RoutingType.ANYCAST; + if (CompositeAddress.isFullyQualified(queueName.toString())) { + CompositeAddress compositeAddress = CompositeAddress.getQueueName(queueName.toString()); + addressToUse = new SimpleString(compositeAddress.getAddress()); + queueNameToUse = new SimpleString(compositeAddress.getQueueName()); + if (bindingQuery.getAddressInfo() != null) { + routingTypeToUse = bindingQuery.getAddressInfo().getRoutingType(); + } else { + AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString()); + routingTypeToUse = as.getDefaultAddressRoutingType(); + } + } + server.createQueue(addressToUse, routingTypeToUse, queueNameToUse, null, true, isTemporary); connection.addKnownDestination(queueName); } else { hasQueue = false; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java index ff819ec871..1c20ff3726 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FQQNOpenWireTest.java @@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.CompositeAddress; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -315,15 +314,159 @@ public class FQQNOpenWireTest extends OpenWireTestBase { } @Test - @Ignore("need to figure auto bindings creation") - public void testVirtualTopicFQQNAutoCreate() throws Exception { + public void testVirtualTopicFQQNAutoCreateQueue() throws Exception { Connection exConn = null; SimpleString topic = new SimpleString("VirtualTopic.Orders"); SimpleString subscriptionQ = new SimpleString("Consumer.A"); + // defaults are false via test setUp + this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST)); this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); - this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.MULTICAST); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + MessageProducer producer = session.createProducer(destination); + + Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString()); + + MessageConsumer messageConsumerA = session.createConsumer(destinationFQN); + MessageConsumer messageConsumerB = session.createConsumer(destinationFQN); + + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + // only one consumer should get the message + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA == null || messageReceivedB == null)); + String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + messageConsumerB.close(); + + } finally { + if (exConn != null) { + exConn.close(); + } + } + } + + @Test + public void testVirtualTopicFQQNAutoCreateQAndAddress() throws Exception { + Connection exConn = null; + + SimpleString topic = new SimpleString("VirtualTopic.Orders"); + SimpleString subscriptionQ = new SimpleString("Consumer.A"); + + // defaults are false via test setUp + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + MessageProducer producer = session.createProducer(destination); + + Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString()); + + MessageConsumer messageConsumerA = session.createConsumer(destinationFQN); + MessageConsumer messageConsumerB = session.createConsumer(destinationFQN); + + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + // only one consumer should get the message + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA == null || messageReceivedB == null)); + String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + messageConsumerB.close(); + + } finally { + if (exConn != null) { + exConn.close(); + } + } + } + + @Test + public void testVirtualTopicFQQNConsumerAutoCreateQAndAddress() throws Exception { + Connection exConn = null; + + SimpleString topic = new SimpleString("VirtualTopic.Orders"); + SimpleString subscriptionQ = new SimpleString("Consumer.A"); + + // defaults are false via test setUp + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic.toString()); + Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString()); + + MessageConsumer messageConsumerA = session.createConsumer(destinationFQN); + MessageConsumer messageConsumerB = session.createConsumer(destinationFQN); + + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("This is a text message"); + producer.send(message); + + // only one consumer should get the message + TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000); + TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000); + + assertTrue((messageReceivedA == null || messageReceivedB == null)); + String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText(); + assertEquals("This is a text message", text); + + messageConsumerA.close(); + messageConsumerB.close(); + + } finally { + if (exConn != null) { + exConn.close(); + } + } + } + + @Test + public void testVirtualTopicFQQNAutoCreateQWithExistingAddressWithAnyCastDefault() throws Exception { + Connection exConn = null; + + SimpleString topic = new SimpleString("VirtualTopic.Orders"); + SimpleString subscriptionQ = new SimpleString("Consumer.A"); + + // defaults are false via test setUp + this.server.addAddressInfo(new AddressInfo(topic, RoutingType.MULTICAST)); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true); + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(false); + + // set default to anycast which would fail if used in queue auto creation + this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setDefaultAddressRoutingType(RoutingType.ANYCAST); try { ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();