This commit is contained in:
Justin Bertram 2021-06-24 15:36:21 -05:00
commit 0275630b7e
2 changed files with 62 additions and 2 deletions

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -815,8 +816,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final Collection<Binding> bindingsForAddress = getDirectBindings(address);
if (force) {
for (Binding binding : bindingsForAddress) {
if (binding instanceof QueueBinding) {
((QueueBinding)binding).getQueue().deleteQueue(true);
if (binding instanceof LocalQueueBinding) {
((LocalQueueBinding)binding).getQueue().deleteQueue(true);
} else if (binding instanceof RemoteQueueBinding) {
removeBinding(binding.getUniqueName(), null, true);
}
}

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.jboss.logging.Logger;
import org.junit.Ignore;
@ -120,6 +125,58 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
}
@Test
public void testDeleteAddress() throws Exception {
final String ADDRESS = "queues.testaddress";
setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, ADDRESS, "queue0", null, false);
createQueue(1, ADDRESS, "queue0", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
waitForBindings(0, ADDRESS, 1, 1, true);
waitForBindings(1, ADDRESS, 1, 1, true);
waitForBindings(0, ADDRESS, 1, 1, false);
waitForBindings(1, ADDRESS, 1, 1, false);
// there should be both a local and a remote binding
Collection<Binding> bindings = servers[0].getPostOffice().getDirectBindings(SimpleString.toSimpleString(ADDRESS));
assertEquals(2, bindings.size());
// the remote binding should point to the SnF queue
SimpleString snf = null;
for (Binding binding : bindings) {
if (binding instanceof RemoteQueueBinding) {
snf = ((RemoteQueueBinding)binding).getQueue().getName();
}
}
assertNotNull(snf);
assertNotNull(servers[0].locateQueue(snf));
servers[0].getActiveMQServerControl().deleteAddress(ADDRESS, true);
// no bindings should remain but the SnF queue should still be there
bindings = servers[0].getPostOffice().getDirectBindings(SimpleString.toSimpleString(ADDRESS));
assertEquals(0, bindings.size());
assertNotNull(servers[0].locateQueue(snf));
}
@Test
public void testSimple_TwoNodes() throws Exception {
setupServer(0, false, isNetty());