ARTEMIS-1867 FQQN for producers

There's a *slight* semantic change with the behavior of the queue query
and binding query to make them consistent with the address query, namely
that they will return the name of the queue and the name of the address
in every case and the returned names will be not use the FQQN syntax but
will be parsed to reflect their actual names in the broker.
This commit is contained in:
Justin Bertram 2018-11-12 16:05:53 -06:00 committed by Michael Andre Pearce
parent 03c45d6479
commit 61e0354b1a
23 changed files with 806 additions and 264 deletions

View File

@ -20,78 +20,47 @@ import org.apache.activemq.artemis.api.core.SimpleString;
public class CompositeAddress {
public static SimpleString toFullQN(SimpleString address, SimpleString qName) {
public static String SEPARATOR = "::";
public static String toFullyQualified(String address, String qName) {
return new StringBuilder().append(address).append(SEPARATOR).append(qName).toString();
}
public static SimpleString toFullyQualified(SimpleString address, SimpleString qName) {
return address.concat(SEPARATOR).concat(qName);
}
public static String toFullQN(String address, String qName) {
return address + SEPARATOR + qName;
}
public static String SEPARATOR = "::";
private final String address;
private final String queueName;
private final boolean fqqn;
public String getAddress() {
return address;
}
public String getQueueName() {
return queueName;
}
public CompositeAddress(String address, String queueName) {
this.address = address;
this.queueName = queueName;
this.fqqn = address != null && !address.isEmpty();
}
public CompositeAddress(String singleName) {
int index = singleName.indexOf(SEPARATOR);
if (index == -1) {
this.fqqn = false;
this.address = null;
this.queueName = singleName;
} else {
this.fqqn = true;
this.address = singleName.substring(0, index);
this.queueName = singleName.substring(index + 2);
}
}
public boolean isFqqn() {
return fqqn;
}
public static boolean isFullyQualified(String address) {
return address.contains(SEPARATOR);
}
public static CompositeAddress getQueueName(String address) {
int index = address.indexOf(SEPARATOR);
if (index == -1) {
throw new IllegalStateException("Not A Fully Qualified Name");
}
return new CompositeAddress(address.substring(0, index), address.substring(index + 2));
}
public static String extractQueueName(String name) {
int index = name.indexOf(SEPARATOR);
if (index != -1) {
return name.substring(index + 2);
}
return name;
return address == null ? false : address.contains(SEPARATOR);
}
public static SimpleString extractQueueName(SimpleString name) {
return new SimpleString(extractQueueName(name.toString()));
return name == null ? null : new SimpleString(extractQueueName(name.toString()));
}
public static String extractQueueName(String queue) {
if (queue == null) {
return null;
}
int index = queue.indexOf(SEPARATOR);
if (index != -1) {
return queue.substring(index + SEPARATOR.length());
}
return queue;
}
public static SimpleString extractAddressName(SimpleString address) {
return address == null ? null : new SimpleString(extractAddressName(address.toString()));
}
public static String extractAddressName(String address) {
String[] split = address.split(SEPARATOR);
return split[0];
if (address == null) {
return null;
}
int index = address.indexOf(SEPARATOR);
if (index != -1) {
return address.substring(0, index);
}
return address;
}
}

View File

@ -317,9 +317,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
//find out if we have an address made up of the address and queue name, if yes then set queue name
if (CompositeAddress.isFullyQualified(source.getAddress())) {
CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress());
addressToUse = new SimpleString(compositeAddress.getAddress());
queueNameToUse = new SimpleString(compositeAddress.getQueueName());
addressToUse = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
queueNameToUse = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
} else {
addressToUse = new SimpleString(source.getAddress());
}

View File

