ARTEMIS-1627 - Support removing addresses that do not have direct
bindings If there are no direct bindings on an address and only linked bindings then the address should be able to be removed from the broker
This commit is contained in:
parent
dafcbb70a3
commit
8b6df5b73a
|
@ -46,6 +46,8 @@ public interface AddressManager {
|
|||
|
||||
Bindings getMatchingBindings(SimpleString address) throws Exception;
|
||||
|
||||
Bindings getDirectBindings(SimpleString address) throws Exception;
|
||||
|
||||
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
|
||||
|
||||
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
|
||||
|
|
|
@ -94,6 +94,8 @@ public interface PostOffice extends ActiveMQComponent {
|
|||
|
||||
Bindings getMatchingBindings(SimpleString address) throws Exception;
|
||||
|
||||
Bindings getDirectBindings(SimpleString address) throws Exception;
|
||||
|
||||
Map<SimpleString, Binding> getAllBindings();
|
||||
|
||||
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
|
||||
|
|
|
@ -532,7 +532,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeRemoveAddress(address) : null);
|
||||
Bindings bindingsForAddress = getBindingsForAddress(address);
|
||||
final Bindings bindingsForAddress = getDirectBindings(address);
|
||||
if (bindingsForAddress.getBindings().size() > 0) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
|
||||
}
|
||||
|
@ -701,6 +701,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
return addressManager.getMatchingBindings(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bindings getDirectBindings(final SimpleString address) throws Exception {
|
||||
return addressManager.getDirectBindings(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<SimpleString, Binding> getAllBindings() {
|
||||
return addressManager.getBindings();
|
||||
|
|
|
@ -55,7 +55,7 @@ public class SimpleAddressManager implements AddressManager {
|
|||
/**
|
||||
* HashMap<Address, Binding>
|
||||
*/
|
||||
private final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<>();
|
||||
protected final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* HashMap<QueueName, Binding>
|
||||
|
@ -136,6 +136,19 @@ public class SimpleAddressManager implements AddressManager {
|
|||
return bindings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bindings getDirectBindings(final SimpleString address) throws Exception {
|
||||
Bindings bindings = bindingsFactory.createBindings(address);
|
||||
|
||||
for (Binding binding : nameMap.values()) {
|
||||
if (binding.getAddress().equals(address)) {
|
||||
bindings.addBinding(binding);
|
||||
}
|
||||
}
|
||||
|
||||
return bindings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {
|
||||
|
||||
|
|
|
@ -133,6 +133,8 @@ public class WildcardAddressManager extends SimpleAddressManager {
|
|||
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
|
||||
final AddressInfo removed = super.removeAddressInfo(address);
|
||||
if (removed != null) {
|
||||
//Remove from mappings so removeAndUpdateAddressMap processes and cleanup
|
||||
mappings.remove(address);
|
||||
removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration));
|
||||
}
|
||||
return removed;
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jms.consumer;
|
||||
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSConsumer;
|
||||
import javax.jms.JMSContext;
|
||||
|
@ -28,8 +31,6 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.QueueBrowser;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -888,4 +889,49 @@ public class JmsConsumerTest extends JMSTestBase {
|
|||
sess.createConsumer(sess.createTopic(topic2));
|
||||
sess.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* ARTEMIS-1627 - Verify that a address can be removed when there are no direct
|
||||
* bindings on the address but does have bindings on a linked address
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testAddressRemovalWithWildcardConsumer() throws Exception {
|
||||
testAddressRemovalWithWithConsumers("durable.#", "durable.test");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddressRemovalWithNonWildcardConsumer() throws Exception {
|
||||
testAddressRemovalWithWithConsumers("durable.test", "durable.#");
|
||||
}
|
||||
|
||||
private void testAddressRemovalWithWithConsumers(String topic1, String topic2) throws Exception {
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic1), RoutingType.MULTICAST));
|
||||
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic2), RoutingType.MULTICAST));
|
||||
|
||||
conn = cf.createConnection();
|
||||
conn.setClientID("clientId");
|
||||
conn.start();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageConsumer c1 = sess.createDurableConsumer(sess.createTopic(topic1), "sub1");
|
||||
c1.close();
|
||||
|
||||
// Make sure topic2 address can be removed and the bindings still exist for topic1
|
||||
server.removeAddressInfo(SimpleString.toSimpleString(topic2), null);
|
||||
assertEquals(1, server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(topic1))
|
||||
.getBindings().size());
|
||||
|
||||
// Re-create address by creating a consumer on the topic and make sure the
|
||||
// wildcard and the direct consumer still receive the messages
|
||||
c1 = sess.createDurableConsumer(sess.createTopic(topic1), "sub1");
|
||||
MessageConsumer c2 = sess.createDurableConsumer(sess.createTopic(topic2), "sub2");
|
||||
MessageProducer p1 = sess.createProducer(sess.createTopic("durable.test"));
|
||||
p1.send(sess.createTextMessage("test"));
|
||||
|
||||
assertNotNull(c1.receive(1000));
|
||||
assertNotNull(c2.receive(1000));
|
||||
sess.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Map;
|
|||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.postoffice.Address;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
|
@ -135,6 +136,65 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
|||
assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWildCardAddressRemovalDifferentWildcard() throws Exception {
|
||||
|
||||
final WildcardConfiguration configuration = new WildcardConfiguration();
|
||||
configuration.setAnyWords('>');
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null);
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
|
||||
ad.addBinding(new BindingFake("Topic1.>", "one"));
|
||||
|
||||
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
|
||||
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
|
||||
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
|
||||
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
|
||||
|
||||
//Remove the address
|
||||
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.test"));
|
||||
|
||||
//should still have 1 address and binding
|
||||
assertEquals(1, ad.getAddresses().size());
|
||||
assertEquals(1, ad.getBindings().size());
|
||||
|
||||
ad.removeBinding(SimpleString.toSimpleString("one"), null);
|
||||
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.>"));
|
||||
|
||||
assertEquals(0, ad.getAddresses().size());
|
||||
assertEquals(0, ad.getBindings().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWildCardAddressDirectBindings() throws Exception {
|
||||
|
||||
final WildcardConfiguration configuration = new WildcardConfiguration();
|
||||
configuration.setAnyWords('>');
|
||||
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null);
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test.test1"), RoutingType.MULTICAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test.test2"), RoutingType.MULTICAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic2.>"), RoutingType.MULTICAST));
|
||||
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic2.test"), RoutingType.MULTICAST));
|
||||
ad.addBinding(new BindingFake("Topic1.>", "one"));
|
||||
ad.addBinding(new BindingFake("Topic1.test", "two"));
|
||||
ad.addBinding(new BindingFake("Topic2.test", "three"));
|
||||
|
||||
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
|
||||
assertEquals(2, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
|
||||
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test1")).getBindings().size());
|
||||
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test2")).getBindings().size());
|
||||
|
||||
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
|
||||
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
|
||||
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).getBindings().size());
|
||||
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).getBindings().size());
|
||||
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).getBindings().size());
|
||||
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).getBindings().size());
|
||||
|
||||
}
|
||||
|
||||
class BindingFactoryFake implements BindingsFactory {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -147,6 +147,12 @@ public class FakePostOffice implements PostOffice {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bindings getDirectBindings(final SimpleString address) {
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getNotificationLock() {
|
||||
|
||||
|
|
Loading…
Reference in New Issue