ARTEMIS-1610 Properly remove an address from the WildcardAddressManager
When an address is removed from the address manager its linked addresses also need to be removed if there are no more bindings for the address. Also adding a null check on bindings of linked addresses when a new binding is added
This commit is contained in:
parent
14f149d755
commit
67a9220d24
|
@ -70,7 +70,7 @@ public interface AddressManager {
|
|||
* it will throw an exception if the address doesn't exist */
|
||||
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes) throws Exception;
|
||||
|
||||
AddressInfo removeAddressInfo(SimpleString address);
|
||||
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
||||
|
||||
AddressInfo getAddressInfo(SimpleString address);
|
||||
|
||||
|
|
|
@ -325,7 +325,7 @@ public class SimpleAddressManager implements AddressManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AddressInfo removeAddressInfo(SimpleString address) {
|
||||
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
|
||||
return addressInfoMap.remove(address);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.postoffice.Address;
|
|||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -95,8 +96,10 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
|||
} else {
|
||||
for (Address destAdd : add.getLinkedAddresses()) {
|
||||
Bindings bindings = super.getBindingsForRoutingAddress(destAdd.getAddress());
|
||||
for (Binding b : bindings.getBindings()) {
|
||||
super.addMappingInternal(binding.getAddress(), b);
|
||||
if (bindings != null) {
|
||||
for (Binding b : bindings.getBindings()) {
|
||||
super.addMappingInternal(binding.getAddress(), b);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,6 +129,15 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
|||
return binding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
|
||||
final AddressInfo removed = super.removeAddressInfo(address);
|
||||
if (removed != null) {
|
||||
removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration));
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
super.clear();
|
||||
|
|
|
@ -31,12 +31,14 @@ import javax.jms.TextMessage;
|
|||
import java.util.Enumeration;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
|
@ -815,4 +817,75 @@ public class JmsConsumerTest extends JMSTestBase {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for ARTEMIS-1610
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testConsumerAfterWildcardAddressRemoval() throws Exception {
|
||||
String queue1 = "queue.#";
|
||||
String topic1 = "durable.#";
|
||||
String topic2 = "durable.test";
|
||||
|
||||
//Create a new address along with 1 queue for it (this cases a wildcard address to get registered
|
||||
//inside the WildcardAddressManager manager when the binding is created)
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue1), RoutingType.ANYCAST));
|
||||
server.createQueue(SimpleString.toSimpleString(queue1), RoutingType.ANYCAST,
|
||||
SimpleString.toSimpleString(queue1), null, false, false);
|
||||
|
||||
//create addresses for both topics
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic1), RoutingType.MULTICAST));
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic2), RoutingType.MULTICAST));
|
||||
|
||||
//Remove the wildcard address associated with topic2
|
||||
server.removeAddressInfo(SimpleString.toSimpleString(topic1), null);
|
||||
|
||||
conn = cf.createConnection();
|
||||
conn.setClientID("clientId");
|
||||
conn.start();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
//Verify consumer can be created without issue - this caused a NPE
|
||||
//before ARTEMIS-1610
|
||||
sess.createConsumer(sess.createTopic(topic2));
|
||||
sess.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for ARTEMIS-1610
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testConsumerAfterWildcardConsumer() throws Exception {
|
||||
String queue1 = "queue.#";
|
||||
String topic1 = "durable.#";
|
||||
String topic2 = "durable.test";
|
||||
|
||||
//Create a new address along with 1 queue for it (this cases a wildcard address to get registered
|
||||
//inside the WildcardAddressManager manager when the binding is created)
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue1), RoutingType.ANYCAST));
|
||||
server.createQueue(SimpleString.toSimpleString(queue1), RoutingType.ANYCAST,
|
||||
SimpleString.toSimpleString(queue1), null, false, false);
|
||||
|
||||
//create addresses for both topics
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic1), RoutingType.MULTICAST));
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic2), RoutingType.MULTICAST));
|
||||
|
||||
conn = cf.createConnection();
|
||||
conn.setClientID("clientId");
|
||||
conn.start();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
//create and close consumer on wildcard
|
||||
MessageConsumer c = sess.createConsumer(sess.createTopic(topic1));
|
||||
c.close();
|
||||
|
||||
//Verify consumer can be created without issue - this caused a NPE
|
||||
//before ARTEMIS-1610
|
||||
//will create binding for this topic and will link addresses with durable.# so need to do a null check
|
||||
//on binding add of linked addresses
|
||||
sess.createConsumer(sess.createTopic(topic2));
|
||||
sess.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,12 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.unit.core.postoffice.impl;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.postoffice.Address;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
|
@ -31,6 +35,7 @@ import org.apache.activemq.artemis.core.server.Bindable;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -99,6 +104,37 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
|||
assertEquals("Exception happened during the process", 0, errors);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for ARTEMIS-1610
|
||||
* @throws Exception
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testWildCardAddressRemoval() throws Exception {
|
||||
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.#"), RoutingType.MULTICAST));
|
||||
ad.addBinding(new BindingFake("Topic1.topic", "two"));
|
||||
ad.addBinding(new BindingFake("Queue1.#", "one"));
|
||||
|
||||
Field wildcardAddressField = WildcardAddressManager.class.getDeclaredField("wildCardAddresses");
|
||||
wildcardAddressField.setAccessible(true);
|
||||
Map<SimpleString, Address> wildcardAddresses = (Map<SimpleString, Address>)wildcardAddressField.get(ad);
|
||||
|
||||
//Calling this method will trigger the wildcard to be added to the wildcard map internal
|
||||
//to WildcardAddressManager
|
||||
ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.#"));
|
||||
|
||||
//Remove the address
|
||||
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.#"));
|
||||
|
||||
//Verify the address was cleaned up properly
|
||||
assertEquals(1, wildcardAddresses.size());
|
||||
assertNull(ad.getAddressInfo(SimpleString.toSimpleString("Topic1.#")));
|
||||
assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#")));
|
||||
}
|
||||
|
||||
class BindingFactoryFake implements BindingsFactory {
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue