This commit is contained in:
Clebert Suconic 2018-01-23 16:26:45 -05:00
commit 434841b14f
8 changed files with 140 additions and 4 deletions

View File

@ -46,6 +46,8 @@ public interface AddressManager {
Bindings getMatchingBindings(SimpleString address) throws Exception; Bindings getMatchingBindings(SimpleString address) throws Exception;
Bindings getDirectBindings(SimpleString address) throws Exception;
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;

View File

@ -94,6 +94,8 @@ public interface PostOffice extends ActiveMQComponent {
Bindings getMatchingBindings(SimpleString address) throws Exception; Bindings getMatchingBindings(SimpleString address) throws Exception;
Bindings getDirectBindings(SimpleString address) throws Exception;
Map<SimpleString, Binding> getAllBindings(); Map<SimpleString, Binding> getAllBindings();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;

View File

@ -532,7 +532,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public AddressInfo removeAddressInfo(SimpleString address) throws Exception { public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
synchronized (addressLock) { synchronized (addressLock) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeRemoveAddress(address) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeRemoveAddress(address) : null);
Bindings bindingsForAddress = getBindingsForAddress(address); final Bindings bindingsForAddress = getDirectBindings(address);
if (bindingsForAddress.getBindings().size() > 0) { if (bindingsForAddress.getBindings().size() > 0) {
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address); throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
} }
@ -701,6 +701,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return addressManager.getMatchingBindings(address); return addressManager.getMatchingBindings(address);
} }
@Override
public Bindings getDirectBindings(final SimpleString address) throws Exception {
return addressManager.getDirectBindings(address);
}
@Override @Override
public Map<SimpleString, Binding> getAllBindings() { public Map<SimpleString, Binding> getAllBindings() {
return addressManager.getBindings(); return addressManager.getBindings();

View File

@ -55,7 +55,7 @@ public class SimpleAddressManager implements AddressManager {
/** /**
* HashMap<Address, Binding> * HashMap<Address, Binding>
*/ */
private final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<>(); protected final ConcurrentMap<SimpleString, Bindings> mappings = new ConcurrentHashMap<>();
/** /**
* HashMap<QueueName, Binding> * HashMap<QueueName, Binding>
@ -136,6 +136,19 @@ public class SimpleAddressManager implements AddressManager {
return bindings; return bindings;
} }
@Override
public Bindings getDirectBindings(final SimpleString address) throws Exception {
Bindings bindings = bindingsFactory.createBindings(address);
for (Binding binding : nameMap.values()) {
if (binding.getAddress().equals(address)) {
bindings.addBinding(binding);
}
}
return bindings;
}
@Override @Override
public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception { public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception {

View File

@ -133,6 +133,8 @@ public class WildcardAddressManager extends SimpleAddressManager {
public AddressInfo removeAddressInfo(SimpleString address) throws Exception { public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
final AddressInfo removed = super.removeAddressInfo(address); final AddressInfo removed = super.removeAddressInfo(address);
if (removed != null) { if (removed != null) {
//Remove from mappings so removeAndUpdateAddressMap processes and cleanup
mappings.remove(address);
removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration)); removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration));
} }
return removed; return removed;

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.tests.integration.jms.consumer; package org.apache.activemq.artemis.tests.integration.jms.consumer;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSConsumer; import javax.jms.JMSConsumer;
import javax.jms.JMSContext; import javax.jms.JMSContext;
@ -28,8 +31,6 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser; import javax.jms.QueueBrowser;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -888,4 +889,49 @@ public class JmsConsumerTest extends JMSTestBase {
sess.createConsumer(sess.createTopic(topic2)); sess.createConsumer(sess.createTopic(topic2));
sess.close(); sess.close();
} }
/**
* ARTEMIS-1627 - Verify that a address can be removed when there are no direct
* bindings on the address but does have bindings on a linked address
*
* @throws Exception
*/
@Test
public void testAddressRemovalWithWildcardConsumer() throws Exception {
testAddressRemovalWithWithConsumers("durable.#", "durable.test");
}
@Test
public void testAddressRemovalWithNonWildcardConsumer() throws Exception {
testAddressRemovalWithWithConsumers("durable.test", "durable.#");
}
private void testAddressRemovalWithWithConsumers(String topic1, String topic2) throws Exception {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic1), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(topic2), RoutingType.MULTICAST));
conn = cf.createConnection();
conn.setClientID("clientId");
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = sess.createDurableConsumer(sess.createTopic(topic1), "sub1");
c1.close();
// Make sure topic2 address can be removed and the bindings still exist for topic1
server.removeAddressInfo(SimpleString.toSimpleString(topic2), null);
assertEquals(1, server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(topic1))
.getBindings().size());
// Re-create address by creating a consumer on the topic and make sure the
// wildcard and the direct consumer still receive the messages
c1 = sess.createDurableConsumer(sess.createTopic(topic1), "sub1");
MessageConsumer c2 = sess.createDurableConsumer(sess.createTopic(topic2), "sub2");
MessageProducer p1 = sess.createProducer(sess.createTopic("durable.test"));
p1.send(sess.createTextMessage("test"));
assertNotNull(c1.receive(1000));
assertNotNull(c2.receive(1000));
sess.close();
}
} }

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Address; import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
@ -135,6 +136,65 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#"))); assertNull(wildcardAddresses.get(SimpleString.toSimpleString("Topic1.#")));
} }
@Test
public void testWildCardAddressRemovalDifferentWildcard() throws Exception {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
ad.addBinding(new BindingFake("Topic1.>", "one"));
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
//Remove the address
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.test"));
//should still have 1 address and binding
assertEquals(1, ad.getAddresses().size());
assertEquals(1, ad.getBindings().size());
ad.removeBinding(SimpleString.toSimpleString("one"), null);
ad.removeAddressInfo(SimpleString.toSimpleString("Topic1.>"));
assertEquals(0, ad.getAddresses().size());
assertEquals(0, ad.getBindings().size());
}
@Test
public void testWildCardAddressDirectBindings() throws Exception {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test.test1"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test.test2"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic2.>"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic2.test"), RoutingType.MULTICAST));
ad.addBinding(new BindingFake("Topic1.>", "one"));
ad.addBinding(new BindingFake("Topic1.test", "two"));
ad.addBinding(new BindingFake("Topic2.test", "three"));
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(2, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test1")).getBindings().size());
assertEquals(1, ad.getBindingsForRoutingAddress(SimpleString.toSimpleString("Topic1.test.test2")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.>")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test1")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic1.test2")).getBindings().size());
assertEquals(0, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.>")).getBindings().size());
assertEquals(1, ad.getDirectBindings(SimpleString.toSimpleString("Topic2.test")).getBindings().size());
}
class BindingFactoryFake implements BindingsFactory { class BindingFactoryFake implements BindingsFactory {
@Override @Override

View File

@ -147,6 +147,12 @@ public class FakePostOffice implements PostOffice {
return null; return null;
} }
@Override
public Bindings getDirectBindings(final SimpleString address) {
return null;
}
@Override @Override
public Object getNotificationLock() { public Object getNotificationLock() {