diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index c77a1c4c79..6e1cfa4409 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -67,6 +67,7 @@ import static org.apache.activemq.artemis.utils.collections.IterableStream.itera public class ScaleDownTest extends ClusterTestBase { private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672"; + private static final int RECEIVE_TIMEOUT_MILLIS = 1000; private boolean useScaleDownGroupName; @@ -134,7 +135,7 @@ public class ScaleDownTest extends ClusterTestBase { // consume a message from queue 2 addConsumer(1, 0, queueName2, null, false); - ClientMessage clientMessage = consumers[1].getConsumer().receive(250); + ClientMessage clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); consumers[1].getSession().commit(); @@ -149,26 +150,26 @@ public class ScaleDownTest extends ClusterTestBase { // get the 2 messages from queue 1 addConsumer(0, 1, queueName1, null); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNull(clientMessage); removeConsumer(0); // get the 1 message from queue 2 addConsumer(0, 1, queueName2, null); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNull(clientMessage); removeConsumer(0); } @@ -317,23 +318,23 @@ public class ScaleDownTest extends ClusterTestBase { // get the messages from node 1 addConsumer(0, 1, queueName1, null); for (int i = 0; i < TEST_SIZE; i++) { - ClientMessage clientMessage = consumers[0].getConsumer().receive(250); + ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } - ClientMessage clientMessage = consumers[0].getConsumer().receive(250); + ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNull(clientMessage); removeConsumer(0); addConsumer(0, 1, queueName2, null); for (int i = 0; i < TEST_SIZE; i++) { - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNull(clientMessage); removeConsumer(0); } @@ -355,7 +356,7 @@ public class ScaleDownTest extends ClusterTestBase { // consume a message from node 0 addConsumer(1, 0, queueName2, null, false); - ClientMessage clientMessage = consumers[1].getConsumer().receive(250); + ClientMessage clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); consumers[1].getSession().commit(); @@ -365,26 +366,26 @@ public class ScaleDownTest extends ClusterTestBase { // get the 2 messages from queue 1 addConsumer(0, 1, queueName1, null); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNull(clientMessage); removeConsumer(0); // get the 1 message from queue 2 addConsumer(0, 1, queueName2, null); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNull(clientMessage); removeConsumer(0); } @@ -407,7 +408,7 @@ public class ScaleDownTest extends ClusterTestBase { Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST); // get the 1 message from queue 2 addConsumer(0, 1, queueName, null); - ClientMessage clientMessage = consumers[0].getConsumer().receive(250); + ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); @@ -448,7 +449,7 @@ public class ScaleDownTest extends ClusterTestBase { Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST); // get the 1 message from queue 2 addConsumer(0, 1, queueName, null); - ClientMessage clientMessage = consumers[0].getConsumer().receive(250); + ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } @@ -482,13 +483,13 @@ public class ScaleDownTest extends ClusterTestBase { // get the 1 message from queue 1 addConsumer(0, 1, queueName1, null); - clientMessage = consumers[0].getConsumer().receive(250); + clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // get the 1 message from queue 2 addConsumer(1, 1, queueName2, null); - clientMessage = consumers[1].getConsumer().receive(250); + clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } @@ -540,7 +541,7 @@ public class ScaleDownTest extends ClusterTestBase { session.start(); for (int i = 0; i < 5; i++) { - ClientMessage msg = consumer.receive(250); + ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS); byte[] body = new byte[msg.getBodySize()]; msg.getBodyBuffer().readBytes(body); Assert.assertTrue(new String(body).contains("Bob the giant pig " + i)); @@ -594,7 +595,7 @@ public class ScaleDownTest extends ClusterTestBase { session.start(); for (int nmsg = 0; nmsg < 10; nmsg++) { - ClientMessage msg = consumer.receive(250); + ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS); Assert.assertNotNull(msg); @@ -641,10 +642,10 @@ public class ScaleDownTest extends ClusterTestBase { addConsumer(0, 1, queueName, null); for (int i = 0; i < messageCount; i++) { - Assert.assertNotNull(consumers[0].getConsumer().receive(250)); + Assert.assertNotNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); } - Assert.assertNull(consumers[0].getConsumer().receive(250)); + Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); removeConsumer(0); } @@ -680,10 +681,10 @@ public class ScaleDownTest extends ClusterTestBase { addConsumer(0, 1, queueName, null); for (int i = 0; i < messageCount; i++) { - Assert.assertEquals(i, consumers[0].getConsumer().receive(250).getIntProperty("order").intValue()); + Assert.assertEquals(i, consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS).getIntProperty("order").intValue()); } - Assert.assertNull(consumers[0].getConsumer().receive(250)); + Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); removeConsumer(0); } @@ -721,17 +722,17 @@ public class ScaleDownTest extends ClusterTestBase { String compare; ClientMessage message; if (i % 2 == 0) { - message = consumers[0].getConsumer().receive(250); + message = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); compare = "0"; } else { - message = consumers[1].getConsumer().receive(250); + message = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); compare = "1"; } Assert.assertEquals(compare, message.getStringProperty(ClusterTestBase.FILTER_PROP)); } - Assert.assertNull(consumers[0].getConsumer().receive(250)); - Assert.assertNull(consumers[1].getConsumer().receive(250)); + Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); + Assert.assertNull(consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); removeConsumer(0); removeConsumer(1); }