diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index a88c9b4e5a..2e3f8e3ec1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -445,6 +445,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } catch (Exception ignore) { } } + records.clear(); } if (managementService != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java index 4884ed1696..6c3454ae9d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java @@ -20,9 +20,13 @@ import java.util.Collection; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.tests.util.Wait; import org.jboss.logging.Logger; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -125,6 +129,67 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase { } + @Test + public void testSimpleRestartClusterConnection() throws Exception { + setupServer(0, true, isNetty()); + setupServer(1, true, isNetty()); + setupServer(2, true, isNetty()); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 2, 0); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); + + startServers(0, 1, 2); + + waitForTopology(servers[0], 3); + waitForTopology(servers[1], 3); + waitForTopology(servers[2], 3); + + ClusterConnection clusterConnection0 = getServer(0).getClusterManager().getClusterConnection("cluster0"); + ClusterConnection clusterConnection1 = getServer(1).getClusterManager().getClusterConnection("cluster1"); + ClusterConnection clusterConnection2 = getServer(2).getClusterManager().getClusterConnection("cluster2"); + + Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection0).getRecords().size()); + Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection1).getRecords().size()); + Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection2).getRecords().size()); + + clusterConnection0.stop(); + clusterConnection1.stop(); + clusterConnection2.stop(); + + Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection0).getRecords().size()); + Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection1).getRecords().size()); + Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection2).getRecords().size()); + + clusterConnection0.start(); + clusterConnection1.start(); + clusterConnection2.start(); + + Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection0).getRecords().size()); + Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection1).getRecords().size()); + Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection2).getRecords().size()); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + createQueue(2, "queues.testaddress", "queue0", null, false); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + addConsumer(2, 2, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(2, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 2, 2, false); + waitForBindings(1, "queues.testaddress", 2, 2, false); + waitForBindings(2, "queues.testaddress", 2, 2, false); + } + @Test public void testDeleteAddress() throws Exception { final String ADDRESS = "queues.testaddress";