ARTEMIS-3491 Fix cluster connection restart
This commit is contained in:
parent
515ac3a7cb
commit
569c5994a5
|
@ -445,6 +445,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
} catch (Exception ignore) {
|
||||
}
|
||||
}
|
||||
records.clear();
|
||||
}
|
||||
|
||||
if (managementService != null) {
|
||||
|
|
|
@ -20,9 +20,13 @@ import java.util.Collection;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -125,6 +129,67 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleRestartClusterConnection() throws Exception {
|
||||
setupServer(0, true, isNetty());
|
||||
setupServer(1, true, isNetty());
|
||||
setupServer(2, true, isNetty());
|
||||
|
||||
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
|
||||
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 2, 0);
|
||||
setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
|
||||
|
||||
startServers(0, 1, 2);
|
||||
|
||||
waitForTopology(servers[0], 3);
|
||||
waitForTopology(servers[1], 3);
|
||||
waitForTopology(servers[2], 3);
|
||||
|
||||
ClusterConnection clusterConnection0 = getServer(0).getClusterManager().getClusterConnection("cluster0");
|
||||
ClusterConnection clusterConnection1 = getServer(1).getClusterManager().getClusterConnection("cluster1");
|
||||
ClusterConnection clusterConnection2 = getServer(2).getClusterManager().getClusterConnection("cluster2");
|
||||
|
||||
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection0).getRecords().size());
|
||||
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection1).getRecords().size());
|
||||
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection2).getRecords().size());
|
||||
|
||||
clusterConnection0.stop();
|
||||
clusterConnection1.stop();
|
||||
clusterConnection2.stop();
|
||||
|
||||
Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection0).getRecords().size());
|
||||
Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection1).getRecords().size());
|
||||
Assert.assertEquals(0, ((ClusterConnectionImpl)clusterConnection2).getRecords().size());
|
||||
|
||||
clusterConnection0.start();
|
||||
clusterConnection1.start();
|
||||
clusterConnection2.start();
|
||||
|
||||
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection0).getRecords().size());
|
||||
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection1).getRecords().size());
|
||||
Wait.assertEquals(2, () -> ((ClusterConnectionImpl)clusterConnection2).getRecords().size());
|
||||
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
setupSessionFactory(2, isNetty());
|
||||
|
||||
createQueue(0, "queues.testaddress", "queue0", null, false);
|
||||
createQueue(1, "queues.testaddress", "queue0", null, false);
|
||||
createQueue(2, "queues.testaddress", "queue0", null, false);
|
||||
|
||||
addConsumer(0, 0, "queue0", null);
|
||||
addConsumer(1, 1, "queue0", null);
|
||||
addConsumer(2, 2, "queue0", null);
|
||||
|
||||
waitForBindings(0, "queues.testaddress", 1, 1, true);
|
||||
waitForBindings(1, "queues.testaddress", 1, 1, true);
|
||||
waitForBindings(2, "queues.testaddress", 1, 1, true);
|
||||
|
||||
waitForBindings(0, "queues.testaddress", 2, 2, false);
|
||||
waitForBindings(1, "queues.testaddress", 2, 2, false);
|
||||
waitForBindings(2, "queues.testaddress", 2, 2, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteAddress() throws Exception {
|
||||
final String ADDRESS = "queues.testaddress";
|
||||
|
|
Loading…
Reference in New Issue