@ -239,9 +239,8 @@ public class AMQSession implements SessionCallback {
SimpleString addressToUse = queueName;
RoutingType routingTypeToUse = RoutingType.ANYCAST;
if (CompositeAddress.isFullyQualified(queueName.toString())) {
CompositeAddress compositeAddress = CompositeAddress.getQueueName(queueName.toString());
addressToUse = new SimpleString(compositeAddress.getAddress());
queueNameToUse = new SimpleString(compositeAddress.getQueueName());
addressToUse = CompositeAddress.extractAddressName(queueName);
queueNameToUse = CompositeAddress.extractQueueName(queueName);
if (bindingQuery.getAddressInfo() != null) {
routingTypeToUse = bindingQuery.getAddressInfo().getRoutingType();
} else {

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.jboss.logging.Logger;
public final class BindingsImpl implements Bindings {
@ -55,7 +56,13 @@ public final class BindingsImpl implements Bindings {
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<>();
private final Map<Long, Binding> bindingsMap = new ConcurrentHashMap<>();
private final Map<Long, Binding> bindingsIdMap = new ConcurrentHashMap<>();
/**
* This is the same as bindingsIdMap but indexed on the binding's uniqueName rather than ID. Two maps are
* maintained to speed routing, otherwise we'd have to loop through the bindingsIdMap when routing to an FQQN.
*/
private final Map<SimpleString, Binding> bindingsNameMap = new ConcurrentHashMap<>();
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<>();
@ -77,6 +84,10 @@ public final class BindingsImpl implements Bindings {
this.name = name;
}
public SimpleString getName() {
return name;
}
@Override
public void setMessageLoadBalancingType(final MessageLoadBalancingType messageLoadBalancingType) {
this.messageLoadBalancingType = messageLoadBalancingType;
@ -89,12 +100,12 @@ public final class BindingsImpl implements Bindings {
@Override
public Collection<Binding> getBindings() {
return bindingsMap.values();
return bindingsIdMap.values();
}
@Override
public void unproposed(SimpleString groupID) {
for (Binding binding : bindingsMap.values()) {
for (Binding binding : bindingsIdMap.values()) {
binding.unproposed(groupID);
}
}
@ -127,7 +138,8 @@ public final class BindingsImpl implements Bindings {
}
}
bindingsMap.put(binding.getID(), binding);
bindingsIdMap.put(binding.getID(), binding);
bindingsNameMap.put(binding.getUniqueName(), binding);
if (logger.isTraceEnabled()) {
logger.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
@ -166,7 +178,8 @@ public final class BindingsImpl implements Bindings {
}
}
bindingsMap.remove(binding.getID());
bindingsIdMap.remove(binding.getID());
bindingsNameMap.remove(binding.getUniqueName());
if (logger.isTraceEnabled()) {
logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
@ -276,7 +289,7 @@ public final class BindingsImpl implements Bindings {
ByteBuffer buffer = ByteBuffer.wrap(ids);
while (buffer.hasRemaining()) {
long id = buffer.getLong();
for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) {
for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
if (entry.getValue() instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
if (remoteQueueBinding.getRemoteQueueID() == id) {
@ -309,6 +322,11 @@ public final class BindingsImpl implements Bindings {
} else if (groupingHandler != null && groupRouting && groupId != null) {
context.clear();
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
if (theBinding != null) {
theBinding.route(message, context);
}
} else {
// in a optimization, we are reusing the previous context if everything is right for it
// so the simpleRouting will only happen if neededk
@ -596,10 +614,10 @@ public final class BindingsImpl implements Bindings {
out.println("bindingsMap:");
if (bindingsMap.isEmpty()) {
if (bindingsIdMap.isEmpty()) {
out.println("\tEMPTY!");
}
for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) {
for (Map.Entry<Long, Binding> entry : bindingsIdMap.entrySet()) {
out.println("\tkey=" + entry.getKey() + ", value=" + entry.getValue());
}
@ -639,7 +657,7 @@ public final class BindingsImpl implements Bindings {
while (buff.hasRemaining()) {
long bindingID = buff.getLong();
Binding binding = bindingsMap.get(bindingID);
Binding binding = bindingsIdMap.get(bindingID);
if (binding != null) {
if (idsToAckList.contains(bindingID)) {
binding.routeWithAck(message, context);

View File

@ -83,6 +83,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
@ -1211,7 +1212,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// Private -----------------------------------------------------------------
private void setPagingStore(SimpleString address, Message message) throws Exception {
PagingStore store = pagingManager.getPageStore(address);
PagingStore store = pagingManager.getPageStore(CompositeAddress.extractAddressName(address));
message.setContext(store);
}
@ -1703,7 +1704,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public Bindings createBindings(final SimpleString address) {
GroupingHandler groupingHandler = server.getGroupingHandler();
BindingsImpl bindings = new BindingsImpl(address, groupingHandler);
BindingsImpl bindings = new BindingsImpl(CompositeAddress.extractAddressName(address), groupingHandler);
if (groupingHandler != null) {
groupingHandler.addListener(bindings);
}

View File

@ -107,7 +107,7 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Bindings getBindingsForRoutingAddress(final SimpleString address) throws Exception {
return mappings.get(address);
return mappings.get(CompositeAddress.extractAddressName(address));
}
@Override
@ -122,9 +122,10 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Bindings getMatchingBindings(final SimpleString address) throws Exception {
Address add = new AddressImpl(address, wildcardConfiguration);
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Address add = new AddressImpl(realAddress, wildcardConfiguration);
Bindings bindings = bindingsFactory.createBindings(address);
Bindings bindings = bindingsFactory.createBindings(realAddress);
for (Binding binding : nameMap.values()) {
Address addCheck = new AddressImpl(binding.getAddress(), wildcardConfiguration);
@ -139,10 +140,11 @@ public class SimpleAddressManager implements AddressManager {
@Override
public Bindings getDirectBindings(final SimpleString address) throws Exception {
Bindings bindings = bindingsFactory.createBindings(address);
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = bindingsFactory.createBindings(realAddress);
for (Binding binding : nameMap.values()) {
if (binding.getAddress().equals(address)) {
if (binding.getAddress().equals(realAddress)) {
bindings.addBinding(binding);
}
}
@ -152,11 +154,11 @@ public class SimpleAddressManager implements AddressManager {
@Override
public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Binding binding = getBinding(realAddress);
Binding binding = getBinding(address);
if (binding == null || !(binding instanceof LocalQueueBinding) || !binding.getAddress().equals(address)) {
Bindings bindings = mappings.get(address);
if (binding == null || !(binding instanceof LocalQueueBinding) || !binding.getAddress().equals(realAddress)) {
Bindings bindings = mappings.get(realAddress);
if (bindings != null) {
for (Binding theBinding : bindings.getBindings()) {
if (theBinding instanceof LocalQueueBinding) {
@ -174,9 +176,10 @@ public class SimpleAddressManager implements AddressManager {
public SimpleString getMatchingQueue(final SimpleString address,
final SimpleString queueName,
RoutingType routingType) throws Exception {
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Binding binding = getBinding(queueName);
if (binding != null && !binding.getAddress().equals(address) && !address.toString().isEmpty()) {
if (binding != null && !binding.getAddress().equals(realAddress) && !realAddress.toString().isEmpty()) {
throw new IllegalStateException("queue belongs to address" + binding.getAddress());
}
return binding != null ? binding.getUniqueName() : null;
@ -196,13 +199,14 @@ public class SimpleAddressManager implements AddressManager {
}
protected void removeBindingInternal(final SimpleString address, final SimpleString bindableName) {
Bindings bindings = mappings.get(address);
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = mappings.get(realAddress);
if (bindings != null) {
removeMapping(bindableName, bindings);
if (bindings.getBindings().isEmpty()) {
mappings.remove(address);
mappings.remove(realAddress);
}
}
}
@ -227,14 +231,15 @@ public class SimpleAddressManager implements AddressManager {
}
protected boolean addMappingInternal(final SimpleString address, final Binding binding) throws Exception {
Bindings bindings = mappings.get(address);
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = mappings.get(realAddress);
Bindings prevBindings = null;
if (bindings == null) {
bindings = bindingsFactory.createBindings(address);
bindings = bindingsFactory.createBindings(realAddress);
prevBindings = mappings.putIfAbsent(address, bindings);
prevBindings = mappings.putIfAbsent(realAddress, bindings);
if (prevBindings != null) {
bindings = prevBindings;
@ -273,11 +278,11 @@ public class SimpleAddressManager implements AddressManager {
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
EnumSet<RoutingType> routingTypes) throws Exception {
AddressInfo info = addressInfoMap.get(addressName);
SimpleString realAddressName = CompositeAddress.extractAddressName(addressName);
AddressInfo info = addressInfoMap.get(realAddressName);
if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(realAddressName);
}
if (routingTypes == null || isEquals(routingTypes, info.getRoutingTypes())) {
@ -285,7 +290,7 @@ public class SimpleAddressManager implements AddressManager {
return info;
}
validateRoutingTypes(addressName, routingTypes);
validateRoutingTypes(realAddressName, routingTypes);
final EnumSet<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
info.setRoutingTypes(updatedRoutingTypes);
@ -340,16 +345,16 @@ public class SimpleAddressManager implements AddressManager {
@Override
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
return addressInfoMap.remove(address);
return addressInfoMap.remove(CompositeAddress.extractAddressName(address));
}
@Override
public AddressInfo getAddressInfo(SimpleString addressName) {
return addressInfoMap.get(addressName);
return addressInfoMap.get(CompositeAddress.extractAddressName(addressName));
}
@Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
getBindingsForRoutingAddress(address).setMessageLoadBalancingType(messageLoadBalancingType);
getBindingsForRoutingAddress(CompositeAddress.extractAddressName(address)).setMessageLoadBalancingType(messageLoadBalancingType);
}
}

View File

@ -441,4 +441,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229215, value = "Cannot delete queue {0} on binding {1} - it has {2} messages", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException cannotDeleteQueueWithMessages(SimpleString name, SimpleString queueName, long messageCount);
@Message(id = 229216, value = "Invalid queue name: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException invalidQueueName(SimpleString queueName);
}

View File

@ -423,7 +423,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.securityManager = securityManager;
addressSettingsRepository = new HierarchicalObjectRepository<>(configuration.getWildcardConfiguration());
addressSettingsRepository = new HierarchicalObjectRepository<>(configuration.getWildcardConfiguration(), new HierarchicalObjectRepository.MatchModifier() {
@Override
public String modify(String input) {
return CompositeAddress.extractAddressName(input);
}
});
addressSettingsRepository.setDefault(new AddressSettings());
@ -840,9 +845,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
}
CompositeAddress addressKey = new CompositeAddress(address.toString());
String realAddress = addressKey.isFqqn() ? addressKey.getAddress() : addressKey.getQueueName();
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(realAddress);
SimpleString realAddress = CompositeAddress.extractAddressName(address);
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(realAddress.toString());
boolean autoCreateQeueus = addressSettings.isAutoCreateQueues();
boolean autoCreateAddresses = addressSettings.isAutoCreateAddresses();
@ -859,26 +863,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// make an exception for the management address (see HORNETQ-29)
ManagementService managementService = getManagementService();
SimpleString bindAddress = new SimpleString(realAddress);
if (managementService != null) {
if (bindAddress.equals(managementService.getManagementAddress())) {
if (realAddress.equals(managementService.getManagementAddress())) {
return new BindingQueryResult(true, null, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch);
}
}
Bindings bindings = getPostOffice().getMatchingBindings(bindAddress);
Bindings bindings = getPostOffice().getMatchingBindings(realAddress);
for (Binding binding : bindings.getBindings()) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
if (addressKey.isFqqn()) {
names.add(new SimpleString(addressKey.getAddress()).concat(CompositeAddress.SEPARATOR).concat(binding.getUniqueName()));
} else {
names.add(binding.getUniqueName());
}
names.add(binding.getUniqueName());
}
}
AddressInfo info = getAddressInfo(bindAddress);
AddressInfo info = getAddressInfo(realAddress);
return new BindingQueryResult(info != null, info, names, autoCreateQeueus, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers, defaultExclusive, defaultLastValue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch);
}
@ -889,12 +888,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
SimpleString realName = CompositeAddress.extractQueueName(name);
final QueueQueryResult response;
Binding binding = getPostOffice().getBinding(name);
Binding binding = getPostOffice().getBinding(realName);
final SimpleString addressName = binding != null && binding.getType() == BindingType.LOCAL_QUEUE
? binding.getAddress() : name;
? binding.getAddress() : CompositeAddress.extractAddressName(name);
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
@ -918,14 +919,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), defaultConsumerWindowSize);
} else if (name.equals(managementAddress)) {
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), defaultConsumerWindowSize);
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, defaultConsumerWindowSize);
} else if (autoCreateQueues) {
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, defaultConsumerWindowSize);
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, defaultConsumerWindowSize);
} else {
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null, null, null, null, null, defaultConsumerWindowSize);
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, defaultConsumerWindowSize);
}
return response;
@ -937,18 +936,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
SimpleString realName = CompositeAddress.extractAddressName(name);
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(realName.toString());
boolean autoCreateAddresses = addressSettings.isAutoCreateAddresses();
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
AddressInfo addressInfo = postOffice.getAddressInfo(name);
AddressInfo addressInfo = postOffice.getAddressInfo(realName);
AddressQueryResult response;
if (addressInfo != null) {
response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
} else {
response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
response = new AddressQueryResult(realName, null, -1, false, false, autoCreateAddresses, defaultPurgeOnNoConsumers, defaultMaxConsumers);
}
return response;
}
@ -3045,12 +3046,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final long delayBeforeDispatch,
final boolean autoCreateAddress,
final boolean configurationManaged) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
SimpleString realQueueName = CompositeAddress.extractQueueName(queueName);
if (realQueueName == null || realQueueName.length() == 0) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueName(queueName);
}
final QueueBinding binding = (QueueBinding) postOffice.getBinding(realQueueName);
if (binding != null) {
if (ignoreIfExists) {
return binding.getQueue();
} else {
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, binding.getAddress());
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(realQueueName, binding.getAddress());
}
}
@ -3061,9 +3068,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueConfig.Builder queueConfigBuilder;
final SimpleString addressToUse = addrInfo == null ? queueName : addrInfo.getName();
final SimpleString addressToUse = (addrInfo == null || addrInfo.getName() == null) ? realQueueName : addrInfo.getName();
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressToUse);
queueConfigBuilder = QueueConfig.builderWith(queueID, realQueueName, addressToUse);
AddressInfo info = postOffice.getAddressInfo(addressToUse);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.PrefixUtil;
import java.util.EnumSet;
@ -58,7 +59,7 @@ public class AddressInfo {
* @param routingTypes
*/
public AddressInfo(SimpleString name, EnumSet<RoutingType> routingTypes) {
this.name = name;
this.name = CompositeAddress.extractAddressName(name);
setRoutingTypes(routingTypes);
}
@ -68,7 +69,7 @@ public class AddressInfo {
* @param routingType
*/
public AddressInfo(SimpleString name, RoutingType routingType) {
this.name = name;
this.name = CompositeAddress.extractAddressName(name);
addRoutingType(routingType);
}

View File

@ -89,6 +89,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -733,7 +734,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public AddressInfo createAddress(final SimpleString address,
EnumSet<RoutingType> routingTypes,
final boolean autoCreated) throws Exception {
Pair<SimpleString, EnumSet<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes);
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Pair<SimpleString, EnumSet<RoutingType>> art = getAddressAndRoutingTypes(realAddress, routingTypes);
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
return server.getAddressInfo(art.getA());

View File

@ -71,6 +71,8 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
*/
private final MatchComparator matchComparator;
private final MatchModifier matchModifier;
private final WildcardConfiguration wildcardConfiguration;
/**
@ -104,8 +106,13 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
}
public HierarchicalObjectRepository(final WildcardConfiguration wildcardConfiguration) {
this(wildcardConfiguration, new MatchModifier() { });
}
public HierarchicalObjectRepository(final WildcardConfiguration wildcardConfiguration, final MatchModifier matchModifier) {
this.wildcardConfiguration = wildcardConfiguration == null ? DEFAULT_WILDCARD_CONFIGURATION : wildcardConfiguration;
this.matchComparator = new MatchComparator(this.wildcardConfiguration);
this.matchModifier = matchModifier;
}
@Override
@ -164,14 +171,15 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
private void addMatch(final String match, final T value, final boolean immutableMatch, boolean notifyListeners) {
lock.writeLock().lock();
try {
String modifiedMatch = matchModifier.modify(match);
clearCache();
if (immutableMatch) {
immutables.add(match);
immutables.add(modifiedMatch);
}
Match.verify(match, wildcardConfiguration);
Match<T> match1 = new Match<>(match, value, wildcardConfiguration);
matches.put(match, match1);
Match.verify(modifiedMatch, wildcardConfiguration);
Match<T> match1 = new Match<>(modifiedMatch, value, wildcardConfiguration);
matches.put(modifiedMatch, match1);
} finally {
lock.writeLock().unlock();
}
@ -195,19 +203,20 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
*/
@Override
public T getMatch(final String match) {
T cacheResult = cache.get(match);
String modifiedMatch = matchModifier.modify(match);
T cacheResult = cache.get(modifiedMatch);
if (cacheResult != null) {
return cacheResult;
}
lock.readLock().lock();
try {
T actualMatch;
Map<String, Match<T>> possibleMatches = getPossibleMatches(match);
Map<String, Match<T>> possibleMatches = getPossibleMatches(modifiedMatch);
Collection<Match<T>> orderedMatches = sort(possibleMatches);
actualMatch = merge(orderedMatches);
T value = actualMatch != null ? actualMatch : defaultmatch;
if (value != null) {
cache.put(match, value);
cache.put(modifiedMatch, value);
}
return value;
} finally {
@ -262,16 +271,17 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
public void removeMatch(final String match) {
lock.writeLock().lock();
try {
boolean isImmutable = immutables.contains(match);
String modMatch = matchModifier.modify(match);
boolean isImmutable = immutables.contains(modMatch);
if (isImmutable) {
logger.debug("Cannot remove match " + match + " since it came from a main config");
logger.debug("Cannot remove match " + modMatch + " since it came from a main config");
} else {
/**
* clear the cache before removing the match. This will force any thread at
* {@link #getMatch(String)} to get the lock to recompute.
*/
clearCache();
matches.remove(match);
matches.remove(modMatch);
onChange();
}
} finally {
@ -387,6 +397,15 @@ public class HierarchicalObjectRepository<T> implements HierarchicalRepository<T
return possibleMatches;
}
/**
* Modifies the match String for any add or get from the repository
*/
public interface MatchModifier {
default String modify(String input) {
return input;
}
}
/**
* Compares to matches to see which one is more specific.
*/

View File

@ -29,16 +29,13 @@ import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Before;
@ -189,9 +186,9 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages());
SimpleString qName = b.getUniqueName();
//do FQQN query
QueueQueryResult result = server.queueQuery(CompositeAddress.toFullQN(multicastAddress, qName));
QueueQueryResult result = server.queueQuery(CompositeAddress.toFullyQualified(multicastAddress, qName));
assertTrue(result.isExists());
assertEquals(result.getName(), CompositeAddress.toFullQN(multicastAddress, qName));
assertEquals(result.getName(), qName);
//do qname query
result = server.queueQuery(qName);
assertTrue(result.isExists());
@ -238,58 +235,75 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
@Test
public void testQueue() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true));
Connection connection = createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue q1 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ1).toString());
javax.jms.Queue q2 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ2).toString());
javax.jms.Queue q3 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ3).toString());
javax.jms.Queue q1 = session.createQueue(CompositeAddress.toFullyQualified(anycastAddress, anycastQ1).toString());
javax.jms.Queue q2 = session.createQueue(CompositeAddress.toFullyQualified(anycastAddress, anycastQ2).toString());
javax.jms.Queue q3 = session.createQueue(CompositeAddress.toFullyQualified(anycastAddress, anycastQ3).toString());
//send 3 messages to anycastAddress
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession coreSession = cf.createSession();
MessageProducer producer1 = session.createProducer(q1);
producer1.send(session.createMessage());
producer1.send(session.createMessage());
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ1).getMessageCount() == 2, 2000, 200));
//send 3 messages
ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
sendMessages(coreSession, coreProducer, 3);
MessageProducer producer2 = session.createProducer(q2);
producer2.send(session.createMessage());
producer2.send(session.createMessage());
producer2.send(session.createMessage());
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ2).getMessageCount() == 3, 2000, 200));
MessageProducer producer3 = session.createProducer(q3);
producer3.send(session.createMessage());
producer3.send(session.createMessage());
producer3.send(session.createMessage());
producer3.send(session.createMessage());
producer3.send(session.createMessage());
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ3).getMessageCount() == 5, 2000, 200));
System.out.println("Queue is: " + q1);
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
MessageConsumer consumer3 = session.createConsumer(q3);
//each consumer receives one
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer1.receive(2000));
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ1).getMessageCount() == 0, 2000, 200));
Queue queue1 = getProxyToQueue(anycastQ1.toString());
Wait.assertEquals(0, queue1::getMessageCount);
Queue queue2 = getProxyToQueue(anycastQ2.toString());
Wait.assertEquals(0, queue2::getMessageCount);
Queue queue3 = getProxyToQueue(anycastQ3.toString());
Wait.assertEquals(0, queue3::getMessageCount);
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer2.receive(2000));
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ2).getMessageCount() == 0, 2000, 200));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ3).getMessageCount() == 0, 2000, 200));
connection.close();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
//FQQN query
final QueueQueryResult query = server.queueQuery(CompositeAddress.toFullQN(anycastAddress, q));
assertTrue(query.isExists());
QueueQueryResult query = server.queueQuery(CompositeAddress.toFullyQualified(anycastAddress, q));
assertTrue(query.isExists() || query.isAutoCreateQueues());
assertEquals(anycastAddress, query.getAddress());
assertEquals(CompositeAddress.toFullQN(anycastAddress, q), query.getName());
assertEquals(q, query.getName());
assertEquals("Message not consumed", 0, query.getMessageCount());
//try query again using qName
QueueQueryResult qNameQuery = server.queueQuery(q);
assertEquals(q, qNameQuery.getName());
query = server.queueQuery(q);
assertEquals(q, query.getName());
}
} finally {
connection.close();
if (locator != null) {
locator.close();
}
}
}
@ -308,7 +322,7 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//::queue ok!
String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString();
String specialName = CompositeAddress.toFullyQualified(new SimpleString(""), anycastQ1).toString();
javax.jms.Queue q1 = session.createQueue(specialName);
session.createConsumer(q1);
} catch (InvalidDestinationException e) {

View File

@ -78,12 +78,12 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
final int num = 3;
sendMessages(session, producer, num);
ClientConsumer consumer1 = session.createConsumer(toFullQN(mixedAddress, anycastQ1));
ClientConsumer consumer2 = session.createConsumer(toFullQN(mixedAddress, anycastQ2));
ClientConsumer consumer3 = session.createConsumer(toFullQN(mixedAddress, anycastQ3));
ClientConsumer consumer4 = session.createConsumer(toFullQN(mixedAddress, multicastQ1));
ClientConsumer consumer5 = session.createConsumer(toFullQN(mixedAddress, multicastQ2));
ClientConsumer consumer6 = session.createConsumer(toFullQN(mixedAddress, multicastQ3));
ClientConsumer consumer1 = session.createConsumer(CompositeAddress.toFullyQualified(mixedAddress, anycastQ1));
ClientConsumer consumer2 = session.createConsumer(CompositeAddress.toFullyQualified(mixedAddress, anycastQ2));
ClientConsumer consumer3 = session.createConsumer(CompositeAddress.toFullyQualified(mixedAddress, anycastQ3));
ClientConsumer consumer4 = session.createConsumer(CompositeAddress.toFullyQualified(mixedAddress, multicastQ1));
ClientConsumer consumer5 = session.createConsumer(CompositeAddress.toFullyQualified(mixedAddress, multicastQ2));
ClientConsumer consumer6 = session.createConsumer(CompositeAddress.toFullyQualified(mixedAddress, multicastQ3));
session.start();
@ -122,10 +122,10 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3, multicastQ1, multicastQ2, multicastQ3}) {
QueueQueryResult query = server.queueQuery(toFullQN(mixedAddress, q));
QueueQueryResult query = server.queueQuery(CompositeAddress.toFullyQualified(mixedAddress, q));
assertTrue(query.isExists());
assertEquals(mixedAddress, query.getAddress());
assertEquals(toFullQN(mixedAddress, q), query.getName());
assertEquals(q, query.getName());
assertEquals(0, query.getMessageCount());
}
}
@ -144,9 +144,9 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
ClientProducer producer = session.createProducer(multicastAddress);
sendMessages(session, producer, 1);
ClientConsumer consumer1 = session.createConsumer(toFullQN(multicastAddress, multicastQ1));
ClientConsumer consumer2 = session.createConsumer(toFullQN(multicastAddress, multicastQ2));
ClientConsumer consumer3 = session.createConsumer(toFullQN(multicastAddress, multicastQ3));
ClientConsumer consumer1 = session.createConsumer(CompositeAddress.toFullyQualified(multicastAddress, multicastQ1));
ClientConsumer consumer2 = session.createConsumer(CompositeAddress.toFullyQualified(multicastAddress, multicastQ2));
ClientConsumer consumer3 = session.createConsumer(CompositeAddress.toFullyQualified(multicastAddress, multicastQ3));
session.start();
//each consumer receives one
@ -163,10 +163,10 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
session.commit();
//queues are empty now
for (SimpleString q : new SimpleString[]{multicastQ1, multicastQ2, multicastQ3}) {
QueueQueryResult query = server.queueQuery(toFullQN(multicastAddress, q));
QueueQueryResult query = server.queueQuery(CompositeAddress.toFullyQualified(multicastAddress, q));
assertTrue(query.isExists());
assertEquals(multicastAddress, query.getAddress());
assertEquals(toFullQN(multicastAddress, q), query.getName());
assertEquals(q, query.getName());
assertEquals(0, query.getMessageCount());
}
}
@ -181,33 +181,53 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
ClientSession session = cf.createSession();
session.start();
//send 3 messages
ClientProducer producer = session.createProducer(anycastAddress);
sendMessages(session, producer, 3);
ClientProducer producer1 = session.createProducer(CompositeAddress.toFullyQualified(anycastAddress, anycastQ1).toString());
for (int i = 0; i < 2; i++) {
producer1.send(session.createMessage(false));
}
assertTrue(org.apache.activemq.artemis.junit.Wait.waitFor(() -> server.locateQueue(anycastQ1).getMessageCount() == 2, 2000, 200));
ClientConsumer consumer1 = session.createConsumer(toFullQN(anycastAddress, anycastQ1));
ClientConsumer consumer2 = session.createConsumer(toFullQN(anycastAddress, anycastQ2));
ClientConsumer consumer3 = session.createConsumer(toFullQN(anycastAddress, anycastQ3));
session.start();
ClientProducer producer2 = session.createProducer(CompositeAddress.toFullyQualified(anycastAddress, anycastQ2).toString());
for (int i = 0; i < 3; i++) {
producer2.send(session.createMessage(false));
}
assertTrue(org.apache.activemq.artemis.junit.Wait.waitFor(() -> server.locateQueue(anycastQ2).getMessageCount() == 3, 2000, 200));
//each consumer receives one
ClientMessage m = consumer1.receive(2000);
assertNotNull(m);
m.acknowledge();
m = consumer2.receive(2000);
assertNotNull(m);
m.acknowledge();
m = consumer3.receive(2000);
assertNotNull(m);
m.acknowledge();
ClientProducer producer3 = session.createProducer(CompositeAddress.toFullyQualified(anycastAddress, anycastQ3).toString());
for (int i = 0; i < 5; i++) {
producer3.send(session.createMessage(false));
}
assertTrue(org.apache.activemq.artemis.junit.Wait.waitFor(() -> server.locateQueue(anycastQ3).getMessageCount() == 5, 2000, 200));
ClientConsumer consumer1 = session.createConsumer(CompositeAddress.toFullyQualified(anycastAddress, anycastQ1));
ClientConsumer consumer2 = session.createConsumer(CompositeAddress.toFullyQualified(anycastAddress, anycastQ2));
ClientConsumer consumer3 = session.createConsumer(CompositeAddress.toFullyQualified(anycastAddress, anycastQ3));
ClientMessage m = null;
for (int i = 0; i < 2; i++) {
m = consumer1.receive(2000);
assertNotNull(m);
m.acknowledge();
}
for (int i = 0; i < 3; i++) {
m = consumer2.receive(2000);
assertNotNull(m);
m.acknowledge();
}
for (int i = 0; i < 5; i++) {
m = consumer3.receive(2000);
assertNotNull(m);
m.acknowledge();
}
session.commit();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
QueueQueryResult query = server.queueQuery(toFullQN(anycastAddress, q));
QueueQueryResult query = server.queueQuery(CompositeAddress.toFullyQualified(anycastAddress, q));
assertTrue(query.isExists());
assertEquals(anycastAddress, query.getAddress());
assertEquals(toFullQN(anycastAddress, q), query.getName());
assertEquals(q, query.getName());
assertEquals(0, query.getMessageCount());
}
}
@ -224,7 +244,7 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
sendMessages(session, producer, 1);
//::queue
ClientConsumer consumer1 = session.createConsumer(toFullQN(new SimpleString(""), anycastQ1));
ClientConsumer consumer1 = session.createConsumer(CompositeAddress.toFullyQualified(new SimpleString(""), anycastQ1));
session.start();
ClientMessage m = consumer1.receive(2000);
@ -236,7 +256,7 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
try {
//queue::
session.createConsumer(toFullQN(anycastQ1, new SimpleString("")));
session.createConsumer(CompositeAddress.toFullyQualified(anycastQ1, new SimpleString("")));
fail("should get exception");
} catch (ActiveMQNonExistentQueueException e) {
//expected.
@ -244,14 +264,10 @@ public class FullQualifiedQueueTest extends ActiveMQTestBase {
try {
//::
session.createConsumer(toFullQN(new SimpleString(""), new SimpleString("")));
session.createConsumer(CompositeAddress.toFullyQualified(new SimpleString(""), new SimpleString("")));
fail("should get exception");
} catch (ActiveMQNonExistentQueueException e) {
//expected.
}
}
private SimpleString toFullQN(SimpleString address, SimpleString qName) {
return address.concat(CompositeAddress.SEPARATOR).concat(qName);
}
}

View File

@ -236,7 +236,7 @@ public class SessionTest extends ActiveMQTestBase {
QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
Assert.assertFalse(resp.isExists());
Assert.assertFalse(resp.isAutoCreateQueues());
Assert.assertEquals(null, resp.getAddress());
Assert.assertEquals(queueName, resp.getAddress().toString());
clientSession.close();
}

View File

@ -73,7 +73,7 @@ public class MQTTFQQNTest extends MQTTTestSupport {
result = server.queueQuery(new SimpleString("foo.bah::" + b.getUniqueName()));
assertTrue(result.isExists());
assertEquals(new SimpleString("foo.bah"), result.getAddress());
assertEquals(new SimpleString("foo.bah::" + b.getUniqueName()), result.getName());
assertEquals(b.getUniqueName(), result.getName());
} finally {
subscriptionProvider.disconnect();
}
@ -162,7 +162,7 @@ public class MQTTFQQNTest extends MQTTTestSupport {
QueueQueryResult result = server.queueQuery(new SimpleString("::" + b.getUniqueName()));
assertTrue(result.isExists());
assertEquals(new SimpleString("foo.bah"), result.getAddress());
assertEquals(new SimpleString("::" + b.getUniqueName()), result.getName());
assertEquals(b.getUniqueName(), result.getName());
//check queue::
result = server.queueQuery(new SimpleString(b.getUniqueName() + "::"));

View File

@ -27,27 +27,25 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
/**
* Verify FQQN queues work with openwire/artemis JMS API
*/
@ -111,9 +109,9 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages());
SimpleString qName = b.getUniqueName();
//do FQQN query
QueueQueryResult result = server.queueQuery(CompositeAddress.toFullQN(multicastAddress, qName));
QueueQueryResult result = server.queueQuery(CompositeAddress.toFullyQualified(multicastAddress, qName));
assertTrue(result.isExists());
assertEquals(result.getName(), CompositeAddress.toFullQN(multicastAddress, qName));
assertEquals(result.getName(), qName);
//do qname query
result = server.queueQuery(qName);
assertTrue(result.isExists());
@ -163,27 +161,35 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
//that it is possible for jms clients to receive from
//core queues by its FQQN.
public void testQueue() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true);
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true);
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true));
Connection connection = factory.createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q1 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ1).toString());
Queue q2 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ2).toString());
Queue q3 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ3).toString());
Queue q1 = session.createQueue(CompositeAddress.toFullyQualified(anycastAddress, anycastQ1).toString());
Queue q2 = session.createQueue(CompositeAddress.toFullyQualified(anycastAddress, anycastQ2).toString());
Queue q3 = session.createQueue(CompositeAddress.toFullyQualified(anycastAddress, anycastQ3).toString());
//send 3 messages to anycastAddress
locator = createNonHALocator(true);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession coreSession = cf.createSession();
MessageProducer producer1 = session.createProducer(q1);
producer1.send(session.createMessage());
producer1.send(session.createMessage());
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ1).getMessageCount() == 2, 2000, 200));
//send 3 messages
ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
sendMessages(coreSession, coreProducer, 3);
MessageProducer producer2 = session.createProducer(q2);
producer2.send(session.createMessage());
producer2.send(session.createMessage());
producer2.send(session.createMessage());
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ2).getMessageCount() == 3, 2000, 200));
MessageProducer producer3 = session.createProducer(q3);
producer3.send(session.createMessage());
producer3.send(session.createMessage());
producer3.send(session.createMessage());
producer3.send(session.createMessage());
producer3.send(session.createMessage());
assertTrue(Wait.waitFor(() -> server.locateQueue(anycastQ3).getMessageCount() == 5, 2000, 200));
System.out.println("Queue is: " + q1);
MessageConsumer consumer1 = session.createConsumer(q1);
@ -192,17 +198,26 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
//each consumer receives one
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
assertNotNull(consumer3.receive(2000));
connection.close();
//queues are empty now
for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) {
//FQQN query
QueueQueryResult query = server.queueQuery(CompositeAddress.toFullQN(anycastAddress, q));
assertTrue(query.isExists());
QueueQueryResult query = server.queueQuery(CompositeAddress.toFullyQualified(anycastAddress, q));
assertTrue(query.isExists() || query.isAutoCreateQueues());
assertEquals(anycastAddress, query.getAddress());
assertEquals(CompositeAddress.toFullQN(anycastAddress, q), query.getName());
assertEquals(q, query.getName());
assertEquals(0, query.getMessageCount());
//try query again using qName
query = server.queueQuery(q);
@ -240,7 +255,7 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
producer.send(message);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(durableQueue, durableQueue).toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(durableQueue, durableQueue).toString());
MessageConsumer messageConsumer = session.createConsumer(destinationFQN);
@ -322,7 +337,7 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
Destination destination = session.createTopic(topic.toString());
MessageProducer producer = session.createProducer(destination);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
@ -368,7 +383,7 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
Destination destination = session.createTopic(topic.toString());
MessageProducer producer = session.createProducer(destination);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
@ -415,7 +430,7 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
Destination destination = session.createTopic(topic.toString());
MessageProducer producer = session.createProducer(destination);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
@ -460,7 +475,7 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
@ -512,7 +527,7 @@ public class FQQNOpenWireTest extends OpenWireTestBase {
Destination destination = session.createTopic(topic.toString());
MessageProducer producer = session.createProducer(destination);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullQN(topic, subscriptionQ).toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);

View File

@ -82,10 +82,10 @@ public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
openwireConnection.start();
Queue q1 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue2"));
Queue q3 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue3"));
Queue q4 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue4"));
Queue q1 = session.createQueue(CompositeAddress.toFullyQualified(forwardAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullyQualified(testAddress, "queue2"));
Queue q3 = session.createQueue(CompositeAddress.toFullyQualified(testAddress, "queue3"));
Queue q4 = session.createQueue(CompositeAddress.toFullyQualified(testAddress, "queue4"));
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
@ -146,8 +146,8 @@ public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
producer.send(message);
}
Queue q1 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue2"));
Queue q1 = session.createQueue(CompositeAddress.toFullyQualified(forwardAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullyQualified(testAddress, "queue2"));
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);

View File

@ -83,8 +83,8 @@ public class OpenWireDivertNonExclusiveTest extends OpenWireDivertTestBase {
openwireConnection.start();
Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2"));
Queue q1 = session.createQueue(CompositeAddress.toFullyQualified(testAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullyQualified(forwardAddress, "queue2"));
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);
@ -153,8 +153,8 @@ public class OpenWireDivertNonExclusiveTest extends OpenWireDivertTestBase {
producer.send(message);
}
Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2"));
Queue q1 = session.createQueue(CompositeAddress.toFullyQualified(testAddress, "queue1"));
Queue q2 = session.createQueue(CompositeAddress.toFullyQualified(forwardAddress, "queue2"));
MessageConsumer consumer1 = session.createConsumer(q1);
MessageConsumer consumer2 = session.createConsumer(q2);

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class AddressQueryTest extends ActiveMQTestBase {
private ActiveMQServer server;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(false);
server.start();
}
@Test
public void testAddressQueryDefaultsOnStaticAddress() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
server.addAddressInfo(new AddressInfo(addressName));
AddressQueryResult addressQueryResult = server.addressQuery(addressName);
assertTrue(addressQueryResult.isExists());
assertFalse(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertFalse(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertEquals(addressName, addressQueryResult.getName());
assertTrue(addressQueryResult.isAutoCreateAddresses());
assertEquals(-1, addressQueryResult.getDefaultMaxConsumers());
assertFalse(addressQueryResult.isAutoCreated());
assertFalse(addressQueryResult.isDefaultPurgeOnNoConsumers());
}
@Test
public void testAddressQueryOnStaticAddressWithFQQN() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString fqqn = addressName.concat("::").concat(SimpleString.toSimpleString(UUID.randomUUID().toString()));
server.addAddressInfo(new AddressInfo(fqqn));
assertEquals(addressName, server.addressQuery(addressName).getName());
assertEquals(addressName, server.addressQuery(fqqn).getName());
}
@Test
public void testAddressQueryNonExistentAddress() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
AddressQueryResult addressQueryResult = server.addressQuery(addressName);
assertFalse(addressQueryResult.isExists());
assertEquals(addressName, addressQueryResult.getName());
}
@Test
public void testAddressQueryNonExistentAddressWithFQQN() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString fqqn = addressName.concat("::").concat(SimpleString.toSimpleString(UUID.randomUUID().toString()));
AddressQueryResult addressQueryResult = server.addressQuery(fqqn);
assertFalse(addressQueryResult.isExists());
assertEquals(addressName, addressQueryResult.getName());
}
@Test
public void testAddressQueryNonDefaultsOnStaticAddress() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setAutoCreateAddresses(false).setDefaultMaxConsumers(1).setDefaultPurgeOnNoConsumers(true));
server.addAddressInfo(new AddressInfo(addressName).addRoutingType(RoutingType.ANYCAST));
AddressQueryResult addressQueryResult = server.addressQuery(addressName);
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertFalse(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertEquals(addressName, addressQueryResult.getName());
assertFalse(addressQueryResult.isAutoCreateAddresses());
assertEquals(1, addressQueryResult.getDefaultMaxConsumers());
assertFalse(addressQueryResult.isAutoCreated());
assertTrue(addressQueryResult.isDefaultPurgeOnNoConsumers());
}
@Test
public void testAddressQueryDefaultsOnAutoCreatedAddress() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings());
ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
JMSContext c = cf.createContext();
c.createProducer().send(c.createTopic(addressName.toString()), c.createMessage());
AddressQueryResult addressQueryResult = server.addressQuery(addressName);
assertTrue(addressQueryResult.isExists());
assertFalse(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertEquals(addressName, addressQueryResult.getName());
assertTrue(addressQueryResult.isAutoCreateAddresses());
assertEquals(-1, addressQueryResult.getDefaultMaxConsumers());
assertTrue(addressQueryResult.isAutoCreated());
assertFalse(addressQueryResult.isDefaultPurgeOnNoConsumers());
}
@Test
public void testAddressQueryOnAutoCreatedAddressWithFQQN() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString fqqn = addressName.concat("::").concat(SimpleString.toSimpleString(UUID.randomUUID().toString()));
ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
JMSContext c = cf.createContext();
c.createProducer().send(c.createTopic(fqqn.toString()), c.createMessage());
assertEquals(addressName, server.addressQuery(addressName).getName());
assertEquals(addressName, server.addressQuery(fqqn).getName());
}
@Test
public void testAddressQueryNonDefaultsOnAutoCreatedAddress() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setAutoCreateAddresses(true).setDefaultMaxConsumers(1).setDefaultPurgeOnNoConsumers(true));
ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
JMSContext c = cf.createContext();
c.createProducer().send(c.createTopic(addressName.toString()), c.createMessage());
AddressQueryResult addressQueryResult = server.addressQuery(addressName);
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertEquals(addressName, addressQueryResult.getName());
assertTrue(addressQueryResult.isAutoCreateAddresses());
assertEquals(1, addressQueryResult.getDefaultMaxConsumers());
assertTrue(addressQueryResult.isAutoCreated());
assertTrue(addressQueryResult.isDefaultPurgeOnNoConsumers());
}
}

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import javax.jms.JMSContext;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class QueueQueryTest extends ActiveMQTestBase {
private ActiveMQServer server;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(false);
server.start();
}
@Test
public void testQueueQueryDefaultsOnStaticQueue() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
server.createQueue(addressName, RoutingType.MULTICAST, queueName, null, true, false);
QueueQueryResult queueQueryResult = server.queueQuery(queueName);
assertTrue(queueQueryResult.isExists());
assertEquals(RoutingType.MULTICAST, queueQueryResult.getRoutingType());
assertEquals(queueName, queueQueryResult.getName());
assertTrue(queueQueryResult.isAutoCreateQueues());
assertEquals(null, queueQueryResult.getFilterString());
assertFalse(queueQueryResult.isAutoCreated());
assertEquals(addressName, queueQueryResult.getAddress());
assertEquals(0, queueQueryResult.getMessageCount());
assertEquals(0, queueQueryResult.getConsumerCount());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_MAX_QUEUE_CONSUMERS, queueQueryResult.getMaxConsumers());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_CONSUMERS_BEFORE_DISPATCH, queueQueryResult.getConsumersBeforeDispatch().intValue());
assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, queueQueryResult.getDefaultConsumerWindowSize().intValue());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_DELAY_BEFORE_DISPATCH, queueQueryResult.getDelayBeforeDispatch().longValue());
assertEquals(null, queueQueryResult.getLastValueKey());
assertTrue(queueQueryResult.isDurable());
assertFalse(queueQueryResult.isPurgeOnNoConsumers());
assertFalse(queueQueryResult.isTemporary());
assertFalse(queueQueryResult.isExclusive());
assertFalse(queueQueryResult.isNonDestructive());
}
@Test
public void testQueueQueryOnStaticQueueWithFQQN() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString fqqn = addressName.concat("::").concat(queueName);
server.createQueue(fqqn, RoutingType.MULTICAST, fqqn, null, true, false);
QueueQueryResult queueQueryResult = server.queueQuery(fqqn);
assertEquals(queueName, queueQueryResult.getName());
assertEquals(addressName, queueQueryResult.getAddress());
queueQueryResult = server.queueQuery(queueName);
assertEquals(queueName, queueQueryResult.getName());
assertEquals(addressName, queueQueryResult.getAddress());
}
@Test
public void testQueueQueryNonDefaultsOnStaticQueue() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString filter = SimpleString.toSimpleString("x = 'y'");
SimpleString lastValueKey = SimpleString.toSimpleString("myLastValueKey");
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings()
.setAutoCreateAddresses(true)
.setDefaultMaxConsumers(1)
.setDefaultPurgeOnNoConsumers(true)
.setDefaultConsumersBeforeDispatch(13)
.setDefaultConsumerWindowSize(51)
.setDefaultDelayBeforeDispatch(19L)
.setDefaultLastValueQueue(true)
.setDefaultLastValueKey(lastValueKey)
.setDefaultExclusiveQueue(true)
.setDefaultNonDestructive(true));
server.createQueue(addressName, RoutingType.ANYCAST, queueName, filter, false, true);
QueueQueryResult queueQueryResult = server.queueQuery(queueName);
assertTrue(queueQueryResult.isExists());
assertEquals(RoutingType.ANYCAST, queueQueryResult.getRoutingType());
assertEquals(queueName, queueQueryResult.getName());
assertTrue(queueQueryResult.isAutoCreateQueues());
assertEquals(filter, queueQueryResult.getFilterString());
assertFalse(queueQueryResult.isAutoCreated());
assertEquals(addressName, queueQueryResult.getAddress());
assertEquals(0, queueQueryResult.getMessageCount());
assertEquals(0, queueQueryResult.getConsumerCount());
assertEquals(1, queueQueryResult.getMaxConsumers());
assertEquals(13, queueQueryResult.getConsumersBeforeDispatch().intValue());
assertEquals(51, queueQueryResult.getDefaultConsumerWindowSize().intValue());
assertEquals(19L, queueQueryResult.getDelayBeforeDispatch().longValue());
assertTrue(queueQueryResult.isLastValue());
assertEquals(lastValueKey, queueQueryResult.getLastValueKey());
assertFalse(queueQueryResult.isDurable());
assertTrue(queueQueryResult.isPurgeOnNoConsumers());
assertTrue(queueQueryResult.isTemporary());
assertTrue(queueQueryResult.isExclusive());
assertTrue(queueQueryResult.isNonDestructive());
}
@Test
public void testQueueQueryDefaultsOnAutoCreatedQueue() throws Exception {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
server.getAddressSettingsRepository().addMatch(queueName.toString(), new AddressSettings());
JMSContext c = new ActiveMQConnectionFactory("vm://0").createContext();
c.createProducer().send(c.createQueue(queueName.toString()), c.createMessage());
QueueQueryResult queueQueryResult = server.queueQuery(queueName);
assertTrue(queueQueryResult.isAutoCreateQueues());
assertEquals(null, queueQueryResult.getFilterString());
assertTrue(queueQueryResult.isAutoCreated());
assertEquals(queueName, queueQueryResult.getAddress());
assertEquals(1, queueQueryResult.getMessageCount());
assertEquals(0, queueQueryResult.getConsumerCount());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_MAX_QUEUE_CONSUMERS, queueQueryResult.getMaxConsumers());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_CONSUMERS_BEFORE_DISPATCH, queueQueryResult.getConsumersBeforeDispatch().intValue());
assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, queueQueryResult.getDefaultConsumerWindowSize().intValue());
assertEquals(ActiveMQDefaultConfiguration.DEFAULT_DELAY_BEFORE_DISPATCH, queueQueryResult.getDelayBeforeDispatch().longValue());
assertEquals(null, queueQueryResult.getLastValueKey());
assertTrue(queueQueryResult.isDurable());
assertFalse(queueQueryResult.isPurgeOnNoConsumers());
assertFalse(queueQueryResult.isTemporary());
assertFalse(queueQueryResult.isExclusive());
assertFalse(queueQueryResult.isNonDestructive());
}
@Test
public void testQueueQueryOnAutoCreatedQueueWithFQQN() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString fqqn = addressName.concat("::").concat(queueName);
JMSContext c = new ActiveMQConnectionFactory("vm://0").createContext();
c.createProducer().send(c.createQueue(fqqn.toString()), c.createMessage());
QueueQueryResult queueQueryResult = server.queueQuery(fqqn);
assertEquals(queueName, queueQueryResult.getName());
assertEquals(addressName, queueQueryResult.getAddress());
assertEquals(1, queueQueryResult.getMessageCount());
queueQueryResult = server.queueQuery(queueName);
assertEquals(queueName, queueQueryResult.getName());
assertEquals(addressName, queueQueryResult.getAddress());
assertEquals(1, queueQueryResult.getMessageCount());
c.createProducer().send(c.createQueue(addressName.toString()), c.createMessage());
assertEquals(2, server.queueQuery(fqqn).getMessageCount());
assertEquals(2, server.queueQuery(queueName).getMessageCount());
}
@Test
public void testQueueQueryNonDefaultsOnAutoCreatedQueue() throws Exception {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString lastValueKey = SimpleString.toSimpleString("myLastValueKey");
server.getAddressSettingsRepository().addMatch(queueName.toString(), new AddressSettings()
.setAutoCreateAddresses(true)
.setAutoCreateQueues(true)
.setDefaultMaxConsumers(1)
.setDefaultPurgeOnNoConsumers(true)
.setDefaultConsumersBeforeDispatch(13)
.setDefaultConsumerWindowSize(51)
.setDefaultDelayBeforeDispatch(19L)
.setDefaultLastValueQueue(true)
.setDefaultLastValueKey(lastValueKey)
.setDefaultExclusiveQueue(true)
.setDefaultNonDestructive(true));
JMSContext c = new ActiveMQConnectionFactory("vm://0").createContext();
c.createProducer().send(c.createQueue(queueName.toString()), c.createMessage());
QueueQueryResult queueQueryResult = server.queueQuery(queueName);
assertTrue(queueQueryResult.isExists());
assertEquals(RoutingType.ANYCAST, queueQueryResult.getRoutingType());
assertEquals(queueName, queueQueryResult.getName());
assertTrue(queueQueryResult.isAutoCreateQueues());
assertNull(queueQueryResult.getFilterString());
assertTrue(queueQueryResult.isAutoCreated());
assertEquals(queueName, queueQueryResult.getAddress());
assertEquals(0, queueQueryResult.getMessageCount()); // 0 since purgeOnNoConsumers = true
assertEquals(0, queueQueryResult.getConsumerCount());
assertEquals(1, queueQueryResult.getMaxConsumers());
assertEquals(13, queueQueryResult.getConsumersBeforeDispatch().intValue());
assertEquals(51, queueQueryResult.getDefaultConsumerWindowSize().intValue());
assertEquals(19L, queueQueryResult.getDelayBeforeDispatch().longValue());
assertTrue(queueQueryResult.isLastValue());
assertEquals(lastValueKey, queueQueryResult.getLastValueKey());
assertTrue(queueQueryResult.isDurable());
assertTrue(queueQueryResult.isPurgeOnNoConsumers());
assertFalse(queueQueryResult.isTemporary());
assertTrue(queueQueryResult.isExclusive());
assertTrue(queueQueryResult.isNonDestructive());
}
@Test
public void testQueueQueryNonExistentQueue() throws Exception {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
QueueQueryResult queueQueryResult = server.queueQuery(queueName);
assertFalse(queueQueryResult.isExists());
assertEquals(queueName, queueQueryResult.getName());
}
@Test
public void testQueueQueryNonExistentQueueWithFQQN() throws Exception {
SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
SimpleString fqqn = addressName.concat("::").concat(queueName);
QueueQueryResult queueQueryResult = server.queueQuery(fqqn);
assertFalse(queueQueryResult.isExists());
assertEquals(queueName, queueQueryResult.getName());
assertEquals(addressName, queueQueryResult.getAddress());
}
}

