diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 21afbf95e4..aab414ff5f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -264,7 +264,7 @@ public class AMQPSessionCallback implements SessionCallback { if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { - serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true); + serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } @@ -289,7 +289,7 @@ public class AMQPSessionCallback implements SessionCallback { bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); } else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) { try { - serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true); + serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 95a6ef5616..f8d5f57d63 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; @@ -57,7 +56,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.remoting.FailureListener; -import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -70,6 +68,7 @@ import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.TempQueueObserver; import org.apache.activemq.artemis.core.server.impl.RefsOperation; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; @@ -725,20 +724,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } public void addDestination(DestinationInfo info) throws Exception { + boolean created = false; ActiveMQDestination dest = info.getDestination(); - if (dest.isQueue()) { - SimpleString qName = new SimpleString(dest.getPhysicalName()); - QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); - if (binding == null) { - if (dest.isTemporary()) { - internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), false); - } else { - ConnectionInfo connInfo = getState().getInfo(); - CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; - server.getSecurityStore().check(qName, checkType, this); - server.checkQueueCreationLimit(getUsername()); - server.createQueue(qName, RoutingType.ANYCAST, qName, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), null,true, false); - } + + SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName()); + if (server.locateQueue(qName) == null) { + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName()); + if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) { + internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary()); + created = true; + } else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) { + internalSession.createAddress(qName, RoutingType.MULTICAST, !dest.isTemporary()); + created = true; } } @@ -748,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.state.addTempDestination(info); } - if (!AdvisorySupport.isAdvisoryTopic(dest)) { + if (created && !AdvisorySupport.isAdvisoryTopic(dest)) { AMQConnectionContext context = getContext(); DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index fc474aadf7..5733713979 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -260,6 +260,40 @@ public class ConsumerTest extends ActiveMQTestBase { assertEquals(0, server.getTotalMessageCount()); } + @Test + public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable { + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue"))); + + ConnectionFactory factorySend = createFactory(2); + Connection connection = factorySend.createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue("queue"); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage msg = session.createTextMessage("hello"); + msg.setIntProperty("mycount", 0); + producer.send(msg); + + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer.receive(1000)); + } finally { + connection.close(); + } + + assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue"))); + assertNull(server.locateQueue(SimpleString.toSimpleString("queue"))); + assertEquals(0, server.getTotalMessageCount()); + } + @Test public void testSendCoreReceiveAMQP() throws Throwable { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java index c45ff7d888..e67fd65739 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java @@ -24,9 +24,11 @@ import org.junit.Assert; import org.junit.Test; import javax.jms.Connection; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import java.util.Map; public class ProducerAutoCreateQueueTest extends BasicOpenWireTest { @@ -53,7 +55,90 @@ public class ProducerAutoCreateQueueTest extends BasicOpenWireTest { connection.close(); } } + } + @Test + public void testAutoCreateSendToTopic() throws Exception { + Connection connection = null; + try { + connection = factory.createConnection("admin", "password"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic trash = session.createTopic("trash"); + final MessageProducer producer = session.createProducer(trash); + producer.send(session.createTextMessage("foo")); + } finally { + if (connection != null) { + connection.close(); + } + } + assertNotNull(server.getAddressInfo(new SimpleString("trash"))); + assertEquals(0, server.getTotalMessageCount()); + } + + @Test + public void testAutoCreateSendToQueue() throws Exception { + Connection connection = null; + try { + connection = factory.createConnection("admin", "password"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue trash = session.createQueue("trash"); + final MessageProducer producer = session.createProducer(trash); + producer.send(session.createTextMessage("foo")); + } finally { + if (connection != null) { + connection.close(); + } + } + + assertNotNull(server.getAddressInfo(new SimpleString("trash"))); + assertNotNull(server.locateQueue(new SimpleString("trash"))); + assertEquals(1, server.getTotalMessageCount()); + } + + @Test + public void testAutoDelete() throws Exception { + Connection connection = null; + try { + connection = factory.createConnection("admin", "password"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue trash = session.createQueue("trash"); + final MessageProducer producer = session.createProducer(trash); + producer.send(session.createTextMessage("foo")); + Assert.assertNotNull(server.locateQueue(new SimpleString("trash"))); + MessageConsumer consumer = session.createConsumer(trash); + connection.start(); + assertNotNull(consumer.receive(1000)); + } finally { + if (connection != null) { + connection.close(); + } + } + + assertNull(server.locateQueue(new SimpleString("trash"))); + } + + @Test + public void testAutoDeleteNegative() throws Exception { + server.getAddressSettingsRepository().addMatch("trash", new AddressSettings().setAutoDeleteQueues(false).setAutoDeleteAddresses(false)); + Connection connection = null; + try { + connection = factory.createConnection("admin", "password"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue trash = session.createQueue("trash"); + final MessageProducer producer = session.createProducer(trash); + producer.send(session.createTextMessage("foo")); + Assert.assertNotNull(server.locateQueue(new SimpleString("trash"))); + MessageConsumer consumer = session.createConsumer(trash); + connection.start(); + assertNotNull(consumer.receive(1000)); + } finally { + if (connection != null) { + connection.close(); + } + } + + assertNotNull(server.locateQueue(new SimpleString("trash"))); + assertNotNull(server.getAddressInfo(new SimpleString("trash"))); } }