ARTEMIS-3366 deleteAddress can remove SnF queue
This commit is contained in:
parent
8b33279667
commit
45ae64a01c
|
@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||||
import org.apache.activemq.artemis.core.server.RouteContextList;
|
import org.apache.activemq.artemis.core.server.RouteContextList;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
@ -815,8 +816,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
final Collection<Binding> bindingsForAddress = getDirectBindings(address);
|
final Collection<Binding> bindingsForAddress = getDirectBindings(address);
|
||||||
if (force) {
|
if (force) {
|
||||||
for (Binding binding : bindingsForAddress) {
|
for (Binding binding : bindingsForAddress) {
|
||||||
if (binding instanceof QueueBinding) {
|
if (binding instanceof LocalQueueBinding) {
|
||||||
((QueueBinding)binding).getQueue().deleteQueue(true);
|
((LocalQueueBinding)binding).getQueue().deleteQueue(true);
|
||||||
|
} else if (binding instanceof RemoteQueueBinding) {
|
||||||
|
removeBinding(binding.getUniqueName(), null, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
@ -120,6 +125,58 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteAddress() throws Exception {
|
||||||
|
final String ADDRESS = "queues.testaddress";
|
||||||
|
|
||||||
|
setupServer(0, true, isNetty());
|
||||||
|
setupServer(1, true, isNetty());
|
||||||
|
|
||||||
|
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
||||||
|
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
||||||
|
|
||||||
|
startServers(0, 1);
|
||||||
|
|
||||||
|
waitForTopology(servers[0], 2);
|
||||||
|
waitForTopology(servers[1], 2);
|
||||||
|
|
||||||
|
setupSessionFactory(0, isNetty());
|
||||||
|
setupSessionFactory(1, isNetty());
|
||||||
|
|
||||||
|
createQueue(0, ADDRESS, "queue0", null, false);
|
||||||
|
createQueue(1, ADDRESS, "queue0", null, false);
|
||||||
|
|
||||||
|
addConsumer(0, 0, "queue0", null);
|
||||||
|
addConsumer(1, 1, "queue0", null);
|
||||||
|
|
||||||
|
waitForBindings(0, ADDRESS, 1, 1, true);
|
||||||
|
waitForBindings(1, ADDRESS, 1, 1, true);
|
||||||
|
|
||||||
|
waitForBindings(0, ADDRESS, 1, 1, false);
|
||||||
|
waitForBindings(1, ADDRESS, 1, 1, false);
|
||||||
|
|
||||||
|
// there should be both a local and a remote binding
|
||||||
|
Collection<Binding> bindings = servers[0].getPostOffice().getDirectBindings(SimpleString.toSimpleString(ADDRESS));
|
||||||
|
assertEquals(2, bindings.size());
|
||||||
|
|
||||||
|
// the remote binding should point to the SnF queue
|
||||||
|
SimpleString snf = null;
|
||||||
|
for (Binding binding : bindings) {
|
||||||
|
if (binding instanceof RemoteQueueBinding) {
|
||||||
|
snf = ((RemoteQueueBinding)binding).getQueue().getName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertNotNull(snf);
|
||||||
|
assertNotNull(servers[0].locateQueue(snf));
|
||||||
|
|
||||||
|
servers[0].getActiveMQServerControl().deleteAddress(ADDRESS, true);
|
||||||
|
|
||||||
|
// no bindings should remain but the SnF queue should still be there
|
||||||
|
bindings = servers[0].getPostOffice().getDirectBindings(SimpleString.toSimpleString(ADDRESS));
|
||||||
|
assertEquals(0, bindings.size());
|
||||||
|
assertNotNull(servers[0].locateQueue(snf));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimple_TwoNodes() throws Exception {
|
public void testSimple_TwoNodes() throws Exception {
|
||||||
setupServer(0, false, isNetty());
|
setupServer(0, false, isNetty());
|
||||||
|
|
Loading…
Reference in New Issue