ARTEMIS-880 expand prefix support to consumers

This commit is contained in:
jbertram 2016-12-14 15:08:32 -06:00
parent 3af1e5c734
commit 29edd44d24
1 changed files with 19 additions and 14 deletions

View File

@ -418,10 +418,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean browseOnly,
final boolean supportLargeMessage,
final Integer credits) throws Exception {
Binding binding = postOffice.getBinding(queueName);
final SimpleString unPrefixedQueueName = removePrefix(queueName);
Binding binding = postOffice.getBinding(unPrefixedQueueName);
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(unPrefixedQueueName);
}
SimpleString address = removePrefix(binding.getAddress());
@ -429,13 +431,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
try {
securityCheck(address, CheckType.BROWSE, this);
} catch (Exception e) {
securityCheck(address.concat(".").concat(queueName), CheckType.BROWSE, this);
securityCheck(address.concat(".").concat(unPrefixedQueueName), CheckType.BROWSE, this);
}
} else {
try {
securityCheck(address, CheckType.CONSUME, this);
} catch (Exception e) {
securityCheck(address.concat(".").concat(queueName), CheckType.CONSUME, this);
securityCheck(address.concat(".").concat(unPrefixedQueueName), CheckType.CONSUME, this);
}
}
@ -475,7 +477,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (logger.isDebugEnabled()) {
logger.debug("Session with user=" + username +
", connection=" + this.remotingConnection +
" created a consumer on queue " + queueName +
" created a consumer on queue " + unPrefixedQueueName +
", filter = " + filterString);
}
@ -528,6 +530,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreated) throws Exception {
final SimpleString unPrefixedName = removePrefix(name);
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
@ -540,7 +543,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername());
Queue queue = server.createQueue(art.getA(), art.getB(), name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
Queue queue = server.createQueue(art.getA(), art.getB(), unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@ -549,7 +552,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// session is closed.
// It is up to the user to delete the queue when finished with it
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name);
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, unPrefixedName);
if (remotingConnection instanceof TempQueueObserver) {
cleaner.setObserver((TempQueueObserver) remotingConnection);
}
@ -557,11 +560,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.addCloseListener(cleaner);
remotingConnection.addFailureListener(cleaner);
tempQueueCleannerUppers.put(name, cleaner);
tempQueueCleannerUppers.put(unPrefixedName, cleaner);
}
if (logger.isDebugEnabled()) {
logger.debug("Queue " + name + " created on address " + address +
logger.debug("Queue " + unPrefixedName + " created on address " + address +
" with filter=" + filterString + " temporary = " +
temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
}
@ -692,15 +695,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void deleteQueue(final SimpleString queueToDelete) throws Exception {
Binding binding = postOffice.getBinding(queueToDelete);
final SimpleString unPrefixedQueueName = removePrefix(queueToDelete);
Binding binding = postOffice.getBinding(unPrefixedQueueName);
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
throw new ActiveMQNonExistentQueueException();
}
server.destroyQueue(queueToDelete, this, true);
server.destroyQueue(unPrefixedQueueName, this, true);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(queueToDelete);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);
if (cleaner != null) {
remotingConnection.removeCloseListener(cleaner);
@ -711,12 +716,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
return server.queueQuery(name);
return server.queueQuery(removePrefix(name));
}
@Override
public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception {
return server.addressQuery(name);
return server.addressQuery(removePrefix(name));
}
@Override