ARTEMIS-2238 Enhancement to queueQuery on producer

This commit is contained in:
Clebert Suconic 2019-01-24 16:01:21 -05:00
parent bcd2f1cc94
commit 0f905224e7
2 changed files with 188 additions and 39 deletions

View File

@ -47,12 +47,15 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
/**
* ActiveMQ Artemis implementation of a JMS MessageProducer.
*/
public class ActiveMQMessageProducer implements MessageProducer, QueueSender, TopicPublisher {
private static final Logger logger = Logger.getLogger(ActiveMQMessageProducer.class);
private final ConnectionFactoryOptions options;
private final ActiveMQConnection connection;
@ -403,47 +406,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
}
address = destination.getSimpleAddress();
if (!connection.containsKnownDestination(address)) {
try {
ClientSession.AddressQuery query = clientSession.addressQuery(address);
if (!query.isExists()) {
if (destination.isQueue() && query.isAutoCreateQueues()) {
clientSession.createAddress(address, RoutingType.ANYCAST, true);
if (destination.isTemporary()) {
// TODO is it right to use the address for the queue name here?
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
} else {
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
}
} else if (!destination.isQueue() && query.isAutoCreateAddresses()) {
clientSession.createAddress(address, RoutingType.MULTICAST, true);
} else if ((destination.isQueue() && !query.isAutoCreateQueues()) || (!destination.isQueue() && !query.isAutoCreateAddresses())) {
throw new InvalidDestinationException("Destination " + address + " does not exist");
}
} else {
if (destination.isQueue()) {
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
if (!queueQuery.isExists()) {
if (destination.isTemporary()) {
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
} else {
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
}
}
}
connection.addKnownDestination(address);
}
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
}
checkDestination(destination, address, clientSession);
ActiveMQMessage activeMQJmsMessage;
boolean foreign = false;
@ -533,6 +499,67 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
}
}
private void checkDestination(ActiveMQDestination destination,
SimpleString address,
ClientSession clientSession) throws JMSException {
// TODO: What to do with FQQN
if (!connection.containsKnownDestination(address)) {
try {
ClientSession.AddressQuery addressQuery = clientSession.addressQuery(address);
boolean addressExists = addressQuery.isExists();
// first we check the address existence, and autoCreate it if allowed in case it does not exists
if (!addressExists && addressQuery.isAutoCreateAddresses()) {
if (destination.isQueue() && !addressQuery.isAutoCreateQueues()) {
if (logger.isDebugEnabled()) {
logger.debug("Address " + address + " was not created because we would not have permission to create queue");
}
// if it can't create the internal queue on JMS Queues, why bother creating the address, just mark it false now
addressExists = false;
} else {
RoutingType addressType = destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST;
clientSession.createAddress(address, addressType, true);
addressExists = true;
}
}
// Second we create the queue, but we only do it if the address was created
if (destination.isQueue() && addressExists) {
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
if (!queueQuery.isExists()) {
if (addressQuery.isAutoCreateQueues()) {
try {
if (destination.isTemporary()) {
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, addressQuery);
} else {
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, addressQuery);
}
} catch (ActiveMQQueueExistsException thatsOK) {
// nothing to be done
}
} else {
throw new InvalidDestinationException("Queue " + address + " does not exist");
}
}
}
if (!addressExists) {
throw new InvalidDestinationException("Address " + address + " does not exist");
}
// this is done at the end, if no exceptions are thrown
connection.addKnownDestination(address);
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
}
private void checkClosed() throws JMSException {
if (clientProducer.isClosed()) {
throw new IllegalStateException("Producer is closed");

View File

@ -17,26 +17,42 @@
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -48,6 +64,9 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
public static final String QUEUE_NAME = "test";
ClientSessionFactory factory;
ClientSession clientSession;
@Test
public void testAutoCreateOnSendToQueue() throws Exception {
Connection connection = cf.createConnection();
@ -265,6 +284,98 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
connection.close();
}
@Test //(timeout = 30000)
// QueueAutoCreationTest was created to validate auto-creation of queues
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
public void testAutoCreateOnTopic() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
System.out.println("Address is " + addressName);
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
Topic topic = new ActiveMQTopic(addressName.toString());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("hello"));
}
Assert.assertTrue(((ActiveMQConnection)connection).containsKnownDestination(addressName));
}
@Test (timeout = 30000)
// QueueAutoCreationTest was created to validate auto-creation of queues
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
public void testAutoCreateOnAddressOnly() throws Exception {
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", settings);
ConnectionFactory factory = new ActiveMQConnectionFactory();
try (Connection connection = factory.createConnection()) {
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
System.out.println("Address is " + addressName);
javax.jms.Queue queue = new ActiveMQQueue(addressName.toString());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
try {
producer.send(queue, session.createTextMessage("hello"));
Assert.fail("Expected to throw exception here");
} catch (JMSException expected) {
}
Assert.assertFalse(((ActiveMQConnection)connection).containsKnownDestination(addressName));
}
}
@Test (timeout = 30000)
// QueueAutoCreationTest was created to validate auto-creation of queues
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
public void testAutoCreateOnAddressOnlyDuringProducerCreate() throws Exception {
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", settings);
ConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
clientSession.createAddress(addressName, RoutingType.ANYCAST, true); // this will force the system to create the address only
javax.jms.Queue queue = new ActiveMQQueue(addressName.toString());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
MessageProducer producer = session.createProducer(queue);
Assert.fail("Exception expected");
} catch (JMSException expected) {
}
Assert.assertFalse(((ActiveMQConnection)connection).containsKnownDestination(addressName));
}
@Test (timeout = 30000)
// QueueAutoCreationTest was created to validate auto-creation of queues
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
public void testAutoCreateOnAddressOnlyDuringProducerCreateQueueSucceed() throws Exception {
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true);
server.getAddressSettingsRepository().addMatch("#", settings);
ConnectionFactory factory = cf;
try(Connection connection = factory.createConnection()) {
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
clientSession.createAddress(addressName, RoutingType.ANYCAST, true); // this will force the system to create the address only
javax.jms.Queue queue = new ActiveMQQueue(addressName.toString());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
Assert.assertNotNull(server.locateQueue(addressName));
Assert.assertTrue(((ActiveMQConnection) connection).containsKnownDestination(addressName));
}
}
@Before
@Override
public void setUp() throws Exception {
@ -276,6 +387,17 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
Set<Role> roles = new HashSet<>();
roles.add(role);
server.getSecurityRepository().addMatch("#", roles);
ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
factory = locator.createSessionFactory();
clientSession = factory.createSession();
}
@After
@Override
public void tearDown() throws Exception {
clientSession.close();
factory.close();
super.tearDown();
}
@Override