From f19337901a45e3437d1b944a5c60fb975754ab88 Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Thu, 16 Sep 2021 09:57:07 +0200 Subject: [PATCH] [ARTEMIS-2545]: Auto queue creation does not work with MDBs. * Adding support for queue autocreation from the resource adapter. Issue: https://issues.apache.org/jira/browse/ARTEMIS-2545 --- .../artemis/utils/AutoCreateUtil.java | 90 +++++++++++++++++++ .../artemis/jms/client/ActiveMQSession.java | 64 +++---------- .../ra/inflow/ActiveMQMessageHandler.java | 2 + .../integration/ra/ResourceAdapterTest.java | 29 ++++++ 4 files changed, 133 insertions(+), 52 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java new file mode 100644 index 0000000000..48e98eda12 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/AutoCreateUtil.java @@ -0,0 +1,90 @@ +/* + * Copyright 2021 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +/** + * Utility class to create queues 'automatically'. + */ +public class AutoCreateUtil { + + public static void autoCreateQueue(ClientSession session, SimpleString destAddress, SimpleString selectorString) throws ActiveMQException { + AddressQuery response = session.addressQuery(destAddress); + /* The address query will send back exists=true even if the node only has a REMOTE binding for the destination. + * Therefore, we must check if the queue names list contains the exact name of the address to know whether or + * not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here. + */ + SimpleString queueName = getCoreQueueName(session, destAddress); + if (!response.isExists() || !response.getQueueNames().contains(queueName)) { + if (response.isAutoCreateQueues()) { + try { + QueueConfiguration queueConfiguration = new QueueConfiguration(queueName) + .setAutoCreated(true) + .setAddress(destAddress); + setRequiredQueueConfigurationIfNotSet(queueConfiguration,response, RoutingType.ANYCAST, selectorString, true); + session.createQueue(queueConfiguration); + ActiveMQClientLogger.LOGGER.debug("The queue " + destAddress + " was created automatically"); + } catch (ActiveMQQueueExistsException e) { + // The queue was created by another client/admin between the query check and send create queue packet + } + } else { + throw new ActiveMQException("Destination " + destAddress + " does not exist", QUEUE_DOES_NOT_EXIST); + } + } + } + + /** + * Set the non nullable (CreateQueueMessage_V2) queue attributes (all others have static defaults or get defaulted if null by address settings server side). + * + * @param queueConfiguration the provided queue configuration the client wants to set + * @param addressQuery the address settings query information (this could be removed if max consumers and purge on no consumers were null-able in CreateQueueMessage_V2) + * @param routingType of the queue (multicast or anycast) + * @param filter to apply on the queue + * @param durable if queue is durable + */ + public static void setRequiredQueueConfigurationIfNotSet(QueueConfiguration queueConfiguration, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) { + if (queueConfiguration.getRoutingType() == null) { + queueConfiguration.setRoutingType(routingType); + } + if (queueConfiguration.getFilterString() == null) { + queueConfiguration.setFilterString(filter); + } + if (queueConfiguration.getMaxConsumers() == null) { + queueConfiguration.setMaxConsumers(addressQuery.getDefaultMaxConsumers()); + } + if (queueConfiguration.isPurgeOnNoConsumers() == null) { + queueConfiguration.setPurgeOnNoConsumers(addressQuery.isDefaultPurgeOnNoConsumers()); + } + queueConfiguration.setDurable(durable); + } + + public static SimpleString getCoreQueueName(ClientSession session, SimpleString destAddress) { + if (session.getVersion() < PacketImpl.FQQN_CHANGE_VERSION) { + return destAddress; + } + return CompositeAddress.extractQueueName(destAddress); + } +} diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 6ec2225136..1b2d619462 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.jms.client; +import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST; + import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -72,6 +74,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompati import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; +import org.apache.activemq.artemis.utils.AutoCreateUtil; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.SelectorTranslator; @@ -802,22 +805,13 @@ public class ActiveMQSession implements QueueSession, TopicSession { SimpleString autoDeleteQueueName = null; if (dest.isQueue()) { - AddressQuery response = session.addressQuery(dest.getSimpleAddress()); - - /* The address query will send back exists=true even if the node only has a REMOTE binding for the destination. - * Therefore, we must check if the queue names list contains the exact name of the address to know whether or - * not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here. - */ - if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) { - if (response.isAutoCreateQueues()) { - try { - createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response); - } catch (ActiveMQQueueExistsException e) { - // The queue was created by another client/admin between the query check and send create queue packet - } - } else { + try { + AutoCreateUtil.autoCreateQueue(session, dest.getSimpleAddress(), null); + } catch (ActiveMQException ex) { + if (ex.getType() == QUEUE_DOES_NOT_EXIST) { throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); } + throw ex; } dest.setCreated(true); @@ -848,7 +842,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (!CompositeAddress.isFullyQualified(dest.getAddress())) { createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response); } else { - if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) { + if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) { if (response.isAutoCreateQueues()) { try { createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response); @@ -931,14 +925,6 @@ public class ActiveMQSession implements QueueSession, TopicSession { } } - private SimpleString getCoreQueueName(ActiveMQDestination dest) { - if (session.getVersion() < PacketImpl.FQQN_CHANGE_VERSION) { - return dest.getSimpleAddress(); - } else { - return CompositeAddress.extractQueueName(dest.getSimpleAddress()); - } - } - private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException { QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes(); int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority(); @@ -1278,19 +1264,19 @@ public class ActiveMQSession implements QueueSession, TopicSession { void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, ClientSession.AddressQuery addressQuery) throws ActiveMQException { QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration(); - setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, false); + AutoCreateUtil.setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, false); session.createQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setDurable(false).setTemporary(true)); } void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, ClientSession.AddressQuery addressQuery) throws ActiveMQException { QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration(); - setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable); + AutoCreateUtil.setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable); session.createSharedQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setDurable(durable)); } void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, ClientSession.AddressQuery addressQuery) throws ActiveMQException { QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration(); - setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable); + AutoCreateUtil.setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable); session.createQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setAutoCreated(autoCreated).setDurable(durable)); } @@ -1351,32 +1337,6 @@ public class ActiveMQSession implements QueueSession, TopicSession { return topic; } } - - /** - * Set the non nullable (CreateQueueMessage_V2) queue attributes (all others have static defaults or get defaulted if null by address settings server side). - * - * @param queueConfiguration the provided queue configuration the client wants to set - * @param addressQuery the address settings query information (this could be removed if max consumers and purge on no consumers were null-able in CreateQueueMessage_V2) - * @param routingType of the queue (multicast or anycast) - * @param filter to apply on the queue - * @param durable if queue is durable - */ - private void setRequiredQueueConfigurationIfNotSet(QueueConfiguration queueConfiguration, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) { - if (queueConfiguration.getRoutingType() == null) { - queueConfiguration.setRoutingType(routingType); - } - if (queueConfiguration.getFilterString() == null) { - queueConfiguration.setFilterString(filter); - } - if (queueConfiguration.getMaxConsumers() == null) { - queueConfiguration.setMaxConsumers(addressQuery.getDefaultMaxConsumers()); - } - if (queueConfiguration.isPurgeOnNoConsumers() == null) { - queueConfiguration.setPurgeOnNoConsumers(addressQuery.isDefaultPurgeOnNoConsumers()); - } - } - - // Inner classes ------------------------------------------------- } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index 714277f538..0ee70437a7 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -50,6 +50,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRALogger; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper; +import org.apache.activemq.artemis.utils.AutoCreateUtil; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.VersionLoader; import org.jboss.logging.Logger; @@ -172,6 +173,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList } } else { tempQueueName = activation.getAddress(); + AutoCreateUtil.autoCreateQueue(session, tempQueueName, selectorString); } consumer = (ClientConsumerInternal) session.createConsumer(tempQueueName, selectorString); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java index 7689a02c59..7b6f1b12cc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java @@ -135,6 +135,35 @@ public class ResourceAdapterTest extends ActiveMQRATestBase { activation.stop(); } + @Test + public void testAutoCreateQueuePrefixWhenUseJndiIsFalse() throws Exception { + final String prefix = "jms.queue."; + final String destinationName = "autocreatedtest"; + final SimpleString prefixedDestinationName = SimpleString.toSimpleString(prefix + destinationName); + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + ra.setConnectorClassName(INVM_CONNECTOR_FACTORY); + ra.start(new BootstrapContext()); + Connection conn = ra.getDefaultActiveMQConnectionFactory().createConnection(); + conn.close(); + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setResourceAdapter(ra); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(destinationName); + spec.setQueuePrefix(prefix); + spec.setMaxSession(1); + spec.setSetupAttempts(1); + + ActiveMQActivation activation = new ActiveMQActivation(ra, new MessageEndpointFactory(), spec); + + activation.start(); + + assertEquals(1, server.locateQueue(prefixedDestinationName).getConsumerCount()); + + activation.stop(); + } + @Test public void testTopicPrefixWhenUseJndiIsFalse() throws Exception { final String prefix = "jms.topic.";