From d1cd1e71a198927c5180a1a032d6796e82b86da8 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 6 Sep 2017 09:07:21 -0500 Subject: [PATCH] ARTEMIS-1352 auto-create MULTICAST queue when AMQP client sends to topic --- .../amqp/broker/AMQPSessionCallback.java | 23 +++++++++----- .../proton/ProtonServerReceiverContext.java | 15 ++++++---- .../amqp/AmqpMaxFrameSizeTest.java | 3 ++ .../integration/client/ConsumerTest.java | 30 +++++++++++++++++++ 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 6807adac56..21afbf95e4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -21,6 +21,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; @@ -276,15 +277,23 @@ public class AMQPSessionCallback implements SessionCallback { return queueQueryResult; } - public boolean bindingQuery(String address) throws Exception { - BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address)); - if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) { + public boolean bindingQuery(String address, RoutingType routingType) throws Exception { + SimpleString simpleAddress = SimpleString.toSimpleString(address); + BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) { try { - serverSession.createQueue(new SimpleString(address), new SimpleString(address), RoutingType.ANYCAST, null, false, true); + serverSession.createAddress(simpleAddress, routingType, true); + } catch (ActiveMQAddressExistsException e) { + // The address may have been created by another thread in the mean time. Catch and do nothing. + } + bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) { + try { + serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address)); + bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); } return bindingQueryResult.isExists(); } @@ -406,7 +415,7 @@ public class AMQPSessionCallback implements SessionCallback { return; } - if (!bindingQuery(message.getAddress().toString())) { + if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } } @@ -660,7 +669,7 @@ public class AMQPSessionCallback implements SessionCallback { } public RoutingType getDefaultRoutingType(String address) { - return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(); + return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType(); } public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 2feb8dac7b..eee35a6a29 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -119,12 +119,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (address != null && !address.isEmpty()) { try { - if (!sessionSPI.bindingQuery(address)) { + if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } } catch (ActiveMQAMQPNotFoundException e) { throw e; } catch (Exception e) { + e.printStackTrace(); throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } @@ -177,11 +178,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } private RoutingType getRoutingType(Symbol[] symbols) { - for (Symbol symbol : symbols) { - if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { - return RoutingType.MULTICAST; - } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) { - return RoutingType.ANYCAST; + if (symbols != null) { + for (Symbol symbol : symbols) { + if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { + return RoutingType.MULTICAST; + } else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) { + return RoutingType.ANYCAST; + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java index 778cd40de3..c6e1008535 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -40,6 +42,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMultipleTransfers() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); String testQueueName = "ConnectionFrameSize"; int nMsgs = 200; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 69c7f68e69..fc474aadf7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -230,6 +230,36 @@ public class ConsumerTest extends ActiveMQTestBase { internalSend(2, 1); } + @Test + public void testAutoCreateMulticastAddress() throws Throwable { + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + assertNull(server.getAddressInfo(SimpleString.toSimpleString("topic"))); + + ConnectionFactory factorySend = createFactory(2); + Connection connection = factorySend.createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic topic = session.createTopic("topic"); + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage msg = session.createTextMessage("hello"); + msg.setIntProperty("mycount", 0); + producer.send(msg); + } finally { + connection.close(); + } + + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("topic"))); + assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("topic")).getRoutingType()); + assertEquals(0, server.getTotalMessageCount()); + } + @Test public void testSendCoreReceiveAMQP() throws Throwable {