Fix ScaleDownTest
This commit is contained in:
parent
19147113cb
commit
2a2a94733b
|
@ -76,6 +76,8 @@ public class ScaleDownTest extends ClusterTestBase {
|
||||||
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
||||||
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
||||||
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
||||||
|
servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
|
||||||
|
servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
|
||||||
startServers(0, 1);
|
startServers(0, 1);
|
||||||
setupSessionFactory(0, isNetty());
|
setupSessionFactory(0, isNetty());
|
||||||
setupSessionFactory(1, isNetty());
|
setupSessionFactory(1, isNetty());
|
||||||
|
@ -156,10 +158,6 @@ public class ScaleDownTest extends ClusterTestBase {
|
||||||
createQueue(0, addressName2, queueName2, null, false);
|
createQueue(0, addressName2, queueName2, null, false);
|
||||||
createQueue(1, addressName2, queueName2, null, false);
|
createQueue(1, addressName2, queueName2, null, false);
|
||||||
|
|
||||||
// add consumers to node 1 to force any messages we send into the sf queue
|
|
||||||
addConsumer(0, 1, queueName1, null);
|
|
||||||
addConsumer(1, 1, queueName2, null);
|
|
||||||
|
|
||||||
// find and pause the sf queue so no messages actually move from node 0 to node 1
|
// find and pause the sf queue so no messages actually move from node 0 to node 1
|
||||||
String sfQueueName = null;
|
String sfQueueName = null;
|
||||||
for (Map.Entry<SimpleString, Binding> entry : servers[0].getPostOffice().getAllBindings().entrySet()) {
|
for (Map.Entry<SimpleString, Binding> entry : servers[0].getPostOffice().getAllBindings().entrySet()) {
|
||||||
|
@ -175,17 +173,22 @@ public class ScaleDownTest extends ClusterTestBase {
|
||||||
|
|
||||||
assertNotNull(sfQueueName);
|
assertNotNull(sfQueueName);
|
||||||
|
|
||||||
// send messages to node 0 that will get stuck in the paused sf queue going to node 1
|
// send messages to node 0
|
||||||
send(0, addressName1, TEST_SIZE, false, null);
|
send(0, addressName1, TEST_SIZE, false, null);
|
||||||
send(0, addressName2, TEST_SIZE, false, null);
|
send(0, addressName2, TEST_SIZE, false, null);
|
||||||
removeConsumer(0);
|
|
||||||
removeConsumer(1);
|
|
||||||
|
|
||||||
// at this point on node 0 there should be 0 messages in testQueue and TEST_SIZE messages in the sfQueue
|
// add consumers to node 1 to force messages messages to redistribute to node 2 through the paused sf queue
|
||||||
|
addConsumer(0, 1, queueName1, null);
|
||||||
|
addConsumer(1, 1, queueName2, null);
|
||||||
|
|
||||||
|
// at this point on node 0 there should be 0 messages in test queues and TEST_SIZE * 2 messages in the sf queue
|
||||||
Assert.assertEquals(0, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
|
Assert.assertEquals(0, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
|
||||||
Assert.assertEquals(0, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
|
Assert.assertEquals(0, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
|
||||||
Assert.assertEquals(TEST_SIZE * 2, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(sfQueueName))).getQueue()));
|
Assert.assertEquals(TEST_SIZE * 2, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(sfQueueName))).getQueue()));
|
||||||
|
|
||||||
|
removeConsumer(0);
|
||||||
|
removeConsumer(1);
|
||||||
|
|
||||||
// trigger scaleDown from node 0 to node 1
|
// trigger scaleDown from node 0 to node 1
|
||||||
servers[0].stop();
|
servers[0].stop();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue