This commit is contained in:
Clebert Suconic 2021-09-18 10:45:55 -04:00
commit a1d2a37679
2 changed files with 66 additions and 0 deletions

View File

@ -445,6 +445,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
} catch (Exception ignore) {
}
}
records.clear();
}
if (managementService != null) {

View File

@ -20,9 +20,13 @@ import java.util.Collection;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@ -125,6 +129,67 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
}
@Test
public void testSimpleRestartClusterConnection() throws Exception {
setupServer(0, true, isNetty());
setupServer(1, true, isNetty());
setupServer(2, true, isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 2, 0);
setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
startServers(0, 1, 2);
waitForTopology(servers[0], 3);
waitForTopology(servers[1], 3);
waitForTopology(servers[2], 3);
ClusterConnection clusterConnection0 = getServer(0).getClusterManager().getClusterConnection("cluster0");
ClusterConnection clusterConnection1 = getServer(1).getClusterManager().getClusterConnection("cluster1");
ClusterConnection clusterConnection2 = getServer(2).getClusterManager().getClusterConnection("cluster2");
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection0).getRecords().size());
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection1).getRecords().size());
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection2).getRecords().size());
clusterConnection0.stop();
clusterConnection1.stop();
clusterConnection2.stop();
Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection0).getRecords().size());
Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection1).getRecords().size());
Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection2).getRecords().size());
clusterConnection0.start();
clusterConnection1.start();
clusterConnection2.start();
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection0).getRecords().size());
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection1).getRecords().size());
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection2).getRecords().size());
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 2, 2, false);
waitForBindings(1, "queues.testaddress", 2, 2, false);
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
@Test
public void testDeleteAddress() throws Exception {
final String ADDRESS = "queues.testaddress";