NO-JIRA Adding Redistribution & Loadbalancing test with AMQP

This commit is contained in:
Clebert Suconic 2018-06-22 14:27:20 -04:00
parent 576cf7e73d
commit 4f0bb98667
1 changed files with 56 additions and 0 deletions

View File

@ -367,6 +367,62 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
}
@Test
public void testRedistributeAfterLoadBalanced() throws Exception {
startServers(MessageLoadBalancingType.ON_DEMAND);
ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
Connection[] connection = new Connection[NUMBER_OF_SERVERS];
Session[] session = new Session[NUMBER_OF_SERVERS];
MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
// this will pre create consumers to make sure messages are distributed evenly without redistribution
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
factory[node] = getJmsConnectionFactory(node);
connection[node] = factory[node].createConnection();
session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
}
waitForBindings(0, "queues.0", 1, 1, true);
waitForBindings(1, "queues.0", 1, 1, true);
waitForBindings(0, "queues.0", 1, 1, false);
waitForBindings(1, "queues.0", 1, 1, false);
// sending Messages.. they should be load balanced
{
ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
StringBuffer stringbuffer = new StringBuffer();
stringbuffer.append("hello");
if (i % 3 == 0) {
// making 1/3 of the messages to be large message
for (int j = 0; j < 300 * 1024; j++) {
stringbuffer.append(" ");
}
}
pd.send(sn.createTextMessage(stringbuffer.toString()));
}
cn.close();
}
Wait.assertEquals(NUMBER_OF_MESSAGES / 2, servers[1].locateQueue(queueName)::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES / 2, servers[0].locateQueue(queueName)::getMessageCount);
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
connection[1].close();
// this wil be after redistribution
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
}
private void receiveMessages(Connection connection,
MessageConsumer messageConsumer,
int messageCount,