ARTEMIS-3033 - implement address tree map for wildcards in place of linked addresses

This commit is contained in:
gtully 2020-11-26 13:22:59 +00:00
parent 4e70fcdb52
commit 546bbfebfb
20 changed files with 1909 additions and 244 deletions

View File

@ -47,6 +47,8 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
// Cache the string
private transient String str;
private transient String[] paths;
// Static
// ----------------------------------------------------------------------
@ -281,6 +283,35 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return str;
}
/**
* note the result of the first use is cached, the separator is configured on
* the postoffice so will be static for the duration of a server instance.
* calling with different separator values could give invalid results
*
* @param separator value from wildcardConfiguration
* @return String[] reference to the split paths or the cached value if previously called
*/
public String[] getPaths(final char separator) {
if (paths != null) {
return paths;
}
List<String> pathsList = new ArrayList<>();
StringBuilder pathAccumulator = new StringBuilder();
for (char c : toString().toCharArray()) {
if (c == separator) {
pathsList.add(pathAccumulator.toString());
pathAccumulator.delete(0, pathAccumulator.length());
} else {
pathAccumulator.append(c);
}
}
pathsList.add(pathAccumulator.toString());
paths = new String[pathsList.size()];
pathsList.toArray(paths);
return paths;
}
@Override
public boolean equals(final Object other) {
if (this == other) {

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString;
/**
@ -31,11 +29,5 @@ public interface Address {
boolean containsWildCard();
Collection<Address> getLinkedAddresses();
void addLinkedAddress(Address address);
void removeLinkedAddress(Address actualAddress);
boolean matches(Address add);
}

View File

@ -44,6 +44,8 @@ public interface AddressManager {
*/
Binding removeBinding(SimpleString uniqueName, Transaction tx) throws Exception;
Bindings getExistingBindingsForRoutingAddress(SimpleString address) throws Exception;
Bindings getBindingsForRoutingAddress(SimpleString address) throws Exception;
Collection<Binding> getMatchingBindings(SimpleString address) throws Exception;

View File

@ -36,6 +36,8 @@ public interface Bindings extends UnproposalListener {
Binding removeBindingByUniqueName(SimpleString uniqueName);
SimpleString getName();
void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);
MessageLoadBalancingType getMessageLoadBalancingType();

View File

@ -16,16 +16,11 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jctools.maps.NonBlockingHashSet;
/**
* Splits an address string into its hierarchical parts using {@link WildcardConfiguration#getDelimiter()} as delimiter.
@ -70,31 +65,6 @@ public class AddressImpl implements Address {
return containsWildCard;
}
@Override
public Collection<Address> getLinkedAddresses() {
final Collection<Address> linkedAddresses = this.linkedAddresses;
if (linkedAddresses == null) {
return Collections.emptySet();
}
return linkedAddresses;
}
@Override
public void addLinkedAddress(final Address address) {
if (linkedAddresses == null) {
linkedAddresses = PlatformDependent.hasUnsafe() ? new NonBlockingHashSet<>() : new ConcurrentHashSet<>();
}
linkedAddresses.add(address);
}
@Override
public void removeLinkedAddress(final Address actualAddress) {
if (linkedAddresses == null) {
return;
}
linkedAddresses.remove(actualAddress);
}
/**
* This method should actually be called `isMatchedBy`.
*

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
public class AddressMap<T> {
private final AddressPartNode<T> rootNode;
private final char DELIMITER;
public AddressMap(final String any, String single, char delimiter) {
rootNode = new AddressPartNode<>(any, single);
this.DELIMITER = delimiter;
}
public void put(final SimpleString key, T value) {
rootNode.add(getPaths(key), 0, value);
}
public void remove(final SimpleString key, T value) {
rootNode.remove(getPaths(key), 0, value);
}
public void reset() {
rootNode.reset();
}
public String[] getPaths(final SimpleString address) {
return address.getPaths(DELIMITER);
}
/**
* @param address - a non wildcard to match against wildcards in the map
*/
public void visitMatchingWildcards(SimpleString address,
AddressMapVisitor<T> collector) throws Exception {
final String[] paths = getPaths(address);
rootNode.visitMatchingWildcards(paths, 0, collector);
}
/**
* @param wildcardAddress - a wildcard address to match against non wildcards in the map
*/
public void visitMatching(SimpleString wildcardAddress,
AddressMapVisitor<T> collector) throws Exception {
final String[] paths = getPaths(wildcardAddress);
rootNode.visitNonWildcard(paths, 0, collector);
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
public interface AddressMapVisitor<T> {
void visit(T value) throws Exception;
}

View File

@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public final class AddressPartNode<T> {
protected final String ANY_CHILD;
protected final String ANY_DESCENDENT;
private final AddressPartNode<T> parent;
private final List<T> values = new CopyOnWriteArrayList<>();
private final Map<String, AddressPartNode<T>> childNodes = new ConcurrentHashMap<>();
private final String path;
public AddressPartNode(final String path, final AddressPartNode<T> parent) {
this.parent = parent;
this.ANY_DESCENDENT = parent.ANY_DESCENDENT;
this.ANY_CHILD = parent.ANY_CHILD;
// allow '==' equality wildcard nodes
if (ANY_DESCENDENT.equals(path)) {
this.path = ANY_DESCENDENT;
} else if (ANY_CHILD.equals(path)) {
this.path = ANY_CHILD;
} else {
this.path = path;
}
}
public AddressPartNode(String anyDescendent, String anyChild) {
ANY_DESCENDENT = anyDescendent;
ANY_CHILD = anyChild;
path = "Root";
parent = null;
}
public AddressPartNode<T> getChild(final String path) {
return childNodes.get(path);
}
public Collection<AddressPartNode<T>> getChildren() {
return childNodes.values();
}
public AddressPartNode<T> getChildOrCreate(final String path) {
AddressPartNode<T> answer = childNodes.get(path);
if (answer == null) {
answer = new AddressPartNode<>(path, this);
childNodes.put(path, answer);
}
return answer;
}
public void add(final String[] paths, final int idx, final T value) {
if (idx >= paths.length) {
values.add(value);
} else {
getChildOrCreate(paths[idx]).add(paths, idx + 1, value);
}
}
public void remove(final String[] paths, final int idx, final T value) {
if (idx >= paths.length) {
values.remove(value);
pruneIfEmpty();
} else {
getChildOrCreate(paths[idx]).remove(paths, idx + 1, value);
}
}
public void visitDescendantNonWildcardValues(final AddressMapVisitor<T> collector) throws Exception {
visitValues(collector);
for (AddressPartNode<T> child : childNodes.values()) {
if (ANY_CHILD == child.getPath() || ANY_DESCENDENT == child.getPath()) {
continue;
}
child.visitDescendantNonWildcardValues(collector);
}
}
public void visitPathTailNonWildcard(final String[] paths,
final int startIndex,
final AddressMapVisitor<T> collector) throws Exception {
if (childNodes.isEmpty()) {
return;
}
// look for a path match after 0-N skips among children
AddressPartNode<T> match = null;
for (int i = startIndex; i < paths.length; i++) {
match = getChild(paths[i]);
if (match != null) {
if (ANY_CHILD == match.getPath() || ANY_DESCENDENT == match.getPath()) {
continue;
}
match.visitNonWildcard(paths, i + 1, collector);
break;
}
}
// walk the rest of the sub tree to find a tail path match
for (AddressPartNode<T> child : childNodes.values()) {
// instance equality arranged in node creation getChildOrCreate
if (child != match && ANY_DESCENDENT != child.getPath() && ANY_CHILD != child.getPath()) {
child.visitPathTailNonWildcard(paths, startIndex, collector);
}
}
}
public void visitPathTailMatch(final String[] paths, final int startIndex, final AddressMapVisitor<T> collector) throws Exception {
if (childNodes.isEmpty()) {
return;
}
// look for a path match after 0-N skips among immediate children
AddressPartNode<T> match = null;
for (int i = startIndex; i < paths.length; i++) {
match = getChild(paths[i]);
if (match != null) {
match.visitMatchingWildcards(paths, i + 1, collector);
break;
}
}
// walk the rest of the sub tree to find a tail path match
for (AddressPartNode<T> child : childNodes.values()) {
// instance equality arranged in node creation
if (child != match && ANY_DESCENDENT != child.getPath()) {
child.visitPathTailMatch(paths, startIndex, collector);
}
}
}
// wildcards in the paths, ignore wildcard expansions in the map
public void visitNonWildcard(final String[] paths, final int startIndex, final AddressMapVisitor<T> collector) throws Exception {
boolean canVisitAnyDescendent = true;
AddressPartNode<T> node = this;
final int size = paths.length;
for (int i = startIndex; i < size && node != null; i++) {
final String path = paths[i];
// snuff out any descendant postfix in the paths ....#
if (ANY_DESCENDENT.equals(path)) {
if (i == size - 1) {
node.visitDescendantNonWildcardValues(collector);
canVisitAnyDescendent = false;
break;
}
}
if (ANY_CHILD.equals(path)) {
for (AddressPartNode<T> anyChild : node.getChildren()) {
if (ANY_CHILD == anyChild.getPath() || ANY_DESCENDENT == anyChild.getPath()) {
continue;
}
anyChild.visitNonWildcard(paths, i + 1, collector);
}
// once we have done depth first on all children we are done with our paths
return;
} else if (ANY_DESCENDENT.equals(path)) {
node.visitValues(collector);
node.visitPathTailNonWildcard(paths, i + 1, collector);
// once we have done depth first on all children we are done with our paths
return;
} else {
node = node.getChild(path);
}
}
if (node != null) {
if (canVisitAnyDescendent) {
node.visitValues(collector);
}
}
}
// non wildcard paths, match any expanded wildcards in the map
public void visitMatchingWildcards(final String[] paths, final int startIndex, final AddressMapVisitor<T> collector) throws Exception {
boolean canVisitAnyDescendent = true;
AddressPartNode<T> node = this;
AddressPartNode<T> anyDescendentNode = null;
AddressPartNode<T> anyChildNode = null;
final int size = paths.length;
for (int i = startIndex; i < size && node != null; i++) {
final String path = paths[i];
anyDescendentNode = node.getChild(ANY_DESCENDENT);
if (anyDescendentNode != null) {
anyDescendentNode.visitValues(collector);
// match tail with current path, such that ANY_DESCENDENT can match zero
anyDescendentNode.visitPathTailMatch(paths, i, collector);
canVisitAnyDescendent = false;
}
anyChildNode = node.getChild(ANY_CHILD);
if (anyChildNode != null) {
anyChildNode.visitMatchingWildcards(paths, i + 1, collector);
}
node = node.getChild(path);
if (node != null && (node == anyChildNode || node == anyDescendentNode)) {
// down that path before, out of here
return;
}
}
if (node != null) {
node.visitValues(collector);
if (canVisitAnyDescendent) {
// allow zero node any descendant at the end of path node
anyDescendentNode = node.getChild(ANY_DESCENDENT);
if (anyDescendentNode != null) {
anyDescendentNode.visitValues(collector);
}
}
}
}
public void visitValues(final AddressMapVisitor<T> collector) throws Exception {
for (T o : values) {
collector.visit(o);
}
}
public String getPath() {
return path;
}
protected void pruneIfEmpty() {
if (parent != null && childNodes.isEmpty() && values.isEmpty()) {
parent.removeChild(this);
}
}
protected void removeChild(final AddressPartNode<T> node) {
childNodes.remove(node.getPath());
pruneIfEmpty();
}
public void reset() {
values.clear();
childNodes.clear();
}
}

View File

@ -86,6 +86,7 @@ public final class BindingsImpl implements Bindings {
this.name = name;
}
@Override
public SimpleString getName() {
return name;
}

View File

@ -668,7 +668,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return null;
}
Bindings bindingsOnQueue = addressManager.getBindingsForRoutingAddress(queueBinding.getAddress());
Bindings bindingsOnQueue = addressManager.getExistingBindingsForRoutingAddress(queueBinding.getAddress());
try {
@ -923,7 +923,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new ActiveMQNonExistentQueueException();
}
if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
if (deleteData && addressManager.getExistingBindingsForRoutingAddress(binding.getAddress()) == null) {
deleteDuplicateCache(binding.getAddress());
}
@ -995,7 +995,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public Bindings getBindingsForAddress(final SimpleString address) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
Bindings bindings = addressManager.getExistingBindingsForRoutingAddress(address);
if (bindings == null) {
bindings = createBindings(address);
@ -1006,7 +1006,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public Bindings lookupBindingsForAddress(final SimpleString address) throws Exception {
return addressManager.getBindingsForRoutingAddress(address);
return addressManager.getExistingBindingsForRoutingAddress(address);
}
@Override

View File

@ -111,6 +111,11 @@ public class SimpleAddressManager implements AddressManager {
return binding.getA();
}
@Override
public Bindings getExistingBindingsForRoutingAddress(final SimpleString address) throws Exception {
return mappings.get(CompositeAddress.extractAddressName(address));
}
@Override
public Bindings getBindingsForRoutingAddress(final SimpleString address) throws Exception {
return mappings.get(CompositeAddress.extractAddressName(address));
@ -214,14 +219,18 @@ public class SimpleAddressManager implements AddressManager {
}
if (bindings.getBindings().isEmpty()) {
mappings.remove(realAddress);
bindingsEmpty(realAddress, bindings);
}
}
}
protected void addMappingsInternal(final SimpleString address,
protected void bindingsEmpty(SimpleString realAddress, Bindings bindings) {
}
protected Bindings addMappingsInternal(final SimpleString address,
final Collection<Binding> newBindings) throws Exception {
if (newBindings.isEmpty()) {
return;
return null;
}
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = mappings.get(realAddress);
@ -238,27 +247,29 @@ public class SimpleAddressManager implements AddressManager {
for (Binding binding : newBindings) {
bindings.addBinding(binding);
}
return bindings;
}
protected boolean addMappingInternal(final SimpleString address, final Binding binding) throws Exception {
boolean addedNewBindings = false;
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = mappings.get(realAddress);
Bindings prevBindings = null;
if (bindings == null) {
bindings = bindingsFactory.createBindings(realAddress);
prevBindings = mappings.putIfAbsent(realAddress, bindings);
final Bindings prevBindings = mappings.putIfAbsent(realAddress, bindings);
if (prevBindings != null) {
bindings = prevBindings;
} else {
addedNewBindings = true;
}
}
bindings.addBinding(binding);
return prevBindings != null;
return addedNewBindings;
}
@Override

View File

@ -16,17 +16,12 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -35,13 +30,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public class WildcardAddressManager extends SimpleAddressManager {
/**
* These are all the addresses, we use this so we can link back from the actual address to its linked wildcard addresses
* or vice versa
*/
private final Map<SimpleString, Address> addresses = new ConcurrentHashMap<>();
private final Map<SimpleString, Address> wildCardAddresses = new ConcurrentHashMap<>();
private final AddressMap<Bindings> addressMap = new AddressMap<>(wildcardConfiguration.getAnyWordsString(), wildcardConfiguration.getSingleWordString(), wildcardConfiguration.getDelimiter());
public WildcardAddressManager(final BindingsFactory bindingsFactory,
final WildcardConfiguration wildcardConfiguration,
@ -56,27 +45,38 @@ public class WildcardAddressManager extends SimpleAddressManager {
super(bindingsFactory, storageManager, metricsManager);
}
// publish, may be a new address that needs wildcard bindings added
// won't contain a wildcard because we don't ever route to a wildcards at this time
@Override
public Bindings getBindingsForRoutingAddress(final SimpleString address) throws Exception {
assert !isAWildcardAddress(address);
Bindings bindings = super.getBindingsForRoutingAddress(address);
// this should only happen if we're routing to an address that has no mappings when we're running checkAllowable
if (bindings == null && !wildCardAddresses.isEmpty()) {
Address add = addAndUpdateAddressMap(address);
if (!add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) {
Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress());
if (b != null) {
super.addMappingsInternal(address, b.getBindings());
if (bindings == null) {
bindings = super.getBindingsForRoutingAddress(address);
if (bindings == null) {
final Bindings[] lazyCreateResult = new Bindings[1];
addressMap.visitMatchingWildcards(address, new AddressMapVisitor<Bindings>() {
Bindings newBindings = null;
@Override
public void visit(Bindings matchingBindings) throws Exception {
if (newBindings == null) {
newBindings = addMappingsInternal(address, matchingBindings.getBindings());
lazyCreateResult[0] = newBindings;
} else {
for (Binding binding : matchingBindings.getBindings()) {
newBindings.addBinding(binding);
}
bindings.setMessageLoadBalancingType(b.getMessageLoadBalancingType());
}
}
}
if (bindings == null) {
bindings = super.getBindingsForRoutingAddress(address);
});
bindings = lazyCreateResult[0];
if (bindings != null) {
// record such that any new wildcard bindings can join
addressMap.put(address, bindings);
}
}
return bindings;
@ -84,134 +84,71 @@ public class WildcardAddressManager extends SimpleAddressManager {
/**
* If the address to add the binding to contains a wildcard then a copy of the binding (with the same underlying queue)
* will be added to the actual mappings. Otherwise the binding is added as normal.
* will be added to matching addresses. If the address is non wildcard, then we need to add any existing matching wildcard
* bindings to this address the first time we see it.
*
* @param binding the binding to add
* @return true if the address was a new mapping
*/
@Override
public boolean addBinding(final Binding binding) throws Exception {
boolean exists = super.addBinding(binding);
if (!exists) {
Address add = addAndUpdateAddressMap(binding.getAddress());
if (add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) {
super.addMappingInternal(destAdd.getAddress(), binding);
final boolean bindingsForANewAddress = super.addBinding(binding);
final SimpleString address = binding.getAddress();
final Bindings bindingsForRoutingAddress = mappings.get(binding.getAddress());
if (isAWildcardAddress(address)) {
addressMap.visitMatching(address, bindings -> {
// this wildcard binding needs to be added to matching addresses
bindings.addBinding(binding);
});
} else if (bindingsForANewAddress) {
// existing wildcards may match this new simple address
addressMap.visitMatchingWildcards(address, bindings -> {
// apply existing bindings from matching wildcards
for (Binding toAdd : bindings.getBindings()) {
bindingsForRoutingAddress.addBinding(toAdd);
}
} else {
for (Address destAdd : add.getLinkedAddresses()) {
Bindings bindings = super.getBindingsForRoutingAddress(destAdd.getAddress());
if (bindings != null) {
super.addMappingsInternal(binding.getAddress(), bindings.getBindings());
}
}
}
});
}
return !exists;
if (bindingsForANewAddress) {
addressMap.put(address, bindingsForRoutingAddress);
}
return bindingsForANewAddress;
}
/**
* If the address is a wild card then the binding will be removed from the actual mappings for any linked address.
* otherwise it will be removed as normal.
*
* @param uniqueName the name of the binding to remove
* @return true if this was the last mapping for a specific address
*/
@Override
public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception {
Binding binding = super.removeBinding(uniqueName, tx);
if (binding != null) {
final SimpleString bindingAddress = binding.getAddress();
final boolean containsWildcard = bindingAddress.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
Address address = containsWildcard ? wildCardAddresses.get(bindingAddress) : addresses.get(bindingAddress);
if (address == null) {
address = new AddressImpl(bindingAddress, wildcardConfiguration);
} else {
if (containsWildcard) {
for (Address linkedAddress : address.getLinkedAddresses()) {
super.removeBindingInternal(linkedAddress.getAddress(), uniqueName);
}
}
SimpleString address = binding.getAddress();
if (isAWildcardAddress(address)) {
addressMap.visitMatching(address, bindings -> removeBindingInternal(bindings.getName(), uniqueName));
}
removeAndUpdateAddressMap(address);
}
return binding;
}
private boolean isAWildcardAddress(SimpleString address) {
return address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
final AddressInfo removed = super.removeAddressInfo(address);
if (removed != null) {
//Remove from mappings so removeAndUpdateAddressMap processes and cleanup
mappings.remove(address);
removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration));
}
return removed;
protected void bindingsEmpty(SimpleString realAddress, Bindings bindings) {
addressMap.remove(realAddress, bindings);
}
@Override
public void clear() {
super.clear();
addresses.clear();
wildCardAddresses.clear();
addressMap.reset();
}
private Address addAndUpdateAddressMap(final SimpleString address) {
final boolean containsWildCard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
final Map<SimpleString, Address> addressMap = containsWildCard ? wildCardAddresses : addresses;
Address actualAddress = addressMap.get(address);
if (actualAddress == null) {
synchronized (this) {
actualAddress = addressMap.get(address);
if (actualAddress == null) {
actualAddress = new AddressImpl(address, wildcardConfiguration);
assert actualAddress.containsWildCard() == containsWildCard;
if (containsWildCard) {
for (Address destAdd : addresses.values()) {
if (destAdd.matches(actualAddress)) {
destAdd.addLinkedAddress(actualAddress);
actualAddress.addLinkedAddress(destAdd);
}
}
} else {
for (Address destAdd : wildCardAddresses.values()) {
if (actualAddress.matches(destAdd)) {
destAdd.addLinkedAddress(actualAddress);
actualAddress.addLinkedAddress(destAdd);
}
}
}
// only publish when complete
addressMap.put(address, actualAddress);
}
}
}
return actualAddress;
}
private void removeAndUpdateAddressMap(final Address address) throws Exception {
// we only remove if there are no bindings left
Bindings bindings = super.getBindingsForRoutingAddress(address.getAddress());
if (bindings == null || bindings.getBindings().isEmpty()) {
synchronized (this) {
for (Address address1 : address.getLinkedAddresses()) {
address1.removeLinkedAddress(address);
Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress());
if (linkedBindings == null || linkedBindings.getBindings().size() == 0) {
removeAddress(address1);
}
}
removeAddress(address);
}
}
}
private void removeAddress(final Address add) {
if (add.containsWildCard()) {
wildCardAddresses.remove(add.getAddress());
} else {
addresses.remove(add.getAddress());
}
public AddressMap<Bindings> getAddressMap() {
return addressMap;
}
}

View File

@ -28,7 +28,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
SimpleString getName();

View File

@ -99,6 +99,50 @@ public class AddressingTest extends ActiveMQTestBase {
}
}
@Test
public void testDynamicMulticastRouting() throws Exception {
SimpleString sendAddress = new SimpleString("test.address");
AddressInfo addressInfo = new AddressInfo(sendAddress);
addressInfo.addRoutingType(RoutingType.MULTICAST);
server.addOrUpdateAddressInfo(addressInfo);
Queue q1 = server.createQueue(new QueueConfiguration(new SimpleString("1.test.address")).setAddress("test.address").setRoutingType(RoutingType.MULTICAST));
Queue q2 = server.createQueue(new QueueConfiguration(new SimpleString("2.test.#")).setAddress("test.#").setRoutingType(RoutingType.MULTICAST));
ClientSession session = sessionFactory.createSession();
session.start();
ClientConsumer consumer1 = session.createConsumer(q1.getName());
ClientConsumer consumer2 = session.createConsumer(q2.getName());
ClientProducer producer = session.createProducer(sendAddress);
ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
m.getBodyBuffer().writeString("TestMessage");
producer.send(m);
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
// add in a new wildcard producer, bindings version will be incremented
Queue q3 = server.createQueue(new QueueConfiguration(new SimpleString("3.test.*")).setAddress("test.*").setRoutingType(RoutingType.MULTICAST));
ClientConsumer consumer3 = session.createConsumer(q3.getName());
producer.send(m);
assertNotNull(consumer1.receive(2000));
assertNotNull(consumer2.receive(2000));
assertNotNull(consumer3.receive(2000));
q1.deleteQueue();
q2.deleteQueue();
q3.deleteQueue();
}
@Test
public void testAnycastRouting() throws Exception {

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.jmh;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.AddressMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Fork(2)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 8, time = 1)
public class AddressMapPerfTest {
public AddressMap<Object> objectAddressMap;
@Param({"2", "8", "10"})
int entriesLog2;
int entries;
private static final WildcardConfiguration WILDCARD_CONFIGURATION;
SimpleString[] keys;
static {
WILDCARD_CONFIGURATION = new WildcardConfiguration();
WILDCARD_CONFIGURATION.setAnyWords('>');
WILDCARD_CONFIGURATION.setSingleWord('*');
}
@Setup
public void init() {
objectAddressMap =
new AddressMap<>(WILDCARD_CONFIGURATION.getAnyWordsString(), WILDCARD_CONFIGURATION.getSingleWordString(), WILDCARD_CONFIGURATION.getDelimiter());
entries = 1 << entriesLog2;
keys = new SimpleString[entries];
for (int i = 0; i < entries; i++) {
keys[i] = SimpleString.toSimpleString("topic." + i % entriesLog2 + "." + i);
keys[i].getPaths(WILDCARD_CONFIGURATION.getDelimiter()); // getPaths is not thread safe
}
}
@State(value = Scope.Thread)
public static class ThreadState {
long next;
SimpleString[] keys;
AtomicInteger counter = new AtomicInteger();
@Setup
public void init(AddressMapPerfTest benchmarkState) {
keys = benchmarkState.keys;
}
public SimpleString nextKeyValue() {
final long current = next;
next = current + 1;
final int index = (int) (current & (keys.length - 1));
return keys[index];
}
}
@Benchmark
@Group("both")
@GroupThreads(2)
public void testPutWhileRemove(ThreadState state) {
SimpleString s = state.nextKeyValue();
objectAddressMap.put(s, s);
}
@Benchmark
@Group("both")
@GroupThreads(2)
public void testRemoveWhilePut(ThreadState state) {
SimpleString s = state.nextKeyValue();
objectAddressMap.remove(s, s);
}
@Benchmark
@GroupThreads(4)
public void testPutAndVisit(final ThreadState state) throws Exception {
SimpleString s = state.nextKeyValue();
objectAddressMap.put(s, s);
// look for it
objectAddressMap.visitMatchingWildcards(s, value -> state.counter.incrementAndGet());
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.jmh;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Fork(2)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 8, time = 1)
public class WildcardAddressManagerHeirarchyPerfTest {
private static class BindingFactoryFake implements BindingsFactory {
@Override
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null);
}
}
private static class BindingFake implements Binding {
final SimpleString address;
final SimpleString id;
final Long idl;
BindingFake(SimpleString addressParameter, SimpleString id, long idl) {
this.address = addressParameter;
this.id = id;
this.idl = idl;
}
@Override
public void unproposed(SimpleString groupID) {
}
@Override
public SimpleString getAddress() {
return address;
}
@Override
public Bindable getBindable() {
return null;
}
@Override
public BindingType getType() {
return BindingType.LOCAL_QUEUE;
}
@Override
public SimpleString getUniqueName() {
return id;
}
@Override
public SimpleString getRoutingName() {
return id;
}
@Override
public SimpleString getClusterName() {
return null;
}
@Override
public Filter getFilter() {
return null;
}
@Override
public boolean isHighAcceptPriority(Message message) {
return false;
}
@Override
public boolean isExclusive() {
return false;
}
@Override
public Long getID() {
return idl;
}
@Override
public int getDistance() {
return 0;
}
@Override
public void route(Message message, RoutingContext context) {
}
@Override
public void close() {
}
@Override
public String toManagementString() {
return "FakeBiding Address=" + this.address;
}
@Override
public boolean isConnected() {
return true;
}
@Override
public void routeWithAck(Message message, RoutingContext context) {
}
}
public WildcardAddressManager addressManager;
@Param({"2", "8", "10"})
int topicsLog2;
@Param({"true", "false"})
boolean verifyWildcardBinding;
int topics;
AtomicLong topicCounter;
int partitions;
private static final WildcardConfiguration WILDCARD_CONFIGURATION;
SimpleString[] addresses;
Binding[] bindings;
static {
WILDCARD_CONFIGURATION = new WildcardConfiguration();
WILDCARD_CONFIGURATION.setAnyWords('>');
}
@Setup
public void init() throws Exception {
addressManager = new WildcardAddressManager(new BindingFactoryFake(), WILDCARD_CONFIGURATION, null, null);
topics = 1 << topicsLog2;
addresses = new SimpleString[topics];
bindings = new Binding[topics];
partitions = topicsLog2 * 2;
for (int i = 0; i < topics; i++) {
if (verifyWildcardBinding) {
// ensure simple matches present
addresses[i] = SimpleString.toSimpleString(MessageFormat.format("Topic1.abc-{0}.def-{0}.{1}", i % partitions, i));
addressManager.addBinding(new BindingFake(addresses[i], SimpleString.toSimpleString("" + i), i));
} else {
// ensure wildcard matches present
addresses[i] = SimpleString.toSimpleString(MessageFormat.format("Topic1.abc-{0}.*.{1}", i % partitions, i));
addressManager.addBinding(new BindingFake(addresses[i], SimpleString.toSimpleString("" + i), i));
}
}
topicCounter = new AtomicLong(0);
topicCounter.set(topics);
}
private long nextId() {
return topicCounter.incrementAndGet();
}
@State(value = Scope.Thread)
public static class ThreadState {
long next;
SimpleString[] addresses;
Binding binding;
@Setup
public void init(WildcardAddressManagerHeirarchyPerfTest benchmarkState) {
final long id = benchmarkState.nextId();
addresses = benchmarkState.addresses;
if (benchmarkState.verifyWildcardBinding) {
binding = new BindingFake(SimpleString.toSimpleString(MessageFormat.format("Topic1.abc-{0}.def-{1}.>", id % benchmarkState.partitions, id)), SimpleString.toSimpleString("" + id), id);
} else {
binding = new BindingFake(SimpleString.toSimpleString(MessageFormat.format("Topic1.abc-{0}.def-{0}.{1}", id % benchmarkState.partitions, id)), SimpleString.toSimpleString("" + id), id);
}
}
public SimpleString nextAddress() {
final long current = next;
next = current + 1;
final int index = (int) (current & (addresses.length - 1));
return addresses[index];
}
}
@Benchmark
@Threads(4)
public Binding testJustAddRemoveNewBinding(ThreadState state) throws Exception {
final Binding binding = state.binding;
addressManager.addBinding(binding);
addressManager.getBindingsForRoutingAddress(state.nextAddress());
return addressManager.removeBinding(binding.getUniqueName(), null);
}
}

View File

@ -52,7 +52,7 @@ public class WildcardAddressManagerPerfTest {
private static class BindingFactoryFake implements BindingsFactory {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
public Bindings createBindings(SimpleString address) {
return new BindingsImpl(address, null);
}
}
@ -130,11 +130,11 @@ public class WildcardAddressManagerPerfTest {
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
public void route(Message message, RoutingContext context) {
}
@Override
public void close() throws Exception {
public void close() {
}
@Override

View File

@ -0,0 +1,881 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.impl.AddressImpl;
import org.apache.activemq.artemis.core.postoffice.impl.AddressMap;
import org.apache.activemq.artemis.core.postoffice.impl.AddressMapVisitor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AddressMapUnitTest {
AddressMap<SimpleString> underTest = new AddressMap<>("#", "*", '.');
@Test
public void testAddGetRemove() throws Exception {
SimpleString a = new SimpleString("a.b.c");
assertTrue(isEmpty(a));
underTest.put(a, a);
assertFalse(isEmpty(a));
assertEquals(1, countMatchingWildcards(a));
underTest.remove(a, a);
assertTrue(isEmpty(a));
}
private boolean isEmpty(SimpleString match) throws Exception {
return countMatchingWildcards(match) == 0;
}
@Test
public void testWildcardAddGet() throws Exception {
SimpleString a = new SimpleString("a.*.c");
assertTrue(isEmpty(a));
underTest.put(a, a);
assertFalse(isEmpty(a));
assertEquals(1, countMatchingWildcards(a));
underTest.remove(a, a);
assertTrue(isEmpty(a));
}
@Test
public void testWildcardAllAddGet() throws Exception {
SimpleString a = new SimpleString("a.b.#");
assertTrue(isEmpty(a));
underTest.put(a, a);
assertFalse(isEmpty(a));
assertEquals(1, countMatchingWildcards(a));
underTest.remove(a, a);
assertTrue(isEmpty(a));
}
@Test
public void testNoDots() throws Exception {
SimpleString s1 = new SimpleString("abcde");
SimpleString s2 = new SimpleString("abcde");
underTest.put(s1, s1);
assertEquals(1, countMatchingWildcards(s2));
}
@Test
public void testDotsSameLength2() throws Exception {
SimpleString s1 = new SimpleString("a.b");
SimpleString s2 = new SimpleString("a.b");
underTest.put(s1, s1);
assertEquals(1, countMatchingWildcards(s2));
}
@Test
public void testA() throws Exception {
SimpleString s1 = new SimpleString("a.b.c");
SimpleString s2 = new SimpleString("a.b.c.d.e.f.g.h.i.j.k.l.m.n.*");
underTest.put(s1, s1);
assertEquals(0, countMatchingWildcards(s2));
}
@Test
public void testB() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s2 = new SimpleString("a.b.x.e");
SimpleString s3 = new SimpleString("a.b.c.*");
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testC() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s2 = new SimpleString("a.b.c.x");
SimpleString s3 = new SimpleString("a.b.*.d");
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testD() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e");
SimpleString s2 = new SimpleString("a.b.c.x.e");
SimpleString s3 = new SimpleString("a.b.*.d.*");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testE() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.x.e.f");
SimpleString s3 = new SimpleString("a.b.*.d.*.f");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testF() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.x.e.f");
SimpleString s3 = new SimpleString("#");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertTrue(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(2, countNonWildcardMatching(s3));
}
@Test
public void testG() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.x.e.f");
SimpleString s3 = new SimpleString("a.#");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertTrue(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(2, countNonWildcardMatching(s3));
}
@Test
public void testH() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.x.e.f");
SimpleString s3 = new SimpleString("#.b.#");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertTrue(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(2, countNonWildcardMatching(s3));
}
@Test
public void testI() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.x.e.f");
SimpleString s3 = new SimpleString("a.#.b.#");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertTrue(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(2, countNonWildcardMatching(s3));
}
@Test
public void testJ() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.x.e.f");
SimpleString s3 = new SimpleString("a.#.c.d.e.f");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testK() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.d.e.x");
SimpleString s3 = new SimpleString("a.#.c.d.e.*");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertTrue(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(2, countNonWildcardMatching(s3));
}
@Test
public void testL() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d.e.f");
SimpleString s2 = new SimpleString("a.b.c.d.e.x");
SimpleString s3 = new SimpleString("a.#.c.d.*.f");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testM() throws Exception {
SimpleString s1 = new SimpleString("a.b.c");
SimpleString s2 = new SimpleString("a.b.x.e");
SimpleString s3 = new SimpleString("a.b.c.#");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testN() throws Exception {
SimpleString s1 = new SimpleString("usd.stock");
SimpleString s2 = new SimpleString("a.b.x.e");
SimpleString s3 = new SimpleString("*.stock.#");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testO() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s2 = new SimpleString("a.b.x.e");
SimpleString s3 = new SimpleString("a.b.c.*");
Address a1 = new AddressImpl(s1);
Address a2 = new AddressImpl(s2);
Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
assertFalse(a2.matches(w));
underTest.put(s1, s1);
underTest.put(s2, s2);
assertEquals(1, countNonWildcardMatching(s3));
}
@Test
public void testP() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s3 = new SimpleString("a.b.c#");
Address a1 = new AddressImpl(s1);
Address w = new AddressImpl(s3);
assertFalse(a1.matches(w));
underTest.put(s1, s1);
assertEquals(0, countNonWildcardMatching(s3));
}
@Test
public void testQ() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s3 = new SimpleString("#a.b.c");
Address a1 = new AddressImpl(s1);
Address w = new AddressImpl(s3);
assertFalse(a1.matches(w));
underTest.put(s1, s1);
assertEquals(0, countNonWildcardMatching(s3));
}
@Test
public void testR() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s3 = new SimpleString("#*a.b.c");
Address a1 = new AddressImpl(s1);
Address w = new AddressImpl(s3);
assertFalse(a1.matches(w));
underTest.put(s1, s1);
assertEquals(0, countNonWildcardMatching(s3));
}
@Test
public void testS() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s3 = new SimpleString("a.b.c*");
Address a1 = new AddressImpl(s1);
Address w = new AddressImpl(s3);
assertFalse(a1.matches(w));
underTest.put(s1, s1);
assertEquals(0, countNonWildcardMatching(s3));
}
@Test
public void testT() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s3 = new SimpleString("*a.b.c");
Address a1 = new AddressImpl(s1);
Address w = new AddressImpl(s3);
assertFalse(a1.matches(w));
underTest.put(s1, s1);
assertEquals(0, countNonWildcardMatching(s3));
}
@Test
public void testU() throws Exception {
SimpleString s1 = new SimpleString("a.b.c.d");
SimpleString s3 = new SimpleString("*a.b.c");
Address a1 = new AddressImpl(s1);
Address w = new AddressImpl(s3);
assertFalse(a1.matches(w));
underTest.put(s1, s1);
assertEquals(0, countNonWildcardMatching(s3));
}
@Test
public void testV() throws Exception {
final SimpleString s1 = new SimpleString("a.b.d");
final SimpleString s3 = new SimpleString("a.b.#.d");
final Address a1 = new AddressImpl(s1);
final Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
underTest.put(s1, s1);
assertEquals(1, countNonWildcardMatching(s3));
final SimpleString s2 = new SimpleString("a.b.b.b.b.d");
underTest.put(s2, s2);
assertEquals(2, countNonWildcardMatching(s3));
}
@Test
public void testVReverse() throws Exception {
final SimpleString s1 = new SimpleString("a.b.d");
final SimpleString s3 = new SimpleString("a.b.#.d");
final Address a1 = new AddressImpl(s1);
final Address w = new AddressImpl(s3);
assertTrue(a1.matches(w));
underTest.put(s3, s3);
assertEquals(1, countMatchingWildcards(s1));
}
@Test
public void testHashNMatch() throws Exception {
SimpleString addressABCF = new SimpleString("a.b.c.f");
SimpleString addressACF = new SimpleString("a.c.f");
SimpleString match = new SimpleString("a.#.f");
underTest.put(addressABCF, addressABCF);
underTest.put(addressACF, addressACF);
assertEquals(2, countNonWildcardMatching(match));
}
@Test
public void testEndHash() throws Exception {
SimpleString addressAB = new SimpleString("a.b");
SimpleString addressACF = new SimpleString("a.c.f");
SimpleString addressABC = new SimpleString("a.b.c");
SimpleString match = new SimpleString("a.b.#");
underTest.put(addressAB, addressAB);
underTest.put(addressACF, addressACF);
assertEquals(1, countNonWildcardMatching(match));
underTest.put(addressABC, addressABC);
assertEquals(2, countNonWildcardMatching(match));
}
@Test
public void testHashEndInMap() throws Exception {
SimpleString addressABHash = new SimpleString("a.b.#");
SimpleString addressABC = new SimpleString("a.b.c");
SimpleString match = new SimpleString("a.b");
underTest.put(addressABHash, addressABHash);
underTest.put(addressABC, addressABC);
assertEquals(1, countMatchingWildcards(match));
}
private int countMatchingWildcards(SimpleString plainAddress) throws Exception {
final AtomicInteger count = new AtomicInteger();
underTest.visitMatchingWildcards(plainAddress, value -> {
count.incrementAndGet();
});
return count.get();
}
private int countNonWildcardMatching(SimpleString canBeWildcardAddress) throws Exception {
final AtomicInteger count = new AtomicInteger();
underTest.visitMatching(canBeWildcardAddress, value -> {
count.incrementAndGet();
});
return count.get();
}
@Test
public void testHashEndMatchMap() throws Exception {
SimpleString match = new SimpleString("a.b.#");
SimpleString addressABC = new SimpleString("a.b.c");
SimpleString addressAB = new SimpleString("a.b");
underTest.put(addressAB, addressAB);
underTest.put(addressABC, addressABC);
assertEquals(0, countMatchingWildcards(match));
assertEquals(2, countNonWildcardMatching(match));
underTest.put(match, match);
assertEquals(1, countMatchingWildcards(match));
}
@Test
public void testHashAGet() throws Exception {
SimpleString hashA = new SimpleString("#.a");
underTest.put(hashA, hashA);
SimpleString matchA = new SimpleString("a");
SimpleString matchAB = new SimpleString("a.b");
assertEquals(1, countMatchingWildcards(matchA));
assertEquals(0, countMatchingWildcards(matchAB));
AddressImpl aStar = new AddressImpl(hashA);
AddressImpl aA = new AddressImpl(matchA);
assertTrue(aA.matches(aStar));
AddressImpl aAB = new AddressImpl(matchAB);
assertFalse(aAB.matches(aStar));
}
@Test
public void testStarOne() throws Exception {
SimpleString star = new SimpleString("*");
underTest.put(star, star);
SimpleString matchA = new SimpleString("a");
SimpleString matchAB = new SimpleString("a.b");
final AtomicInteger count = new AtomicInteger();
underTest.visitMatchingWildcards(matchA, value -> count.incrementAndGet());
assertEquals(1, count.get());
count.set(0);
underTest.visitMatchingWildcards(matchAB, value -> count.incrementAndGet());
assertEquals(0, count.get());
}
@Test
public void testHashOne() throws Exception {
SimpleString hash = new SimpleString("#");
underTest.put(hash, hash);
SimpleString matchA = new SimpleString("a");
SimpleString matchAB = new SimpleString("a.b");
SimpleString matchABC = new SimpleString("a.b.c");
final AtomicInteger count = new AtomicInteger();
AddressMapVisitor<SimpleString> countCollector = value -> count.incrementAndGet();
count.set(0);
underTest.visitMatchingWildcards(matchA, countCollector);
assertEquals(1, count.get());
count.set(0);
underTest.visitMatchingWildcards(matchAB, countCollector);
assertEquals(1, count.get());
count.set(0);
underTest.visitMatchingWildcards(matchABC, countCollector);
assertEquals(1, count.get());
}
@Test
public void testHashAMatch() throws Exception {
SimpleString a = new SimpleString("a");
underTest.put(a, a);
assertEquals(1, countNonWildcardMatching(new SimpleString("#.a")));
assertEquals(1, countMatchingWildcards(new SimpleString("a")));
}
@Test
public void testHashA() throws Exception {
SimpleString hashA = new SimpleString("#.a");
underTest.put(hashA, hashA);
assertEquals(1, countMatchingWildcards(new SimpleString("a")));
assertEquals(1, countMatchingWildcards(new SimpleString("d.f.c.a")));
// has to end in 'a', and not being with 'a'
SimpleString abcaS = new SimpleString("a.b.c.a");
AddressImpl aHashA = new AddressImpl(hashA);
AddressImpl aABCA = new AddressImpl(abcaS);
assertFalse(aABCA.matches(aHashA));
assertFalse(aHashA.matches(aABCA));
assertEquals(0, countMatchingWildcards(abcaS));
assertEquals(0, countMatchingWildcards(new SimpleString("a.b")));
assertEquals(0, countMatchingWildcards(new SimpleString("a.b.c")));
assertEquals(0, countMatchingWildcards(new SimpleString("a.b.c.a.d")));
// will match a.....a
SimpleString AHashA = new SimpleString("a.#.a");
underTest.put(AHashA, AHashA);
assertEquals(1, countMatchingWildcards(new SimpleString("a.b.c.a")));
assertEquals(0, countNonWildcardMatching(new SimpleString("a.b.c.a")));
// only now remove the #.a
underTest.remove(hashA, hashA);
assertEquals(1, countMatchingWildcards(new SimpleString("a.b.c.a")));
assertEquals(1, countMatchingWildcards(new SimpleString("a.a")));
}
@Test
public void testAHashA() throws Exception {
final AtomicInteger count = new AtomicInteger();
AddressMapVisitor<SimpleString> countCollector = value -> count.incrementAndGet();
// will match a.....a
SimpleString AHashA = new SimpleString("a.#.a");
underTest.put(AHashA, AHashA);
count.set(0);
underTest.visitMatchingWildcards(new SimpleString("a.b.c.a"), countCollector);
assertEquals(1, count.get());
count.set(0);
underTest.visitMatchingWildcards(new SimpleString("a.a"), countCollector);
assertEquals(1, count.get());
count.set(0);
underTest.visitMatchingWildcards(new SimpleString("a"), countCollector);
assertEquals(0, count.get());
}
@Test
public void testStar() throws Exception {
SimpleString star = new SimpleString("*");
SimpleString addressA = new SimpleString("a");
SimpleString addressAB = new SimpleString("a.b");
underTest.put(star, star);
underTest.put(addressAB, addressAB);
final AtomicInteger count = new AtomicInteger();
underTest.visitMatchingWildcards(addressA, value -> count.incrementAndGet());
assertEquals(1, count.get());
}
@Test
public void testSomeAndAny() throws Exception {
SimpleString star = new SimpleString("test.*.some.#");
underTest.put(star, star);
assertEquals(0, countNonWildcardMatching(star));
assertEquals(1, countMatchingWildcards(star));
SimpleString addressA = new SimpleString("test.1.some.la");
underTest.put(addressA, addressA);
assertEquals(1, countMatchingWildcards(star));
assertEquals(1, countNonWildcardMatching(star));
assertEquals(2, countMatchingWildcards(addressA));
assertEquals(1, countNonWildcardMatching(addressA));
}
@Test
public void testAnyAndSome() throws Exception {
SimpleString star = new SimpleString("test.#.some.*");
underTest.put(star, star);
assertEquals(1, countMatchingWildcards(star));
// add another match
SimpleString addressA = new SimpleString("test.1.some.la");
underTest.put(addressA, addressA);
assertEquals(1, countMatchingWildcards(star));
assertEquals(1, countNonWildcardMatching(star));
assertEquals(1, countNonWildcardMatching(addressA));
assertEquals(2, countMatchingWildcards(addressA));
}
@Test
public void testAnyAndSomeInMap() throws Exception {
SimpleString hashHash = new SimpleString("test.#.some.#");
underTest.put(hashHash, hashHash);
SimpleString starStar = new SimpleString("test.*.some.*");
underTest.put(starStar, starStar);
SimpleString hashStar = new SimpleString("test.#.A.*");
underTest.put(hashStar, hashStar);
SimpleString oneHashStar = new SimpleString("test.1.#.T");
underTest.put(oneHashStar, oneHashStar);
assertEquals(2, countMatchingWildcards(hashHash));
assertEquals(0, countNonWildcardMatching(hashHash));
SimpleString reqular = new SimpleString("test.a.b.some");
underTest.put(reqular, reqular);
assertEquals(1, countNonWildcardMatching(hashHash));
assertEquals(1, countNonWildcardMatching(reqular));
assertEquals(2, countMatchingWildcards(reqular));
}
@Test
public void testManyEntries() throws Exception {
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test." + i);
underTest.put(star, star);
}
assertEquals(10, countNonWildcardMatching(new SimpleString("test.*")));
assertEquals(10, countNonWildcardMatching(new SimpleString("test.#")));
assertEquals(1, countMatchingWildcards(new SimpleString("test.0")));
underTest.put(new SimpleString("test.#"), new SimpleString("test.#"));
underTest.put(new SimpleString("test.*"), new SimpleString("test.*"));
assertEquals(3, countMatchingWildcards(new SimpleString("test.1")));
assertEquals(10, countNonWildcardMatching(new SimpleString("test.#")));
assertEquals(10, countNonWildcardMatching(new SimpleString("test.*")));
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test.a." + i);
underTest.put(star, star);
}
assertEquals(2, countMatchingWildcards(new SimpleString("test.a.0")));
assertEquals(20, countNonWildcardMatching(new SimpleString("test.#")));
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test.b." + i);
underTest.put(star, star);
}
assertEquals(10, countNonWildcardMatching(new SimpleString("test.b.*")));
underTest.remove(new SimpleString("test.#"), new SimpleString("test.#"));
assertEquals(10, countNonWildcardMatching(new SimpleString("test.b.*")));
assertEquals(1, countMatchingWildcards(new SimpleString("test.a.0")));
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test.c." + i);
underTest.put(star, star);
}
assertEquals(10, countNonWildcardMatching(new SimpleString("test.c.*")));
SimpleString testStarStar = new SimpleString("test.*.*");
assertEquals(30, countNonWildcardMatching(testStarStar));
underTest.put(testStarStar, testStarStar);
assertEquals(30, countNonWildcardMatching(testStarStar));
assertEquals(1, countMatchingWildcards(testStarStar));
assertEquals(1, countMatchingWildcards(new SimpleString("test.b.c")));
}
@Test
public void testReset() throws Exception {
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test." + i);
underTest.put(star, star);
}
assertEquals(0, countMatchingWildcards(new SimpleString("test.*")));
assertEquals(10, countNonWildcardMatching(new SimpleString("test.*")));
underTest.reset();
assertEquals(0, countNonWildcardMatching(new SimpleString("test.*")));
}
@Test
public void testRemove() throws Exception {
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test." + i);
underTest.put(star, star);
}
SimpleString test1 = new SimpleString("test.1");
assertEquals(1, countMatchingWildcards(test1));
underTest.remove(test1, test1);
assertEquals(0, countMatchingWildcards(test1));
assertEquals(9, countNonWildcardMatching(new SimpleString("test.*")));
for (int i = 0; i < 10; i++) {
SimpleString star = new SimpleString("test." + i);
underTest.remove(star, star);
}
assertEquals(0, countNonWildcardMatching(new SimpleString("test.*")));
}
@Test
public void testMax() throws Exception {
underTest.put(new SimpleString("test.#.a"), new SimpleString("test.#.a"));
underTest.put(new SimpleString("test.*.a"), new SimpleString("test.*.a"));
underTest.put(new SimpleString("*.a"), new SimpleString("*.a"));
underTest.put(new SimpleString("#.a"), new SimpleString("#.a"));
assertEquals(3, countMatchingWildcards(new SimpleString("test.a")));
assertEquals(1, countMatchingWildcards(new SimpleString("test.a.a")));
}
}

View File

@ -19,9 +19,9 @@ package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
@ -29,19 +29,17 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.impl.AddressMapVisitor;
import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.jboss.logging.Logger;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class WildcardAddressManagerPerfTest {
private static final Logger log = Logger.getLogger(WildcardAddressManagerPerfTest.class);
@Test
@Ignore
@ -55,11 +53,10 @@ public class WildcardAddressManagerPerfTest {
configuration.setAnyWords('>');
final WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null);
final SimpleString wildCard = SimpleString.toSimpleString("Topic1.>");
ad.addAddressInfo(new AddressInfo(wildCard, RoutingType.MULTICAST));
int numSubs = 1000;
int numThreads = 1;
int numSubs = 5000;
int numThreads = 4;
final int partitions = 2;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numSubs; i++ ) {
@ -68,27 +65,35 @@ public class WildcardAddressManagerPerfTest {
executorService.submit(() -> {
try {
if (id % 500 == 0) {
if (id % 1000 == 0) {
// give gc a chance
Thread.yield();
}
// subscribe as wildcard
ad.addBinding(new BindingFake(SimpleString.toSimpleString("Topic1.>"), SimpleString.toSimpleString("" + id), id));
ad.addBinding(new BindingFake(SimpleString.toSimpleString("Topic1." + id % partitions + ".>"), SimpleString.toSimpleString("" + id), id));
SimpleString pubAddr = SimpleString.toSimpleString("Topic1." + id % partitions + "." + id );
if (id != 0 && id % 1000 == 0) {
System.err.println("0. pub for: " + id );
}
SimpleString pubAddr = SimpleString.toSimpleString("Topic1." + id );
// publish
Bindings binding = ad.getBindingsForRoutingAddress(pubAddr);
if (id % 100 == 0) {
System.err.println("1. Bindings for: " + id + ", " + binding.getBindings().size());
}
if (binding != null) {
if (id != 0 && id % 1000 == 0) {
System.err.println("1. Bindings for: " + id + ", " + binding.getBindings().size());
}
// publish again
binding = ad.getBindingsForRoutingAddress(pubAddr);
// publish again
binding = ad.getBindingsForRoutingAddress(pubAddr);
if (id % 100 == 0) {
System.err.println("2. Bindings for: " + id + ", " + binding.getBindings().size());
if (id % 500 == 0) {
System.err.println("2. Bindings for: " + id + ", " + binding.getBindings().size());
}
}
} catch (Exception e) {
@ -100,14 +105,24 @@ public class WildcardAddressManagerPerfTest {
executorService.shutdown();
assertTrue("finished on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
// TimeUnit.MINUTES.sleep(5);
final AtomicLong addresses = new AtomicLong();
final AtomicLong bindings = new AtomicLong();
ad.getAddressMap().visitMatchingWildcards(SimpleString.toSimpleString(">"), new AddressMapVisitor<Bindings>() {
@Override
public void visit(Bindings value) {
addresses.incrementAndGet();
bindings.addAndGet(value.getBindings().size());
}
});
System.err.println("Total: Addresses: " + addresses.get() + ", bindings: " + bindings.get());
System.out.println("Type so we can go on..");
// System.in.read();
//System.in.read();
System.out.println("we can go on..");
}
class BindingFactoryFake implements BindingsFactory {
@Override

View File

@ -16,9 +16,7 @@
*/
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -31,7 +29,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -116,61 +113,43 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
/**
* Test for ARTEMIS-1610
* @throws Exception
*/
@SuppressWarnings("unchecked")
@Test
public void testWildCardAddressRemoval() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.#"), RoutingType.MULTICAST));
ad.addBinding(new BindingFake("Topic1.topic", "two"));
ad.addBinding(new BindingFake("Topic1.#", "two"));
ad.addBinding(new BindingFake("Queue1.#", "one"));
Field wildcardAddressField = WildcardAddressManager.class.getDeclaredField("wildCardAddresses");
wildcardAddressField.setAccessible(true);
Map<SimpleString, Address> wildcardAddresses = (Map<SimpleString, Address>)wildcardAddressField.get(ad);
//Calling this method will trigger the wildcard to be added to the wildcard map internal
//to WildcardAddressManager
ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.#"));
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.topic")).getBindings().size());
//Remove the address
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.#"));
//Verify the address was cleaned up properly
assertEquals(1, wildcardAddresses.size());
assertNull(ad.getAddressInfo(SimpleString.toSimpleString("Topic1.#")));
assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#")));
}
@Test
public void testWildCardAddBinding() throws Exception {
public void testWildCardAddRemoveBinding() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
SimpleString address = SimpleString.toSimpleString("Queue1.1");
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
BindingFake bindingFake = new BindingFake("Queue1.#", "one");
Assert.assertTrue(ad.addBinding(bindingFake));
assertEquals(1, ad.getBindingsForRoutingAddress(address).getBindings().size());
ad.removeBinding(bindingFake.getUniqueName(), null);
assertNull(ad.getExistingBindingsForRoutingAddress(address));
}
@Test
public void tesWildcardOnClusterUpdate() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.A", "oneOnA")));
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
Field wildcardAddressField = WildcardAddressManager.class.getDeclaredField("wildCardAddresses");
wildcardAddressField.setAccessible(true);
Map<SimpleString, Address> wildcardAddresses = (Map<SimpleString, Address>)wildcardAddressField.get(ad);
SimpleString addressOfInterest = SimpleString.toSimpleString("Queue1.#");
assertEquals(1, wildcardAddresses.get(addressOfInterest).getLinkedAddresses().size());
// whack the existing state, it should remain whacked!
wildcardAddresses.get(addressOfInterest).getLinkedAddresses().clear();
// new binding on existing address, verify just reads linkedAddresses
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.A", "twoOnA")));
assertTrue("no addresses added", wildcardAddresses.get(addressOfInterest).getLinkedAddresses().isEmpty());
}
@Test(expected = ActiveMQQueueExistsException.class)
public void testWildCardAddAlreadyExistingBindingShouldThrowException() throws Exception {
@ -190,7 +169,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
ad.addBinding(new BindingFake("Topic1.>", "one"));
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getExistingBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).size());
@ -225,7 +204,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
ad.addBinding(new BindingFake("Topic1.test", "two"));
ad.addBinding(new BindingFake("Topic2.test", "three"));
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getExistingBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(2, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test1")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test2")).getBindings().size());
@ -239,6 +218,60 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
}
@Test
public void testNumberOfBindingsThatMatch() throws Exception {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null);
ad.addBinding(new BindingFake("T.>", "1"));
ad.addBinding(new BindingFake("T.>", "2"));
ad.addBinding(new BindingFake("T.>", "3"));
assertEquals(3, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1")).getBindings().size());
assertEquals(3, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.2")).getBindings().size());
assertEquals(3, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.3")).getBindings().size());
assertEquals(3, ad.getExistingBindingsForRoutingAddress(SimpleString.toSimpleString("T.>")).getBindings().size());
ad.addBinding(new BindingFake("T.*", "10"));
assertEquals(1, ad.getExistingBindingsForRoutingAddress(SimpleString.toSimpleString("T.*")).getBindings().size());
// wildcard binding should not be added to existing matching wildcards, still 3
assertEquals(3, ad.getExistingBindingsForRoutingAddress(SimpleString.toSimpleString("T.>")).getBindings().size());
assertEquals(4, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1")).getBindings().size());
assertEquals(4, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.2")).getBindings().size());
assertEquals(4, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.3")).getBindings().size());
ad.addBinding(new BindingFake("T.1.>", "11"));
assertEquals(1, ad.getExistingBindingsForRoutingAddress(SimpleString.toSimpleString("T.1.>")).getBindings().size());
assertEquals(5, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1")).getBindings().size());
assertEquals(4, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.2")).getBindings().size());
assertEquals(4, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.3")).getBindings().size());
ad.addBinding(new BindingFake("T.1.2", "12"));
assertEquals(5, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1.2")).getBindings().size());
ad.addBinding(new BindingFake("T.1.2.3.4", "13"));
assertEquals(5, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1.2.3.4")).getBindings().size());
ad.addBinding(new BindingFake("T.>.4", "14"));
assertEquals(6, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1.2.3.4")).getBindings().size());
ad.addBinding(new BindingFake("T.1.A.3.4", "15"));
assertEquals(6, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("T.1.A.3.4")).getBindings().size());
}
@Test
public void testConcurrentCalls() throws Exception {
final WildcardConfiguration configuration = new WildcardConfiguration();
@ -267,10 +300,10 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
SimpleString pubAddr = SimpleString.toSimpleString("Topic1." + id );
// publish to new address, will create
Bindings binding = ad.getBindingsForRoutingAddress(pubAddr);
ad.getBindingsForRoutingAddress(pubAddr);
// publish again, read only
binding = ad.getBindingsForRoutingAddress(pubAddr);
ad.getBindingsForRoutingAddress(pubAddr);
} catch (Exception e) {
e.printStackTrace();
@ -284,15 +317,15 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertNull("no exceptions", oops.get());
}
class BindingFactoryFake implements BindingsFactory {
static class BindingFactoryFake implements BindingsFactory {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
return new BindingsFake();
public Bindings createBindings(SimpleString address) {
return new BindingsFake(address);
}
}
class BindingFake implements Binding {
static class BindingFake implements Binding {
final SimpleString address;
final SimpleString id;
@ -392,8 +425,13 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
static class BindingsFake implements Bindings {
SimpleString name;
ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<>();
BindingsFake(SimpleString address) {
this.name = address;
}
@Override
public Collection<Binding> getBindings() {
return bindings.values();
@ -409,6 +447,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
return bindings.remove(uniqueName);
}
@Override
public SimpleString getName() {
return name;
}
@Override
public void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {