This closes #2520
This commit is contained in:
commit
4ca3a971ba
|
@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
|
@ -47,12 +46,15 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
|||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.utils.UUID;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* ActiveMQ Artemis implementation of a JMS MessageProducer.
|
||||
*/
|
||||
public class ActiveMQMessageProducer implements MessageProducer, QueueSender, TopicPublisher {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ActiveMQMessageProducer.class);
|
||||
|
||||
private final ConnectionFactoryOptions options;
|
||||
|
||||
private final ActiveMQConnection connection;
|
||||
|
@ -395,53 +397,16 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
|||
}
|
||||
|
||||
destination = defaultDestination;
|
||||
// address is meant to be null on this case, as it will use the coreProducer's default address
|
||||
} else {
|
||||
if (defaultDestination != null) {
|
||||
if (!destination.equals(defaultDestination)) {
|
||||
throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is " + "specified in the arguments to the send, " + "these destinations must be equal");
|
||||
}
|
||||
if (defaultDestination != null && !destination.equals(defaultDestination)) {
|
||||
// This is a JMS TCK & Rule Definition.
|
||||
// if you specified a destination on the Producer, you cannot use it for a different destinations.
|
||||
throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is " + "specified in the arguments to the send, " + "these destinations must be equal");
|
||||
}
|
||||
|
||||
session.checkDestination(destination);
|
||||
address = destination.getSimpleAddress();
|
||||
|
||||
if (!connection.containsKnownDestination(address)) {
|
||||
try {
|
||||
ClientSession.AddressQuery query = clientSession.addressQuery(address);
|
||||
|
||||
if (!query.isExists()) {
|
||||
if (destination.isQueue() && query.isAutoCreateQueues()) {
|
||||
clientSession.createAddress(address, RoutingType.ANYCAST, true);
|
||||
if (destination.isTemporary()) {
|
||||
// TODO is it right to use the address for the queue name here?
|
||||
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
|
||||
} else {
|
||||
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
|
||||
}
|
||||
} else if (!destination.isQueue() && query.isAutoCreateAddresses()) {
|
||||
clientSession.createAddress(address, RoutingType.MULTICAST, true);
|
||||
} else if ((destination.isQueue() && !query.isAutoCreateQueues()) || (!destination.isQueue() && !query.isAutoCreateAddresses())) {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist");
|
||||
}
|
||||
} else {
|
||||
if (destination.isQueue()) {
|
||||
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
|
||||
if (!queueQuery.isExists()) {
|
||||
if (destination.isTemporary()) {
|
||||
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
|
||||
} else {
|
||||
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
connection.addKnownDestination(address);
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// The queue was created by another client/admin between the query check and send create queue packet
|
||||
} catch (ActiveMQException e) {
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ActiveMQMessage activeMQJmsMessage;
|
||||
|
|
|
@ -369,24 +369,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
ActiveMQDestination jbd = (ActiveMQDestination) destination;
|
||||
|
||||
if (jbd != null) {
|
||||
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
|
||||
|
||||
if (!response.isExists()) {
|
||||
try {
|
||||
if (jbd.isQueue() && response.isAutoCreateQueues()) {
|
||||
// perhaps just relying on the broker to do it is simplest (i.e. purgeOnNoConsumers)
|
||||
session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
|
||||
createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response);
|
||||
} else if (!jbd.isQueue() && response.isAutoCreateAddresses()) {
|
||||
session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// Queue was created between our query and create queue request. Ignore.
|
||||
}
|
||||
|
||||
}
|
||||
checkDestination(jbd);
|
||||
}
|
||||
|
||||
ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
|
||||
|
@ -397,6 +380,55 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
}
|
||||
}
|
||||
|
||||
void checkDestination(ActiveMQDestination destination) throws JMSException {
|
||||
SimpleString address = destination.getSimpleAddress();
|
||||
// TODO: What to do with FQQN
|
||||
if (!connection.containsKnownDestination(address)) {
|
||||
try {
|
||||
ClientSession.AddressQuery addressQuery = session.addressQuery(address);
|
||||
|
||||
// First we create the address
|
||||
if (!addressQuery.isExists()) {
|
||||
if (destination.isQueue()) {
|
||||
if (addressQuery.isAutoCreateAddresses() && addressQuery.isAutoCreateQueues()) {
|
||||
session.createAddress(address, RoutingType.ANYCAST, true);
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses() + " , autoCreateQueues=" + addressQuery.isAutoCreateQueues());
|
||||
}
|
||||
} else {
|
||||
if (addressQuery.isAutoCreateAddresses()) {
|
||||
session.createAddress(address, RoutingType.MULTICAST, true);
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Second we create the queue, the address would have existed or successfully created.
|
||||
if (destination.isQueue()) {
|
||||
ClientSession.QueueQuery queueQuery = session.queueQuery(address);
|
||||
if (!queueQuery.isExists()) {
|
||||
if (addressQuery.isAutoCreateQueues()) {
|
||||
if (destination.isTemporary()) {
|
||||
createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, addressQuery);
|
||||
} else {
|
||||
createQueue(destination, RoutingType.ANYCAST, address, null, true, true, addressQuery);
|
||||
}
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist, address exists but autoCreateQueues=" + addressQuery.isAutoCreateQueues());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (ActiveMQQueueExistsException thatsOK) {
|
||||
// nothing to be done
|
||||
} catch (ActiveMQException e) {
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
// this is done at the end, if no exceptions are thrown
|
||||
connection.addKnownDestination(address);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer(final Destination destination) throws JMSException {
|
||||
return createConsumer(destination, null, false);
|
||||
|
|
|
@ -17,26 +17,41 @@
|
|||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.JMSSecurityException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
||||
import org.apache.activemq.artemis.junit.Wait;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -48,6 +63,9 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
|
|||
|
||||
public static final String QUEUE_NAME = "test";
|
||||
|
||||
ClientSessionFactory factory;
|
||||
ClientSession clientSession;
|
||||
|
||||
@Test
|
||||
public void testAutoCreateOnSendToQueue() throws Exception {
|
||||
Connection connection = cf.createConnection();
|
||||
|
@ -265,6 +283,98 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
@Test //(timeout = 30000)
|
||||
// QueueAutoCreationTest was created to validate auto-creation of queues
|
||||
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
|
||||
public void testAutoCreateOnTopic() throws Exception {
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||
System.out.println("Address is " + addressName);
|
||||
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
|
||||
Topic topic = new ActiveMQTopic(addressName.toString());
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
producer.send(session.createTextMessage("hello"));
|
||||
}
|
||||
|
||||
Assert.assertTrue(((ActiveMQConnection)connection).containsKnownDestination(addressName));
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
// QueueAutoCreationTest was created to validate auto-creation of queues
|
||||
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
|
||||
public void testAutoCreateOnAddressOnly() throws Exception {
|
||||
|
||||
server.getAddressSettingsRepository().clear();
|
||||
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(false);
|
||||
server.getAddressSettingsRepository().addMatch("#", settings);
|
||||
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||
System.out.println("Address is " + addressName);
|
||||
javax.jms.Queue queue = new ActiveMQQueue(addressName.toString());
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
try {
|
||||
producer.send(queue, session.createTextMessage("hello"));
|
||||
Assert.fail("Expected to throw exception here");
|
||||
} catch (JMSException expected) {
|
||||
}
|
||||
Assert.assertFalse(((ActiveMQConnection)connection).containsKnownDestination(addressName));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
// QueueAutoCreationTest was created to validate auto-creation of queues
|
||||
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
|
||||
public void testAutoCreateOnAddressOnlyDuringProducerCreate() throws Exception {
|
||||
server.getAddressSettingsRepository().clear();
|
||||
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(false);
|
||||
server.getAddressSettingsRepository().addMatch("#", settings);
|
||||
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||
clientSession.createAddress(addressName, RoutingType.ANYCAST, true); // this will force the system to create the address only
|
||||
javax.jms.Queue queue = new ActiveMQQueue(addressName.toString());
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
try {
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
Assert.fail("Exception expected");
|
||||
} catch (JMSException expected) {
|
||||
}
|
||||
|
||||
Assert.assertFalse(((ActiveMQConnection)connection).containsKnownDestination(addressName));
|
||||
}
|
||||
|
||||
|
||||
@Test (timeout = 30000)
|
||||
// QueueAutoCreationTest was created to validate auto-creation of queues
|
||||
// and this test was added to validate a regression: https://issues.apache.org/jira/browse/ARTEMIS-2238
|
||||
public void testAutoCreateOnAddressOnlyDuringProducerCreateQueueSucceed() throws Exception {
|
||||
server.getAddressSettingsRepository().clear();
|
||||
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true);
|
||||
server.getAddressSettingsRepository().addMatch("#", settings);
|
||||
|
||||
ConnectionFactory factory = cf;
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
|
||||
clientSession.createAddress(addressName, RoutingType.ANYCAST, true); // this will force the system to create the address only
|
||||
javax.jms.Queue queue = new ActiveMQQueue(addressName.toString());
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
Assert.assertNotNull(server.locateQueue(addressName));
|
||||
|
||||
Assert.assertTrue(((ActiveMQConnection) connection).containsKnownDestination(addressName));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
|
@ -276,6 +386,17 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
|
|||
Set<Role> roles = new HashSet<>();
|
||||
roles.add(role);
|
||||
server.getSecurityRepository().addMatch("#", roles);
|
||||
ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
|
||||
factory = locator.createSessionFactory();
|
||||
clientSession = factory.createSession();
|
||||
}
|
||||
|
||||
@After
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
clientSession.close();
|
||||
factory.close();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue