ARTEMIS-2990 - alway be getBiased and only publish complete records and only calculate linked addresses once ARTEMIS-2990
This commit is contained in:
parent
a5fd97d2c9
commit
a5d7a043dc
|
@ -63,7 +63,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
||||||
|
|
||||||
// this should only happen if we're routing to an address that has no mappings when we're running checkAllowable
|
// this should only happen if we're routing to an address that has no mappings when we're running checkAllowable
|
||||||
if (bindings == null && !wildCardAddresses.isEmpty()) {
|
if (bindings == null && !wildCardAddresses.isEmpty()) {
|
||||||
Address add = addAndUpdateAddressMap(address, true);
|
Address add = addAndUpdateAddressMap(address);
|
||||||
if (!add.containsWildCard()) {
|
if (!add.containsWildCard()) {
|
||||||
for (Address destAdd : add.getLinkedAddresses()) {
|
for (Address destAdd : add.getLinkedAddresses()) {
|
||||||
Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress());
|
Bindings b = super.getBindingsForRoutingAddress(destAdd.getAddress());
|
||||||
|
@ -94,7 +94,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
||||||
public boolean addBinding(final Binding binding) throws Exception {
|
public boolean addBinding(final Binding binding) throws Exception {
|
||||||
boolean exists = super.addBinding(binding);
|
boolean exists = super.addBinding(binding);
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
Address add = addAndUpdateAddressMap(binding.getAddress(), false);
|
Address add = addAndUpdateAddressMap(binding.getAddress());
|
||||||
if (add.containsWildCard()) {
|
if (add.containsWildCard()) {
|
||||||
for (Address destAdd : add.getLinkedAddresses()) {
|
for (Address destAdd : add.getLinkedAddresses()) {
|
||||||
super.addMappingInternal(destAdd.getAddress(), binding);
|
super.addMappingInternal(destAdd.getAddress(), binding);
|
||||||
|
@ -113,7 +113,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
public void updateMessageLoadBalancingTypeForAddress(SimpleString address, MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||||
Address add = addAndUpdateAddressMap(address, true);
|
Address add = addAndUpdateAddressMap(address);
|
||||||
Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
|
Bindings bindingsForRoutingAddress = super.getBindingsForRoutingAddress(address);
|
||||||
if (bindingsForRoutingAddress != null) {
|
if (bindingsForRoutingAddress != null) {
|
||||||
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
|
bindingsForRoutingAddress.setMessageLoadBalancingType(messageLoadBalancingType);
|
||||||
|
@ -175,36 +175,38 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
||||||
wildCardAddresses.clear();
|
wildCardAddresses.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Address addAndUpdateAddressMap(final SimpleString address, boolean getBiased) {
|
private Address addAndUpdateAddressMap(final SimpleString address) {
|
||||||
final boolean containsWildCard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
|
final boolean containsWildCard = address.containsEitherOf(wildcardConfiguration.getAnyWords(), wildcardConfiguration.getSingleWord());
|
||||||
final Map<SimpleString, Address> addressMap = containsWildCard ? wildCardAddresses : addresses;
|
final Map<SimpleString, Address> addressMap = containsWildCard ? wildCardAddresses : addresses;
|
||||||
Address actualAddress = null;
|
Address actualAddress = addressMap.get(address);
|
||||||
if (getBiased) {
|
|
||||||
// CHM::get doesn't need to synchronize anything, so it's to be preferred for getBiased cases
|
|
||||||
actualAddress = addressMap.get(address);
|
|
||||||
}
|
|
||||||
if (actualAddress == null) {
|
if (actualAddress == null) {
|
||||||
actualAddress = addressMap.computeIfAbsent(address, addressKey -> new AddressImpl(addressKey, wildcardConfiguration));
|
synchronized (this) {
|
||||||
}
|
actualAddress = addressMap.get(address);
|
||||||
assert actualAddress.containsWildCard() == containsWildCard;
|
if (actualAddress == null) {
|
||||||
synchronized (this) {
|
actualAddress = new AddressImpl(address, wildcardConfiguration);
|
||||||
if (containsWildCard) {
|
|
||||||
for (Address destAdd : this.addresses.values()) {
|
assert actualAddress.containsWildCard() == containsWildCard;
|
||||||
if (destAdd.matches(actualAddress)) {
|
if (containsWildCard) {
|
||||||
destAdd.addLinkedAddress(actualAddress);
|
for (Address destAdd : addresses.values()) {
|
||||||
actualAddress.addLinkedAddress(destAdd);
|
if (destAdd.matches(actualAddress)) {
|
||||||
}
|
destAdd.addLinkedAddress(actualAddress);
|
||||||
}
|
actualAddress.addLinkedAddress(destAdd);
|
||||||
} else {
|
}
|
||||||
for (Address destAdd : wildCardAddresses.values()) {
|
}
|
||||||
if (actualAddress.matches(destAdd)) {
|
} else {
|
||||||
destAdd.addLinkedAddress(actualAddress);
|
for (Address destAdd : wildCardAddresses.values()) {
|
||||||
actualAddress.addLinkedAddress(destAdd);
|
if (actualAddress.matches(destAdd)) {
|
||||||
|
destAdd.addLinkedAddress(actualAddress);
|
||||||
|
actualAddress.addLinkedAddress(destAdd);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// only publish when complete
|
||||||
|
addressMap.put(address, actualAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return actualAddress;
|
|
||||||
}
|
}
|
||||||
|
return actualAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeAndUpdateAddressMap(final Address address) throws Exception {
|
private void removeAndUpdateAddressMap(final Address address) throws Exception {
|
||||||
|
|
|
@ -152,6 +152,26 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
||||||
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
|
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void tesWildcardOnClusterUpdate() throws Exception {
|
||||||
|
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
|
||||||
|
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
|
||||||
|
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.A", "oneOnA")));
|
||||||
|
Assert.assertTrue(ad.addBinding(new BindingFake("Queue1.#", "one")));
|
||||||
|
|
||||||
|
Field wildcardAddressField = WildcardAddressManager.class.getDeclaredField("wildCardAddresses");
|
||||||
|
wildcardAddressField.setAccessible(true);
|
||||||
|
Map<SimpleString, Address> wildcardAddresses = (Map<SimpleString, Address>)wildcardAddressField.get(ad);
|
||||||
|
SimpleString addressOfInterest = SimpleString.toSimpleString("Queue1.#");
|
||||||
|
assertEquals(1, wildcardAddresses.get(addressOfInterest).getLinkedAddresses().size());
|
||||||
|
// whack the existing state, it should remain whacked!
|
||||||
|
wildcardAddresses.get(addressOfInterest).getLinkedAddresses().clear();
|
||||||
|
|
||||||
|
// simulate cluster, verify just reads linkedAddresses
|
||||||
|
ad.updateMessageLoadBalancingTypeForAddress(addressOfInterest, MessageLoadBalancingType.ON_DEMAND);
|
||||||
|
assertTrue("no addresses added", wildcardAddresses.get(addressOfInterest).getLinkedAddresses().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expected = ActiveMQQueueExistsException.class)
|
@Test(expected = ActiveMQQueueExistsException.class)
|
||||||
public void testWildCardAddAlreadyExistingBindingShouldThrowException() throws Exception {
|
public void testWildCardAddAlreadyExistingBindingShouldThrowException() throws Exception {
|
||||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
|
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
|
||||||
|
|
Loading…
Reference in New Issue