ARTEMIS-2990 Improve scalability of wildcard address manager add/remove

This commit is contained in:
franz1981 2020-11-15 09:42:53 +01:00 committed by Clebert Suconic
parent d63422161f
commit 923fcb7fe4
10 changed files with 179 additions and 158 deletions

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
public class CompositeAddress { public class CompositeAddress {
public static String SEPARATOR = "::"; public static final String SEPARATOR = "::";
public static String toFullyQualified(String address, String qName) { public static String toFullyQualified(String address, String qName) {
return toFullyQualified(SimpleString.toSimpleString(address), SimpleString.toSimpleString(qName)).toString(); return toFullyQualified(SimpleString.toSimpleString(address), SimpleString.toSimpleString(qName)).toString();

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.postoffice; package org.apache.activemq.artemis.core.postoffice;
import java.util.List; import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -31,7 +31,7 @@ public interface Address {
boolean containsWildCard(); boolean containsWildCard();
List<Address> getLinkedAddresses(); Collection<Address> getLinkedAddresses();
void addLinkedAddress(Address address); void addLinkedAddress(Address address);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection; import java.util.Collection;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -33,7 +34,7 @@ public interface Bindings extends UnproposalListener {
void addBinding(Binding binding); void addBinding(Binding binding);
void removeBinding(Binding binding); Binding removeBindingByUniqueName(SimpleString uniqueName);
void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType); void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);

View File

@ -16,12 +16,16 @@
*/ */
package org.apache.activemq.artemis.core.postoffice.impl; package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList; import java.util.Collection;
import java.util.List; import java.util.Collections;
import java.util.Set;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.Address; import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jctools.maps.NonBlockingHashSet;
/** /**
* Splits an address string into its hierarchical parts using {@link WildcardConfiguration#getDelimiter()} as delimiter. * Splits an address string into its hierarchical parts using {@link WildcardConfiguration#getDelimiter()} as delimiter.
@ -36,7 +40,7 @@ public class AddressImpl implements Address {
private final boolean containsWildCard; private final boolean containsWildCard;
private final List<Address> linkedAddresses = new ArrayList<>(); private Set<Address> linkedAddresses = null;
private final WildcardConfiguration wildcardConfiguration; private final WildcardConfiguration wildcardConfiguration;
@ -67,17 +71,27 @@ public class AddressImpl implements Address {
} }
@Override @Override
public List<Address> getLinkedAddresses() { public Collection<Address> getLinkedAddresses() {
final Collection<Address> linkedAddresses = this.linkedAddresses;
if (linkedAddresses == null) {
return Collections.emptySet();
}
return linkedAddresses; return linkedAddresses;
} }
@Override @Override
public void addLinkedAddress(final Address address) { public void addLinkedAddress(final Address address) {
if (linkedAddresses == null) {
linkedAddresses = PlatformDependent.hasUnsafe() ? new NonBlockingHashSet<>() : new ConcurrentHashSet<>();
}
linkedAddresses.add(address); linkedAddresses.add(address);
} }
@Override @Override
public void removeLinkedAddress(final Address actualAddress) { public void removeLinkedAddress(final Address actualAddress) {
if (linkedAddresses == null) {
return;
}
linkedAddresses.remove(actualAddress); linkedAddresses.remove(actualAddress);
} }

View File

@ -149,7 +149,6 @@ public final class BindingsImpl implements Bindings {
} finally { } finally {
updated(); updated();
} }
} }
@Override @Override
@ -162,7 +161,11 @@ public final class BindingsImpl implements Bindings {
} }
@Override @Override
public void removeBinding(final Binding binding) { public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) {
final Binding binding = bindingsNameMap.remove(bindingUniqueName);
if (binding == null) {
return null;
}
try { try {
if (binding.isExclusive()) { if (binding.isExclusive()) {
exclusiveBindings.remove(binding); exclusiveBindings.remove(binding);
@ -181,11 +184,12 @@ public final class BindingsImpl implements Bindings {
} }
bindingsIdMap.remove(binding.getID()); bindingsIdMap.remove(binding.getID());
bindingsNameMap.remove(binding.getUniqueName()); assert !bindingsNameMap.containsKey(binding.getUniqueName());
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings()); logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
} }
return binding;
} finally { } finally {
updated(); updated();
} }

View File

@ -208,31 +208,37 @@ public class SimpleAddressManager implements AddressManager {
Bindings bindings = mappings.get(realAddress); Bindings bindings = mappings.get(realAddress);
if (bindings != null) { if (bindings != null) {
removeMapping(bindableName, bindings); final SimpleString bindableQueueName = CompositeAddress.extractQueueName(bindableName);
final Binding binding = bindings.removeBindingByUniqueName(bindableQueueName);
if (binding == null) {
throw new IllegalStateException("Cannot find binding " + bindableName);
}
if (bindings.getBindings().isEmpty()) { if (bindings.getBindings().isEmpty()) {
mappings.remove(realAddress); mappings.remove(realAddress);
} }
} }
} }
protected Binding removeMapping(final SimpleString bindableName, final Bindings bindings) { protected void addMappingsInternal(final SimpleString address,
Binding theBinding = null; final Collection<Binding> newBindings) throws Exception {
if (newBindings.isEmpty()) {
return;
}
SimpleString realAddress = CompositeAddress.extractAddressName(address);
Bindings bindings = mappings.get(realAddress);
for (Binding binding : bindings.getBindings()) { if (bindings == null) {
if (binding.getUniqueName().equals(CompositeAddress.extractQueueName(bindableName))) { bindings = bindingsFactory.createBindings(realAddress);
theBinding = binding;
break; final Bindings prevBindings = mappings.putIfAbsent(realAddress, bindings);
if (prevBindings != null) {
bindings = prevBindings;
} }
} }
for (Binding binding : newBindings) {
if (theBinding == null) { bindings.addBinding(binding);
throw new IllegalStateException("Cannot find binding " + bindableName);
} }
bindings.removeBinding(theBinding);
return theBinding;
} }
protected boolean addMappingInternal(final SimpleString address, final Binding binding) throws Exception { protected boolean addMappingInternal(final SimpleString address, final Binding binding) throws Exception {

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.core.postoffice.impl; package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -28,11 +31,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* extends the simple manager to allow wildcard addresses to be used. * extends the simple manager to allow wildcard addresses to be used.
*/ */
@ -65,20 +63,22 @@ public class WildcardAddressManager extends SimpleAddressManager {
// this should only happen if we're routing to an address that has no mappings when we're running checkAllowable // this should only happen if we're routing to an address that has no mappings when we're running checkAllowable
if (bindings == null && !wildCardAddresses.isEmpty()) { if (bindings == null && !wildCardAddresses.isEmpty()) {
Address add = addAndUpdateAddressMap(address); Address add = addAndUpdateAddressMap(address, true);
if (!add.containsWildCard()) { if (!add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) { for (Address destAdd : add.getLinkedAddresses()) {
Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress()); Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress());
if (b != null) { if (b != null) {
Collection<Binding> theBindings = b.getBindings(); super.addMappingsInternal(address, b.getBindings());
for (Binding theBinding : theBindings) { if (bindings == null) {
super.addMappingInternal(address, theBinding); bindings = super.getBindingsForRoutingAddress(address);
} }
super.getBindingsForRoutingAddress(address).setMessageLoadBalancingType(b.getMessageLoadBalancingType()); bindings.setMessageLoadBalancingType(b.getMessageLoadBalancingType());
} }
} }
} }
bindings = super.getBindingsForRoutingAddress(address); if (bindings == null) {
bindings = super.getBindingsForRoutingAddress(address);
}
} }
return bindings; return bindings;
} }
@ -94,7 +94,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
public boolean addBinding(final Binding binding) throws Exception { public boolean addBinding(final Binding binding) throws Exception {
boolean exists = super.addBinding(binding); boolean exists = super.addBinding(binding);
if (!exists) { if (!exists) {
Address add = addAndUpdateAddressMap(binding.getAddress()); Address add = addAndUpdateAddressMap(binding.getAddress(), false);
if (add.containsWildCard()) { if (add.containsWildCard()) {
for (Address destAdd : add.getLinkedAddresses()) { for (Address destAdd : add.getLinkedAddresses()) {
super.addMappingInternal(destAdd.getAddress(), binding); super.addMappingInternal(destAdd.getAddress(), binding);
@ -103,19 +103,17 @@ public class WildcardAddressManager extends SimpleAddressManager {
for (Address destAdd : add.getLinkedAddresses()) { for (Address destAdd : add.getLinkedAddresses()) {
Bindings bindings = super.getBindingsForRoutingAddress(destAdd.getAddress()); Bindings bindings = super.getBindingsForRoutingAddress(destAdd.getAddress());
if (bindings != null) { if (bindings != null) {
for (Binding b : bindings.getBindings()) { super.addMappingsInternal(binding.getAddress(), bindings.getBindings());
super.addMappingInternal(binding.getAddress(), b);
}
} }
} }
} }
} }
return exists; return !exists;
} }
@Override @Override
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception { public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
Address add = addAndUpdateAddressMap(address); Address add = addAndUpdateAddressMap(address, true);
Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address); Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
if (bindingsForRoutingAddress != null) { if (bindingsForRoutingAddress != null) {
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType); bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
@ -142,13 +140,19 @@ public class WildcardAddressManager extends SimpleAddressManager {
public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception {
Binding binding = super.removeBinding(uniqueName, tx); Binding binding = super.removeBinding(uniqueName, tx);
if (binding != null) { if (binding != null) {
Address add = getAddress(binding.getAddress()); final SimpleString bindingAddress = binding.getAddress();
if (add.containsWildCard()) { final boolean containsWildcard = bindingAddress.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
for (Address theAddress : add.getLinkedAddresses()) { Address address = containsWildcard ? wildCardAddresses.get(bindingAddress) : addresses.get(bindingAddress);
super.removeBindingInternal(theAddress.getAddress(), uniqueName); if (address == null) {
address = new AddressImpl(bindingAddress, wildcardConfiguration);
} else {
if (containsWildcard) {
for (Address linkedAddress : address.getLinkedAddresses()) {
super.removeBindingInternal(linkedAddress.getAddress(), uniqueName);
}
} }
} }
removeAndUpdateAddressMap(add); removeAndUpdateAddressMap(address);
} }
return binding; return binding;
} }
@ -171,31 +175,21 @@ public class WildcardAddressManager extends SimpleAddressManager {
wildCardAddresses.clear(); wildCardAddresses.clear();
} }
private Address getAddress(final SimpleString address) { private Address addAndUpdateAddressMap(final SimpleString address, boolean getBiased) {
Address add = new AddressImpl(address, wildcardConfiguration); final boolean containsWildCard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
Address actualAddress; final Map<SimpleString, Address> addressMap = containsWildCard ? wildCardAddresses : addresses;
if (add.containsWildCard()) { Address actualAddress = null;
actualAddress = wildCardAddresses.get(address); if (getBiased) {
} else { // CHM::get doesn't need to synchronize anything, so it's to be preferred for getBiased cases
actualAddress = addresses.get(address); actualAddress = addressMap.get(address);
}
return actualAddress != null ? actualAddress : add;
}
private synchronized Address addAndUpdateAddressMap(final SimpleString address) {
Address actualAddress;
final boolean containsWildcard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
if (containsWildcard) {
actualAddress = wildCardAddresses.get(address);
} else {
actualAddress = addresses.get(address);
} }
if (actualAddress == null) { if (actualAddress == null) {
actualAddress = new AddressImpl(address, wildcardConfiguration); actualAddress = addressMap.computeIfAbsent(address, addressKey -> new AddressImpl(addressKey, wildcardConfiguration));
addAddress(address, actualAddress); }
assert actualAddress.containsWildCard() == containsWildCard;
if (containsWildcard) { synchronized (this) {
for (Address destAdd : addresses.values()) { if (containsWildCard) {
for (Address destAdd : this.addresses.values()) {
if (destAdd.matches(actualAddress)) { if (destAdd.matches(actualAddress)) {
destAdd.addLinkedAddress(actualAddress); destAdd.addLinkedAddress(actualAddress);
actualAddress.addLinkedAddress(destAdd); actualAddress.addLinkedAddress(destAdd);
@ -209,31 +203,24 @@ public class WildcardAddressManager extends SimpleAddressManager {
} }
} }
} }
} return actualAddress;
return actualAddress;
}
private void addAddress(final SimpleString address, final Address actualAddress) {
if (actualAddress.containsWildCard()) {
wildCardAddresses.put(address, actualAddress);
} else {
addresses.put(address, actualAddress);
} }
} }
private synchronized void removeAndUpdateAddressMap(final Address address) throws Exception { private void removeAndUpdateAddressMap(final Address address) throws Exception {
// we only remove if there are no bindings left // we only remove if there are no bindings left
Bindings bindings = super.getBindingsForRoutingAddress(address.getAddress()); Bindings bindings = super.getBindingsForRoutingAddress(address.getAddress());
if (bindings == null || bindings.getBindings().size() == 0) { if (bindings == null || bindings.getBindings().isEmpty()) {
List<Address> addresses = address.getLinkedAddresses(); synchronized (this) {
for (Address address1 : addresses) { for (Address address1 : address.getLinkedAddresses()) {
address1.removeLinkedAddress(address); address1.removeLinkedAddress(address);
Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress()); Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress());
if (linkedBindings == null || linkedBindings.getBindings().size() == 0) { if (linkedBindings == null || linkedBindings.getBindings().size() == 0) {
removeAddress(address1); removeAddress(address1);
}
} }
removeAddress(address);
} }
removeAddress(address);
} }
} }

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl; package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import javax.transaction.xa.Xid;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -83,7 +83,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
@Override @Override
public void run() { public void run() {
try { try {
bind.removeBinding(fake); bind.removeBindingByUniqueName(fake.getUniqueName());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl; package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.util.Collection;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -30,15 +29,11 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager; import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -220,59 +215,4 @@ public class WildcardAddressManagerPerfTest {
} }
} }
class BindingsFake implements Bindings {
ConcurrentHashSet<Binding> bindings = new ConcurrentHashSet<>();
@Override
public Collection<Binding> getBindings() {
return bindings;
}
@Override
public void addBinding(Binding binding) {
bindings.addIfAbsent(binding);
}
@Override
public void removeBinding(Binding binding) {
bindings.remove(binding);
}
@Override
public void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
}
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return null;
}
@Override
public void unproposed(SimpleString groupID) {
}
@Override
public void updated(QueueBinding binding) {
}
@Override
public boolean redistribute(Message message,
Queue originatingQueue,
RoutingContext context) throws Exception {
return false;
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
log.debug("routing message: " + message);
}
@Override
public boolean allowRedistribute() {
return false;
}
}
} }

View File

@ -17,10 +17,15 @@
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl; package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -40,6 +45,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
/** /**
@ -139,6 +145,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#"))); assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#")));
} }
@Test
public void testWildCardAddBinding() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
}
@Test(expected = ActiveMQQueueExistsException.class)
public void testWildCardAddAlreadyExistingBindingShouldThrowException() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
ad.addBinding(new BindingFake("Queue1.#", "one"));
ad.addBinding(new BindingFake("Queue1.#", "one"));
}
@Test @Test
public void testWildCardAddressRemovalDifferentWildcard() throws Exception { public void testWildCardAddressRemovalDifferentWildcard() throws Exception {
@ -198,6 +219,54 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
} }
@Test
public void testConcurrentCalls() throws Exception {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
final WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null);
final SimpleString wildCard = SimpleString.toSimpleString("Topic1.>");
ad.addAddressInfo(new AddressInfo(wildCard, RoutingType.MULTICAST));
AtomicReference<Throwable> oops = new AtomicReference<>();
int numSubs = 500;
int numThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numSubs; i++ ) {
final int id = i;
executorService.submit(() -> {
try {
// add/remove is externally sync via postOffice
synchronized (executorService) {
// subscribe as wildcard
ad.addBinding(new BindingFake(SimpleString.toSimpleString("Topic1.>"), SimpleString.toSimpleString("" + id)));
}
SimpleString pubAddr = SimpleString.toSimpleString("Topic1." + id );
// publish to new address, will create
Bindings binding = ad.getBindingsForRoutingAddress(pubAddr);
// publish again, read only
binding = ad.getBindingsForRoutingAddress(pubAddr);
// cluster consumer, concurrent access
ad.updateMessageLoadBalancingTypeForAddress(wildCard, MessageLoadBalancingType.ON_DEMAND);
} catch (Exception e) {
e.printStackTrace();
oops.set(e);
}
});
}
executorService.shutdown();
assertTrue("finished on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
assertNull("no exceptions", oops.get());
}
class BindingFactoryFake implements BindingsFactory { class BindingFactoryFake implements BindingsFactory {
@Override @Override
@ -304,23 +373,23 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
} }
} }
class BindingsFake implements Bindings { static class BindingsFake implements Bindings {
ArrayList<Binding> bindings = new ArrayList<>(); ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<>();
@Override @Override
public Collection<Binding> getBindings() { public Collection<Binding> getBindings() {
return bindings; return bindings.values();
} }
@Override @Override
public void addBinding(Binding binding) { public void addBinding(Binding binding) {
bindings.add(binding); bindings.put(binding.getUniqueName(), binding);
} }
@Override @Override
public void removeBinding(Binding binding) { public Binding removeBindingByUniqueName(SimpleString uniqueName) {
bindings.remove(binding); return bindings.remove(uniqueName);
} }
@Override @Override