ARTEMIS-2238 Enhancement to queueQuery on producer

Move logic to ActiveMQSession
Use same method for createProducer, avoiding duplicated logic
Specific exception messages, for users
This commit is contained in:
Michael André Pearce 2019-01-25 01:05:13 +00:00 committed by Clebert Suconic
parent 0f905224e7
commit 33f56c81bd
2 changed files with 55 additions and 85 deletions

View File

@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -396,19 +395,17 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
if (defaultDestination == null) {
throw new UnsupportedOperationException("Destination must be specified on send with an anonymous producer");
}
destination = defaultDestination;
} else {
if (defaultDestination != null) {
if (!destination.equals(defaultDestination)) {
throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is " + "specified in the arguments to the send, " + "these destinations must be equal");
}
} else if (defaultDestination != null) {
if (!destination.equals(defaultDestination)) {
throw new UnsupportedOperationException("Where a default destination is specified " + "for the sender and a destination is " + "specified in the arguments to the send, " + "these destinations must be equal");
}
} else {
session.checkDestination(destination);
address = destination.getSimpleAddress();
}
checkDestination(destination, address, clientSession);
ActiveMQMessage activeMQJmsMessage;
@ -499,66 +496,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
}
}
private void checkDestination(ActiveMQDestination destination,
SimpleString address,
ClientSession clientSession) throws JMSException {
// TODO: What to do with FQQN
if (!connection.containsKnownDestination(address)) {
try {
ClientSession.AddressQuery addressQuery = clientSession.addressQuery(address);
boolean addressExists = addressQuery.isExists();
// first we check the address existence, and autoCreate it if allowed in case it does not exists
if (!addressExists && addressQuery.isAutoCreateAddresses()) {
if (destination.isQueue() && !addressQuery.isAutoCreateQueues()) {
if (logger.isDebugEnabled()) {
logger.debug("Address " + address + " was not created because we would not have permission to create queue");
}
// if it can't create the internal queue on JMS Queues, why bother creating the address, just mark it false now
addressExists = false;
} else {
RoutingType addressType = destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST;
clientSession.createAddress(address, addressType, true);
addressExists = true;
}
}
// Second we create the queue, but we only do it if the address was created
if (destination.isQueue() && addressExists) {
ClientSession.QueueQuery queueQuery = clientSession.queueQuery(address);
if (!queueQuery.isExists()) {
if (addressQuery.isAutoCreateQueues()) {
try {
if (destination.isTemporary()) {
session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, addressQuery);
} else {
session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, addressQuery);
}
} catch (ActiveMQQueueExistsException thatsOK) {
// nothing to be done
}
} else {
throw new InvalidDestinationException("Queue " + address + " does not exist");
}
}
}
if (!addressExists) {
throw new InvalidDestinationException("Address " + address + " does not exist");
}
// this is done at the end, if no exceptions are thrown
connection.addKnownDestination(address);
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
}
private void checkClosed() throws JMSException {
if (clientProducer.isClosed()) {

View File

@ -369,24 +369,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQDestination jbd = (ActiveMQDestination) destination;
if (jbd != null) {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
if (!response.isExists()) {
try {
if (jbd.isQueue() && response.isAutoCreateQueues()) {
// perhaps just relying on the broker to do it is simplest (i.e. purgeOnNoConsumers)
session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response);
} else if (!jbd.isQueue() && response.isAutoCreateAddresses()) {
session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
} else {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
} catch (ActiveMQQueueExistsException e) {
// Queue was created between our query and create queue request. Ignore.
}
}
checkDestination(jbd);
}
ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
@ -397,6 +380,55 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
void checkDestination(ActiveMQDestination destination) throws JMSException {
SimpleString address = destination.getSimpleAddress();
// TODO: What to do with FQQN
if (!connection.containsKnownDestination(address)) {
try {
ClientSession.AddressQuery addressQuery = session.addressQuery(address);
// First we create the address
if (!addressQuery.isExists()) {
if (destination.isQueue()) {
if (addressQuery.isAutoCreateAddresses() && addressQuery.isAutoCreateQueues()) {
session.createAddress(address, RoutingType.ANYCAST, true);
} else {
throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses() + " , autoCreateQueues=" + addressQuery.isAutoCreateQueues());
}
} else {
if (addressQuery.isAutoCreateAddresses()) {
session.createAddress(address, RoutingType.MULTICAST, true);
} else {
throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses());
}
}
}
// Second we create the queue, the address would have existed or successfully created.
if (destination.isQueue()) {
ClientSession.QueueQuery queueQuery = session.queueQuery(address);
if (!queueQuery.isExists()) {
if (addressQuery.isAutoCreateQueues()) {
if (destination.isTemporary()) {
createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, addressQuery);
} else {
createQueue(destination, RoutingType.ANYCAST, address, null, true, true, addressQuery);
}
} else {
throw new InvalidDestinationException("Destination " + address + " does not exist, address exists but autoCreateQueues=" + addressQuery.isAutoCreateQueues());
}
}
}
} catch (ActiveMQQueueExistsException thatsOK) {
// nothing to be done
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
// this is done at the end, if no exceptions are thrown
connection.addKnownDestination(address);
}
}
@Override
public MessageConsumer createConsumer(final Destination destination) throws JMSException {
return createConsumer(destination, null, false);