View File

@ -19,11 +19,15 @@ package org.apache.activemq.artemis.tests.integration.stomp;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -82,6 +86,94 @@ public class FQQNStompTest extends StompTestBase {
unsubscribe(conn, "sub-01");
}
@Test
public void testReceiveFQQN2() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
final SimpleString q2Name = SimpleString.toSimpleString("q2");
Queue q1 = server.createQueue(myAddress, RoutingType.MULTICAST, q1Name, null, true, false);
Queue q2 = server.createQueue(myAddress, RoutingType.MULTICAST, q2Name, null, true, false);
sendJmsMessage("Hello World!", ActiveMQJMSClient.createTopic(myAddress.toString()));
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 1, 2000, 100));
conn.connect(defUser, defPass);
subscribeQueue(conn, "sub-01", myAddress + "\\c\\c" + q1Name);
ClientStompFrame frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 0, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 1, 2000, 100));
unsubscribe(conn, "sub-01");
}
@Test
public void testSendFQQNMulticast() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
final SimpleString q2Name = SimpleString.toSimpleString("q2");
Queue q1 = server.createQueue(myAddress, RoutingType.MULTICAST, q1Name, null, true, false);
Queue q2 = server.createQueue(myAddress, RoutingType.MULTICAST, q2Name, null, true, false);
conn.connect(defUser, defPass);
send(conn, myAddress + "\\c\\c" + q1Name, null, "Hello World!");
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 0, 2000, 100));
subscribeQueue(conn, "sub-01", myAddress + "\\c\\c" + q1Name);
ClientStompFrame frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 0, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 0, 2000, 100));
unsubscribe(conn, "sub-01");
}
@Test
public void testSendFQQNAnycast() throws Exception {
final SimpleString myAddress = SimpleString.toSimpleString("myAddress");
final SimpleString q1Name = SimpleString.toSimpleString("q1");
final SimpleString q2Name = SimpleString.toSimpleString("q2");
Queue q1 = server.createQueue(myAddress, RoutingType.ANYCAST, q1Name, null, true, false);
Queue q2 = server.createQueue(myAddress, RoutingType.ANYCAST, q2Name, null, true, false);
conn.connect(defUser, defPass);
send(conn, myAddress.toString(), null, "Hello World!", false, RoutingType.ANYCAST);
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 1, 2000, 100));
send(conn, myAddress.toString(), null, "Hello World!", false, RoutingType.ANYCAST);
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 1, 2000, 100));
send(conn, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 2, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 1, 2000, 100));
send(conn, myAddress + "\\c\\c" + q1Name, null, "Hello World!", false, RoutingType.ANYCAST);
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 3, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 1, 2000, 100));
subscribeQueue(conn, "sub-01", myAddress + "\\c\\c" + q1Name);
ClientStompFrame frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
frame = conn.receiveFrame(2000);
assertNotNull(frame);
assertEquals("Hello World!", frame.getBody());
assertTrue(Wait.waitFor(() -> q1.getMessageCount() == 0, 2000, 100));
assertTrue(Wait.waitFor(() -> q2.getMessageCount() == 1, 2000, 100));
unsubscribe(conn, "sub-01");
}
@Test
public void testReceiveFQQNSpecial() throws Exception {
conn.connect(defUser, defPass);
@ -99,7 +191,7 @@ public class FQQNStompTest extends StompTestBase {
assertNotNull(frame);
assertEquals("ERROR", frame.getCommand());
assertTrue(frame.getBody().contains(getQueueName()));
assertTrue(frame.getBody().contains("not exist"));
assertTrue(frame.getBody().contains("Invalid"));
conn.closeTransport();
//need reconnect because stomp disconnect on error
@ -108,7 +200,7 @@ public class FQQNStompTest extends StompTestBase {
//:: will subscribe to no queue so no message received.
frame = subscribeQueue(conn, "sub-01", "\\c\\c");
assertTrue(frame.getBody().contains("Queue :: does not exist"));
assertTrue(frame.getBody().contains("Invalid queue name: ::"));
}
}

View File

@ -20,6 +20,7 @@ import javax.transaction.xa.Xid;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@ -292,6 +293,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
final SimpleString name;
final SimpleString uniqueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
FakeBinding(final SimpleString name) {
this.name = name;
@ -363,7 +365,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
*/
@Override
public SimpleString getUniqueName() {
return null;
return uniqueName;
}
@Override

View File

@ -200,7 +200,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindignsFake();
return new BindingsFake();
}
}
@ -302,7 +302,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
}
}
class BindignsFake implements Bindings {
class BindingsFake implements Bindings {
ArrayList<Binding> bindings = new ArrayList<>();