diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java index 0a6ea6359c..96707b82e7 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CompositeAddress.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; public class CompositeAddress { - public static String SEPARATOR = "::"; + public static final String SEPARATOR = "::"; public static String toFullyQualified(String address, String qName) { return toFullyQualified(SimpleString.toSimpleString(address), SimpleString.toSimpleString(qName)).toString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Address.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Address.java index 8ebcab79f5..e484553cad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Address.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Address.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.postoffice; -import java.util.List; +import java.util.Collection; import org.apache.activemq.artemis.api.core.SimpleString; @@ -31,7 +31,7 @@ public interface Address { boolean containsWildCard(); - List
getLinkedAddresses(); + Collection
getLinkedAddresses(); void addLinkedAddress(Address address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index 053acfae7e..70f1a6037e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.postoffice; import java.util.Collection; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -33,7 +34,7 @@ public interface Bindings extends UnproposalListener { void addBinding(Binding binding); - void removeBinding(Binding binding); + Binding removeBindingByUniqueName(SimpleString uniqueName); void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java index a0d15aaab2..e4889d7e61 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java @@ -16,12 +16,16 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.postoffice.Address; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.jctools.maps.NonBlockingHashSet; /** * Splits an address string into its hierarchical parts using {@link WildcardConfiguration#getDelimiter()} as delimiter. @@ -36,7 +40,7 @@ public class AddressImpl implements Address { private final boolean containsWildCard; - private final List
linkedAddresses = new ArrayList<>(); + private Set
linkedAddresses = null; private final WildcardConfiguration wildcardConfiguration; @@ -67,17 +71,27 @@ public class AddressImpl implements Address { } @Override - public List
getLinkedAddresses() { + public Collection
getLinkedAddresses() { + final Collection
linkedAddresses = this.linkedAddresses; + if (linkedAddresses == null) { + return Collections.emptySet(); + } return linkedAddresses; } @Override public void addLinkedAddress(final Address address) { + if (linkedAddresses == null) { + linkedAddresses = PlatformDependent.hasUnsafe() ? new NonBlockingHashSet<>() : new ConcurrentHashSet<>(); + } linkedAddresses.add(address); } @Override public void removeLinkedAddress(final Address actualAddress) { + if (linkedAddresses == null) { + return; + } linkedAddresses.remove(actualAddress); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index f5bb2a3682..dcc7b1b9a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -149,7 +149,6 @@ public final class BindingsImpl implements Bindings { } finally { updated(); } - } @Override @@ -162,7 +161,11 @@ public final class BindingsImpl implements Bindings { } @Override - public void removeBinding(final Binding binding) { + public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) { + final Binding binding = bindingsNameMap.remove(bindingUniqueName); + if (binding == null) { + return null; + } try { if (binding.isExclusive()) { exclusiveBindings.remove(binding); @@ -181,11 +184,12 @@ public final class BindingsImpl implements Bindings { } bindingsIdMap.remove(binding.getID()); - bindingsNameMap.remove(binding.getUniqueName()); + assert !bindingsNameMap.containsKey(binding.getUniqueName()); if (logger.isTraceEnabled()) { logger.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings()); } + return binding; } finally { updated(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 7ca1a17034..66b30f1ce1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -208,31 +208,37 @@ public class SimpleAddressManager implements AddressManager { Bindings bindings = mappings.get(realAddress); if (bindings != null) { - removeMapping(bindableName, bindings); - + final SimpleString bindableQueueName = CompositeAddress.extractQueueName(bindableName); + final Binding binding = bindings.removeBindingByUniqueName(bindableQueueName); + if (binding == null) { + throw new IllegalStateException("Cannot find binding " + bindableName); + } if (bindings.getBindings().isEmpty()) { mappings.remove(realAddress); } } } - protected Binding removeMapping(final SimpleString bindableName, final Bindings bindings) { - Binding theBinding = null; + protected void addMappingsInternal(final SimpleString address, + final Collection newBindings) throws Exception { + if (newBindings.isEmpty()) { + return; + } + SimpleString realAddress = CompositeAddress.extractAddressName(address); + Bindings bindings = mappings.get(realAddress); - for (Binding binding : bindings.getBindings()) { - if (binding.getUniqueName().equals(CompositeAddress.extractQueueName(bindableName))) { - theBinding = binding; - break; + if (bindings == null) { + bindings = bindingsFactory.createBindings(realAddress); + + final Bindings prevBindings = mappings.putIfAbsent(realAddress, bindings); + + if (prevBindings != null) { + bindings = prevBindings; } } - - if (theBinding == null) { - throw new IllegalStateException("Cannot find binding " + bindableName); + for (Binding binding : newBindings) { + bindings.addBinding(binding); } - - bindings.removeBinding(theBinding); - - return theBinding; } protected boolean addMappingInternal(final SimpleString address, final Binding binding) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java index f30fa62b5d..4492812ba4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -28,11 +31,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.transaction.Transaction; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * extends the simple manager to allow wildcard addresses to be used. */ @@ -65,20 +63,22 @@ public class WildcardAddressManager extends SimpleAddressManager { // this should only happen if we're routing to an address that has no mappings when we're running checkAllowable if (bindings == null && !wildCardAddresses.isEmpty()) { - Address add = addAndUpdateAddressMap(address); + Address add = addAndUpdateAddressMap(address, true); if (!add.containsWildCard()) { for (Address destAdd : add.getLinkedAddresses()) { Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress()); if (b != null) { - Collection theBindings = b.getBindings(); - for (Binding theBinding : theBindings) { - super.addMappingInternal(address, theBinding); + super.addMappingsInternal(address, b.getBindings()); + if (bindings == null) { + bindings = super.getBindingsForRoutingAddress(address); } - super.getBindingsForRoutingAddress(address).setMessageLoadBalancingType(b.getMessageLoadBalancingType()); + bindings.setMessageLoadBalancingType(b.getMessageLoadBalancingType()); } } } - bindings = super.getBindingsForRoutingAddress(address); + if (bindings == null) { + bindings = super.getBindingsForRoutingAddress(address); + } } return bindings; } @@ -94,7 +94,7 @@ public class WildcardAddressManager extends SimpleAddressManager { public boolean addBinding(final Binding binding) throws Exception { boolean exists = super.addBinding(binding); if (!exists) { - Address add = addAndUpdateAddressMap(binding.getAddress()); + Address add = addAndUpdateAddressMap(binding.getAddress(), false); if (add.containsWildCard()) { for (Address destAdd : add.getLinkedAddresses()) { super.addMappingInternal(destAdd.getAddress(), binding); @@ -103,19 +103,17 @@ public class WildcardAddressManager extends SimpleAddressManager { for (Address destAdd : add.getLinkedAddresses()) { Bindings bindings = super.getBindingsForRoutingAddress(destAdd.getAddress()); if (bindings != null) { - for (Binding b : bindings.getBindings()) { - super.addMappingInternal(binding.getAddress(), b); - } + super.addMappingsInternal(binding.getAddress(), bindings.getBindings()); } } } } - return exists; + return !exists; } @Override public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception { - Address add = addAndUpdateAddressMap(address); + Address add = addAndUpdateAddressMap(address, true); Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address); if (bindingsForRoutingAddress != null) { bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType); @@ -142,13 +140,19 @@ public class WildcardAddressManager extends SimpleAddressManager { public Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { Binding binding = super.removeBinding(uniqueName, tx); if (binding != null) { - Address add = getAddress(binding.getAddress()); - if (add.containsWildCard()) { - for (Address theAddress : add.getLinkedAddresses()) { - super.removeBindingInternal(theAddress.getAddress(), uniqueName); + final SimpleString bindingAddress = binding.getAddress(); + final boolean containsWildcard = bindingAddress.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord()); + Address address = containsWildcard ? wildCardAddresses.get(bindingAddress) : addresses.get(bindingAddress); + if (address == null) { + address = new AddressImpl(bindingAddress, wildcardConfiguration); + } else { + if (containsWildcard) { + for (Address linkedAddress : address.getLinkedAddresses()) { + super.removeBindingInternal(linkedAddress.getAddress(), uniqueName); + } } } - removeAndUpdateAddressMap(add); + removeAndUpdateAddressMap(address); } return binding; } @@ -171,31 +175,21 @@ public class WildcardAddressManager extends SimpleAddressManager { wildCardAddresses.clear(); } - private Address getAddress(final SimpleString address) { - Address add = new AddressImpl(address, wildcardConfiguration); - Address actualAddress; - if (add.containsWildCard()) { - actualAddress = wildCardAddresses.get(address); - } else { - actualAddress = addresses.get(address); - } - return actualAddress != null ? actualAddress : add; - } - - private synchronized Address addAndUpdateAddressMap(final SimpleString address) { - Address actualAddress; - final boolean containsWildcard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord()); - if (containsWildcard) { - actualAddress = wildCardAddresses.get(address); - } else { - actualAddress = addresses.get(address); + private Address addAndUpdateAddressMap(final SimpleString address, boolean getBiased) { + final boolean containsWildCard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord()); + final Map addressMap = containsWildCard ? wildCardAddresses : addresses; + Address actualAddress = null; + if (getBiased) { + // CHM::get doesn't need to synchronize anything, so it's to be preferred for getBiased cases + actualAddress = addressMap.get(address); } if (actualAddress == null) { - actualAddress = new AddressImpl(address, wildcardConfiguration); - addAddress(address, actualAddress); - - if (containsWildcard) { - for (Address destAdd : addresses.values()) { + actualAddress = addressMap.computeIfAbsent(address, addressKey -> new AddressImpl(addressKey, wildcardConfiguration)); + } + assert actualAddress.containsWildCard() == containsWildCard; + synchronized (this) { + if (containsWildCard) { + for (Address destAdd : this.addresses.values()) { if (destAdd.matches(actualAddress)) { destAdd.addLinkedAddress(actualAddress); actualAddress.addLinkedAddress(destAdd); @@ -209,31 +203,24 @@ public class WildcardAddressManager extends SimpleAddressManager { } } } - } - return actualAddress; - } - - private void addAddress(final SimpleString address, final Address actualAddress) { - if (actualAddress.containsWildCard()) { - wildCardAddresses.put(address, actualAddress); - } else { - addresses.put(address, actualAddress); + return actualAddress; } } - private synchronized void removeAndUpdateAddressMap(final Address address) throws Exception { + private void removeAndUpdateAddressMap(final Address address) throws Exception { // we only remove if there are no bindings left Bindings bindings = super.getBindingsForRoutingAddress(address.getAddress()); - if (bindings == null || bindings.getBindings().size() == 0) { - List
addresses = address.getLinkedAddresses(); - for (Address address1 : addresses) { - address1.removeLinkedAddress(address); - Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress()); - if (linkedBindings == null || linkedBindings.getBindings().size() == 0) { - removeAddress(address1); + if (bindings == null || bindings.getBindings().isEmpty()) { + synchronized (this) { + for (Address address1 : address.getLinkedAddresses()) { + address1.removeLinkedAddress(address); + Bindings linkedBindings = super.getBindingsForRoutingAddress(address1.getAddress()); + if (linkedBindings == null || linkedBindings.getBindings().size() == 0) { + removeAddress(address1); + } } + removeAddress(address); } - removeAddress(address); } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 0b0b1083a1..80cd76af4f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -16,12 +16,12 @@ */ package org.apache.activemq.artemis.tests.unit.core.postoffice.impl; +import javax.transaction.xa.Xid; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import javax.transaction.xa.Xid; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -83,7 +83,7 @@ public class BindingsImplTest extends ActiveMQTestBase { @Override public void run() { try { - bind.removeBinding(fake); + bind.removeBindingByUniqueName(fake.getUniqueName()); } catch (Exception e) { e.printStackTrace(); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java index 43545e4662..cdcc0040e5 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.unit.core.postoffice.impl; -import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -30,15 +29,11 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; -import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager; import org.apache.activemq.artemis.core.server.Bindable; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; import org.junit.Ignore; import org.junit.Test; @@ -220,59 +215,4 @@ public class WildcardAddressManagerPerfTest { } } - class BindingsFake implements Bindings { - - ConcurrentHashSet bindings = new ConcurrentHashSet<>(); - - @Override - public Collection getBindings() { - return bindings; - } - - @Override - public void addBinding(Binding binding) { - bindings.addIfAbsent(binding); - } - - @Override - public void removeBinding(Binding binding) { - bindings.remove(binding); - } - - @Override - public void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) { - - } - - @Override - public MessageLoadBalancingType getMessageLoadBalancingType() { - return null; - } - - @Override - public void unproposed(SimpleString groupID) { - } - - @Override - public void updated(QueueBinding binding) { - } - - @Override - public boolean redistribute(Message message, - Queue originatingQueue, - RoutingContext context) throws Exception { - return false; - } - - @Override - public void route(Message message, RoutingContext context) throws Exception { - log.debug("routing message: " + message); - } - - @Override - public boolean allowRedistribute() { - return false; - } - } - } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 24e2651aba..6d189b2ad8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -17,10 +17,15 @@ package org.apache.activemq.artemis.tests.unit.core.postoffice.impl; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -40,6 +45,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.jboss.logging.Logger; +import org.junit.Assert; import org.junit.Test; /** @@ -139,6 +145,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#"))); } + @Test + public void testWildCardAddBinding() throws Exception { + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null); + ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST)); + Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one"))); + } + + @Test(expected = ActiveMQQueueExistsException.class) + public void testWildCardAddAlreadyExistingBindingShouldThrowException() throws Exception { + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null); + ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST)); + ad.addBinding(new BindingFake("Queue1.#", "one")); + ad.addBinding(new BindingFake("Queue1.#", "one")); + } + @Test public void testWildCardAddressRemovalDifferentWildcard() throws Exception { @@ -198,6 +219,54 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { } + @Test + public void testConcurrentCalls() throws Exception { + final WildcardConfiguration configuration = new WildcardConfiguration(); + configuration.setAnyWords('>'); + final WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null); + + final SimpleString wildCard = SimpleString.toSimpleString("Topic1.>"); + ad.addAddressInfo(new AddressInfo(wildCard, RoutingType.MULTICAST)); + + AtomicReference oops = new AtomicReference<>(); + int numSubs = 500; + int numThreads = 2; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + + for (int i = 0; i < numSubs; i++ ) { + final int id = i; + + executorService.submit(() -> { + try { + + // add/remove is externally sync via postOffice + synchronized (executorService) { + // subscribe as wildcard + ad.addBinding(new BindingFake(SimpleString.toSimpleString("Topic1.>"), SimpleString.toSimpleString("" + id))); + } + + SimpleString pubAddr = SimpleString.toSimpleString("Topic1." + id ); + // publish to new address, will create + Bindings binding = ad.getBindingsForRoutingAddress(pubAddr); + + // publish again, read only + binding = ad.getBindingsForRoutingAddress(pubAddr); + + // cluster consumer, concurrent access + ad.updateMessageLoadBalancingTypeForAddress(wildCard, MessageLoadBalancingType.ON_DEMAND); + + } catch (Exception e) { + e.printStackTrace(); + oops.set(e); + } + }); + } + + executorService.shutdown(); + assertTrue("finished on time", executorService.awaitTermination(10, TimeUnit.MINUTES)); + assertNull("no exceptions", oops.get()); + } + class BindingFactoryFake implements BindingsFactory { @Override @@ -304,23 +373,23 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { } } - class BindingsFake implements Bindings { + static class BindingsFake implements Bindings { - ArrayList bindings = new ArrayList<>(); + ConcurrentHashMap bindings = new ConcurrentHashMap<>(); @Override public Collection getBindings() { - return bindings; + return bindings.values(); } @Override public void addBinding(Binding binding) { - bindings.add(binding); + bindings.put(binding.getUniqueName(), binding); } @Override - public void removeBinding(Binding binding) { - bindings.remove(binding); + public Binding removeBindingByUniqueName(SimpleString uniqueName) { + return bindings.remove(uniqueName); } @Override