no jira - only delay on expected messages and no wait on null. speed up test - thanks rgemmell

This commit is contained in:
gtully 2021-06-03 15:17:48 +01:00
parent 3f67d93592
commit f4f31df97b
1 changed files with 30 additions and 31 deletions

View File

@ -67,7 +67,6 @@ import static org.apache.activemq.artemis.utils.collections.IterableStream.itera
public class ScaleDownTest extends ClusterTestBase { public class ScaleDownTest extends ClusterTestBase {
private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672"; private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672";
private static final int RECEIVE_TIMEOUT_MILLIS = 1000;
private boolean useScaleDownGroupName; private boolean useScaleDownGroupName;
@ -135,7 +134,7 @@ public class ScaleDownTest extends ClusterTestBase {
// consume a message from queue 2 // consume a message from queue 2
addConsumer(1, 0, queueName2, null, false); addConsumer(1, 0, queueName2, null, false);
ClientMessage clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage clientMessage = consumers[1].getConsumer().receive(1000);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
consumers[1].getSession().commit(); consumers[1].getSession().commit();
@ -150,26 +149,26 @@ public class ScaleDownTest extends ClusterTestBase {
// get the 2 messages from queue 1 // get the 2 messages from queue 1
addConsumer(0, 1, queueName1, null); addConsumer(0, 1, queueName1, null);
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(1000);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(1000);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
// ensure there are no more messages on queue 1 // ensure there are no more messages on queue 1
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receiveImmediate();
Assert.assertNull(clientMessage); Assert.assertNull(clientMessage);
removeConsumer(0); removeConsumer(0);
// get the 1 message from queue 2 // get the 1 message from queue 2
addConsumer(0, 1, queueName2, null); addConsumer(0, 1, queueName2, null);
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(1000);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
// ensure there are no more messages on queue 1 // ensure there are no more messages on queue 1
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receiveImmediate();
Assert.assertNull(clientMessage); Assert.assertNull(clientMessage);
removeConsumer(0); removeConsumer(0);
} }
@ -318,23 +317,23 @@ public class ScaleDownTest extends ClusterTestBase {
// get the messages from node 1 // get the messages from node 1
addConsumer(0, 1, queueName1, null); addConsumer(0, 1, queueName1, null);
for (int i = 0; i < TEST_SIZE; i++) { for (int i = 0; i < TEST_SIZE; i++) {
ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
} }
ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage clientMessage = consumers[0].getConsumer().receiveImmediate();
Assert.assertNull(clientMessage); Assert.assertNull(clientMessage);
removeConsumer(0); removeConsumer(0);
addConsumer(0, 1, queueName2, null); addConsumer(0, 1, queueName2, null);
for (int i = 0; i < TEST_SIZE; i++) { for (int i = 0; i < TEST_SIZE; i++) {
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
} }
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receiveImmediate();
Assert.assertNull(clientMessage); Assert.assertNull(clientMessage);
removeConsumer(0); removeConsumer(0);
} }
@ -356,7 +355,7 @@ public class ScaleDownTest extends ClusterTestBase {
// consume a message from node 0 // consume a message from node 0
addConsumer(1, 0, queueName2, null, false); addConsumer(1, 0, queueName2, null, false);
ClientMessage clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
consumers[1].getSession().commit(); consumers[1].getSession().commit();
@ -366,26 +365,26 @@ public class ScaleDownTest extends ClusterTestBase {
// get the 2 messages from queue 1 // get the 2 messages from queue 1
addConsumer(0, 1, queueName1, null); addConsumer(0, 1, queueName1, null);
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
// ensure there are no more messages on queue 1 // ensure there are no more messages on queue 1
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receiveImmediate();
Assert.assertNull(clientMessage); Assert.assertNull(clientMessage);
removeConsumer(0); removeConsumer(0);
// get the 1 message from queue 2 // get the 1 message from queue 2
addConsumer(0, 1, queueName2, null); addConsumer(0, 1, queueName2, null);
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
// ensure there are no more messages on queue 1 // ensure there are no more messages on queue 1
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receiveImmediate();
Assert.assertNull(clientMessage); Assert.assertNull(clientMessage);
removeConsumer(0); removeConsumer(0);
} }
@ -408,7 +407,7 @@ public class ScaleDownTest extends ClusterTestBase {
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST); Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
// get the 1 message from queue 2 // get the 1 message from queue 2
addConsumer(0, 1, queueName, null); addConsumer(0, 1, queueName, null);
ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
@ -449,7 +448,7 @@ public class ScaleDownTest extends ClusterTestBase {
Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST); Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST);
// get the 1 message from queue 2 // get the 1 message from queue 2
addConsumer(0, 1, queueName, null); addConsumer(0, 1, queueName, null);
ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
} }
@ -483,13 +482,13 @@ public class ScaleDownTest extends ClusterTestBase {
// get the 1 message from queue 1 // get the 1 message from queue 1
addConsumer(0, 1, queueName1, null); addConsumer(0, 1, queueName1, null);
clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
// get the 1 message from queue 2 // get the 1 message from queue 2
addConsumer(1, 1, queueName2, null); addConsumer(1, 1, queueName2, null);
clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); clientMessage = consumers[1].getConsumer().receive(250);
Assert.assertNotNull(clientMessage); Assert.assertNotNull(clientMessage);
clientMessage.acknowledge(); clientMessage.acknowledge();
} }
@ -541,7 +540,7 @@ public class ScaleDownTest extends ClusterTestBase {
session.start(); session.start();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage msg = consumer.receive(250);
byte[] body = new byte[msg.getBodySize()]; byte[] body = new byte[msg.getBodySize()];
msg.getBodyBuffer().readBytes(body); msg.getBodyBuffer().readBytes(body);
Assert.assertTrue(new String(body).contains("Bob the giant pig " + i)); Assert.assertTrue(new String(body).contains("Bob the giant pig " + i));
@ -595,7 +594,7 @@ public class ScaleDownTest extends ClusterTestBase {
session.start(); session.start();
for (int nmsg = 0; nmsg < 10; nmsg++) { for (int nmsg = 0; nmsg < 10; nmsg++) {
ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS); ClientMessage msg = consumer.receive(250);
Assert.assertNotNull(msg); Assert.assertNotNull(msg);
@ -642,10 +641,10 @@ public class ScaleDownTest extends ClusterTestBase {
addConsumer(0, 1, queueName, null); addConsumer(0, 1, queueName, null);
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
Assert.assertNotNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); Assert.assertNotNull(consumers[0].getConsumer().receive(250));
} }
Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
removeConsumer(0); removeConsumer(0);
} }
@ -681,10 +680,10 @@ public class ScaleDownTest extends ClusterTestBase {
addConsumer(0, 1, queueName, null); addConsumer(0, 1, queueName, null);
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
Assert.assertEquals(i, consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS).getIntProperty("order").intValue()); Assert.assertEquals(i, consumers[0].getConsumer().receive(250).getIntProperty("order").intValue());
} }
Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
removeConsumer(0); removeConsumer(0);
} }
@ -722,17 +721,17 @@ public class ScaleDownTest extends ClusterTestBase {
String compare; String compare;
ClientMessage message; ClientMessage message;
if (i % 2 == 0) { if (i % 2 == 0) {
message = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); message = consumers[0].getConsumer().receive(250);
compare = "0"; compare = "0";
} else { } else {
message = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); message = consumers[1].getConsumer().receive(250);
compare = "1"; compare = "1";
} }
Assert.assertEquals(compare, message.getStringProperty(ClusterTestBase.FILTER_PROP)); Assert.assertEquals(compare, message.getStringProperty(ClusterTestBase.FILTER_PROP));
} }
Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); Assert.assertNull(consumers[0].getConsumer().receiveImmediate());
Assert.assertNull(consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); Assert.assertNull(consumers[1].getConsumer().receiveImmediate());
removeConsumer(0); removeConsumer(0);
removeConsumer(1); removeConsumer(1);
} }