[ARTEMIS-2545]: Auto queue creation does not work with MDBs.
* Adding support for queue autocreation from the resource adapter. Issue: https://issues.apache.org/jira/browse/ARTEMIS-2545
This commit is contained in:
parent
6f1f9a62c2
commit
f19337901a
|
@ -0,0 +1,90 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 The Apache Software Foundation.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
|
||||||
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class to create queues 'automatically'.
|
||||||
|
*/
|
||||||
|
public class AutoCreateUtil {
|
||||||
|
|
||||||
|
public static void autoCreateQueue(ClientSession session, SimpleString destAddress, SimpleString selectorString) throws ActiveMQException {
|
||||||
|
AddressQuery response = session.addressQuery(destAddress);
|
||||||
|
/* The address query will send back exists=true even if the node only has a REMOTE binding for the destination.
|
||||||
|
* Therefore, we must check if the queue names list contains the exact name of the address to know whether or
|
||||||
|
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
|
||||||
|
*/
|
||||||
|
SimpleString queueName = getCoreQueueName(session, destAddress);
|
||||||
|
if (!response.isExists() || !response.getQueueNames().contains(queueName)) {
|
||||||
|
if (response.isAutoCreateQueues()) {
|
||||||
|
try {
|
||||||
|
QueueConfiguration queueConfiguration = new QueueConfiguration(queueName)
|
||||||
|
.setAutoCreated(true)
|
||||||
|
.setAddress(destAddress);
|
||||||
|
setRequiredQueueConfigurationIfNotSet(queueConfiguration,response, RoutingType.ANYCAST, selectorString, true);
|
||||||
|
session.createQueue(queueConfiguration);
|
||||||
|
ActiveMQClientLogger.LOGGER.debug("The queue " + destAddress + " was created automatically");
|
||||||
|
} catch (ActiveMQQueueExistsException e) {
|
||||||
|
// The queue was created by another client/admin between the query check and send create queue packet
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ActiveMQException("Destination " + destAddress + " does not exist", QUEUE_DOES_NOT_EXIST);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the non nullable (CreateQueueMessage_V2) queue attributes (all others have static defaults or get defaulted if null by address settings server side).
|
||||||
|
*
|
||||||
|
* @param queueConfiguration the provided queue configuration the client wants to set
|
||||||
|
* @param addressQuery the address settings query information (this could be removed if max consumers and purge on no consumers were null-able in CreateQueueMessage_V2)
|
||||||
|
* @param routingType of the queue (multicast or anycast)
|
||||||
|
* @param filter to apply on the queue
|
||||||
|
* @param durable if queue is durable
|
||||||
|
*/
|
||||||
|
public static void setRequiredQueueConfigurationIfNotSet(QueueConfiguration queueConfiguration, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) {
|
||||||
|
if (queueConfiguration.getRoutingType() == null) {
|
||||||
|
queueConfiguration.setRoutingType(routingType);
|
||||||
|
}
|
||||||
|
if (queueConfiguration.getFilterString() == null) {
|
||||||
|
queueConfiguration.setFilterString(filter);
|
||||||
|
}
|
||||||
|
if (queueConfiguration.getMaxConsumers() == null) {
|
||||||
|
queueConfiguration.setMaxConsumers(addressQuery.getDefaultMaxConsumers());
|
||||||
|
}
|
||||||
|
if (queueConfiguration.isPurgeOnNoConsumers() == null) {
|
||||||
|
queueConfiguration.setPurgeOnNoConsumers(addressQuery.isDefaultPurgeOnNoConsumers());
|
||||||
|
}
|
||||||
|
queueConfiguration.setDurable(durable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SimpleString getCoreQueueName(ClientSession session, SimpleString destAddress) {
|
||||||
|
if (session.getVersion() < PacketImpl.FQQN_CHANGE_VERSION) {
|
||||||
|
return destAddress;
|
||||||
|
}
|
||||||
|
return CompositeAddress.extractQueueName(destAddress);
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.jms.client;
|
package org.apache.activemq.artemis.jms.client;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
|
@ -72,6 +74,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompati
|
||||||
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage;
|
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage;
|
||||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||||
|
import org.apache.activemq.artemis.utils.AutoCreateUtil;
|
||||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||||
|
|
||||||
|
@ -802,22 +805,13 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
SimpleString autoDeleteQueueName = null;
|
SimpleString autoDeleteQueueName = null;
|
||||||
|
|
||||||
if (dest.isQueue()) {
|
if (dest.isQueue()) {
|
||||||
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
|
try {
|
||||||
|
AutoCreateUtil.autoCreateQueue(session, dest.getSimpleAddress(), null);
|
||||||
/* The address query will send back exists=true even if the node only has a REMOTE binding for the destination.
|
} catch (ActiveMQException ex) {
|
||||||
* Therefore, we must check if the queue names list contains the exact name of the address to know whether or
|
if (ex.getType() == QUEUE_DOES_NOT_EXIST) {
|
||||||
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
|
|
||||||
*/
|
|
||||||
if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) {
|
|
||||||
if (response.isAutoCreateQueues()) {
|
|
||||||
try {
|
|
||||||
createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);
|
|
||||||
} catch (ActiveMQQueueExistsException e) {
|
|
||||||
// The queue was created by another client/admin between the query check and send create queue packet
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
|
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
|
||||||
}
|
}
|
||||||
|
throw ex;
|
||||||
}
|
}
|
||||||
|
|
||||||
dest.setCreated(true);
|
dest.setCreated(true);
|
||||||
|
@ -848,7 +842,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
|
if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
|
||||||
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
|
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
|
||||||
} else {
|
} else {
|
||||||
if (!response.isExists() || !response.getQueueNames().contains(getCoreQueueName(dest))) {
|
if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) {
|
||||||
if (response.isAutoCreateQueues()) {
|
if (response.isAutoCreateQueues()) {
|
||||||
try {
|
try {
|
||||||
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response);
|
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response);
|
||||||
|
@ -931,14 +925,6 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private SimpleString getCoreQueueName(ActiveMQDestination dest) {
|
|
||||||
if (session.getVersion() < PacketImpl.FQQN_CHANGE_VERSION) {
|
|
||||||
return dest.getSimpleAddress();
|
|
||||||
} else {
|
|
||||||
return CompositeAddress.extractQueueName(dest.getSimpleAddress());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
|
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
|
||||||
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
|
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
|
||||||
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
|
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
|
||||||
|
@ -1278,19 +1264,19 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
|
|
||||||
void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
|
void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
|
||||||
QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration();
|
QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration();
|
||||||
setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, false);
|
AutoCreateUtil.setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, false);
|
||||||
session.createQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setDurable(false).setTemporary(true));
|
session.createQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setDurable(false).setTemporary(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
|
void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
|
||||||
QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration();
|
QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration();
|
||||||
setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable);
|
AutoCreateUtil.setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable);
|
||||||
session.createSharedQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setDurable(durable));
|
session.createSharedQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setDurable(durable));
|
||||||
}
|
}
|
||||||
|
|
||||||
void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
|
void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
|
||||||
QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration();
|
QueueConfiguration queueConfiguration = destination.getQueueConfiguration() == null ? new QueueConfiguration(queueName) : destination.getQueueConfiguration();
|
||||||
setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable);
|
AutoCreateUtil.setRequiredQueueConfigurationIfNotSet(queueConfiguration, addressQuery, routingType, filter, durable);
|
||||||
session.createQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setAutoCreated(autoCreated).setDurable(durable));
|
session.createQueue(queueConfiguration.setName(queueName).setAddress(destination.getAddress()).setAutoCreated(autoCreated).setDurable(durable));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1351,32 +1337,6 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
||||||
return topic;
|
return topic;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the non nullable (CreateQueueMessage_V2) queue attributes (all others have static defaults or get defaulted if null by address settings server side).
|
|
||||||
*
|
|
||||||
* @param queueConfiguration the provided queue configuration the client wants to set
|
|
||||||
* @param addressQuery the address settings query information (this could be removed if max consumers and purge on no consumers were null-able in CreateQueueMessage_V2)
|
|
||||||
* @param routingType of the queue (multicast or anycast)
|
|
||||||
* @param filter to apply on the queue
|
|
||||||
* @param durable if queue is durable
|
|
||||||
*/
|
|
||||||
private void setRequiredQueueConfigurationIfNotSet(QueueConfiguration queueConfiguration, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) {
|
|
||||||
if (queueConfiguration.getRoutingType() == null) {
|
|
||||||
queueConfiguration.setRoutingType(routingType);
|
|
||||||
}
|
|
||||||
if (queueConfiguration.getFilterString() == null) {
|
|
||||||
queueConfiguration.setFilterString(filter);
|
|
||||||
}
|
|
||||||
if (queueConfiguration.getMaxConsumers() == null) {
|
|
||||||
queueConfiguration.setMaxConsumers(addressQuery.getDefaultMaxConsumers());
|
|
||||||
}
|
|
||||||
if (queueConfiguration.isPurgeOnNoConsumers() == null) {
|
|
||||||
queueConfiguration.setPurgeOnNoConsumers(addressQuery.isDefaultPurgeOnNoConsumers());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Inner classes -------------------------------------------------
|
// Inner classes -------------------------------------------------
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRALogger;
|
||||||
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
||||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||||
import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper;
|
import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper;
|
||||||
|
import org.apache.activemq.artemis.utils.AutoCreateUtil;
|
||||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -172,6 +173,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tempQueueName = activation.getAddress();
|
tempQueueName = activation.getAddress();
|
||||||
|
AutoCreateUtil.autoCreateQueue(session, tempQueueName, selectorString);
|
||||||
}
|
}
|
||||||
consumer = (ClientConsumerInternal) session.createConsumer(tempQueueName, selectorString);
|
consumer = (ClientConsumerInternal) session.createConsumer(tempQueueName, selectorString);
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,6 +135,35 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
|
||||||
activation.stop();
|
activation.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoCreateQueuePrefixWhenUseJndiIsFalse() throws Exception {
|
||||||
|
final String prefix = "jms.queue.";
|
||||||
|
final String destinationName = "autocreatedtest";
|
||||||
|
final SimpleString prefixedDestinationName = SimpleString.toSimpleString(prefix + destinationName);
|
||||||
|
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
|
||||||
|
ra.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
||||||
|
ra.start(new BootstrapContext());
|
||||||
|
Connection conn = ra.getDefaultActiveMQConnectionFactory().createConnection();
|
||||||
|
conn.close();
|
||||||
|
|
||||||
|
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
|
||||||
|
spec.setResourceAdapter(ra);
|
||||||
|
spec.setUseJNDI(false);
|
||||||
|
spec.setDestinationType("javax.jms.Queue");
|
||||||
|
spec.setDestination(destinationName);
|
||||||
|
spec.setQueuePrefix(prefix);
|
||||||
|
spec.setMaxSession(1);
|
||||||
|
spec.setSetupAttempts(1);
|
||||||
|
|
||||||
|
ActiveMQActivation activation = new ActiveMQActivation(ra, new MessageEndpointFactory(), spec);
|
||||||
|
|
||||||
|
activation.start();
|
||||||
|
|
||||||
|
assertEquals(1, server.locateQueue(prefixedDestinationName).getConsumerCount());
|
||||||
|
|
||||||
|
activation.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTopicPrefixWhenUseJndiIsFalse() throws Exception {
|
public void testTopicPrefixWhenUseJndiIsFalse() throws Exception {
|
||||||
final String prefix = "jms.topic.";
|
final String prefix = "jms.topic.";
|
||||||
|
|
Loading…
Reference in New Issue