diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index 6e1cfa4409..66d3c353c3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -67,7 +67,6 @@ import static org.apache.activemq.artemis.utils.collections.IterableStream.itera public class ScaleDownTest extends ClusterTestBase { private static final String AMQP_ACCEPTOR_URI = "tcp://127.0.0.1:5672"; - private static final int RECEIVE_TIMEOUT_MILLIS = 1000; private boolean useScaleDownGroupName; @@ -135,7 +134,7 @@ public class ScaleDownTest extends ClusterTestBase { // consume a message from queue 2 addConsumer(1, 0, queueName2, null, false); - ClientMessage clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage clientMessage = consumers[1].getConsumer().receive(1000); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); consumers[1].getSession().commit(); @@ -150,26 +149,26 @@ public class ScaleDownTest extends ClusterTestBase { // get the 2 messages from queue 1 addConsumer(0, 1, queueName1, null); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(1000); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(1000); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receiveImmediate(); Assert.assertNull(clientMessage); removeConsumer(0); // get the 1 message from queue 2 addConsumer(0, 1, queueName2, null); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(1000); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receiveImmediate(); Assert.assertNull(clientMessage); removeConsumer(0); } @@ -318,23 +317,23 @@ public class ScaleDownTest extends ClusterTestBase { // get the messages from node 1 addConsumer(0, 1, queueName1, null); for (int i = 0; i < TEST_SIZE; i++) { - ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } - ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage clientMessage = consumers[0].getConsumer().receiveImmediate(); Assert.assertNull(clientMessage); removeConsumer(0); addConsumer(0, 1, queueName2, null); for (int i = 0; i < TEST_SIZE; i++) { - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receiveImmediate(); Assert.assertNull(clientMessage); removeConsumer(0); } @@ -356,7 +355,7 @@ public class ScaleDownTest extends ClusterTestBase { // consume a message from node 0 addConsumer(1, 0, queueName2, null, false); - ClientMessage clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage clientMessage = consumers[1].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); consumers[1].getSession().commit(); @@ -366,26 +365,26 @@ public class ScaleDownTest extends ClusterTestBase { // get the 2 messages from queue 1 addConsumer(0, 1, queueName1, null); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receiveImmediate(); Assert.assertNull(clientMessage); removeConsumer(0); // get the 1 message from queue 2 addConsumer(0, 1, queueName2, null); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // ensure there are no more messages on queue 1 - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receiveImmediate(); Assert.assertNull(clientMessage); removeConsumer(0); } @@ -408,7 +407,7 @@ public class ScaleDownTest extends ClusterTestBase { Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST); // get the 1 message from queue 2 addConsumer(0, 1, queueName, null); - ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); @@ -449,7 +448,7 @@ public class ScaleDownTest extends ClusterTestBase { Assert.assertEquals(((QueueImpl)((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName))).getBindable()).getRoutingType(), RoutingType.ANYCAST); // get the 1 message from queue 2 addConsumer(0, 1, queueName, null); - ClientMessage clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } @@ -483,13 +482,13 @@ public class ScaleDownTest extends ClusterTestBase { // get the 1 message from queue 1 addConsumer(0, 1, queueName1, null); - clientMessage = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); // get the 1 message from queue 2 addConsumer(1, 1, queueName2, null); - clientMessage = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + clientMessage = consumers[1].getConsumer().receive(250); Assert.assertNotNull(clientMessage); clientMessage.acknowledge(); } @@ -541,7 +540,7 @@ public class ScaleDownTest extends ClusterTestBase { session.start(); for (int i = 0; i < 5; i++) { - ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage msg = consumer.receive(250); byte[] body = new byte[msg.getBodySize()]; msg.getBodyBuffer().readBytes(body); Assert.assertTrue(new String(body).contains("Bob the giant pig " + i)); @@ -595,7 +594,7 @@ public class ScaleDownTest extends ClusterTestBase { session.start(); for (int nmsg = 0; nmsg < 10; nmsg++) { - ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT_MILLIS); + ClientMessage msg = consumer.receive(250); Assert.assertNotNull(msg); @@ -642,10 +641,10 @@ public class ScaleDownTest extends ClusterTestBase { addConsumer(0, 1, queueName, null); for (int i = 0; i < messageCount; i++) { - Assert.assertNotNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); + Assert.assertNotNull(consumers[0].getConsumer().receive(250)); } - Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); + Assert.assertNull(consumers[0].getConsumer().receiveImmediate()); removeConsumer(0); } @@ -681,10 +680,10 @@ public class ScaleDownTest extends ClusterTestBase { addConsumer(0, 1, queueName, null); for (int i = 0; i < messageCount; i++) { - Assert.assertEquals(i, consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS).getIntProperty("order").intValue()); + Assert.assertEquals(i, consumers[0].getConsumer().receive(250).getIntProperty("order").intValue()); } - Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); + Assert.assertNull(consumers[0].getConsumer().receiveImmediate()); removeConsumer(0); } @@ -722,17 +721,17 @@ public class ScaleDownTest extends ClusterTestBase { String compare; ClientMessage message; if (i % 2 == 0) { - message = consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + message = consumers[0].getConsumer().receive(250); compare = "0"; } else { - message = consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS); + message = consumers[1].getConsumer().receive(250); compare = "1"; } Assert.assertEquals(compare, message.getStringProperty(ClusterTestBase.FILTER_PROP)); } - Assert.assertNull(consumers[0].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); - Assert.assertNull(consumers[1].getConsumer().receive(RECEIVE_TIMEOUT_MILLIS)); + Assert.assertNull(consumers[0].getConsumer().receiveImmediate()); + Assert.assertNull(consumers[1].getConsumer().receiveImmediate()); removeConsumer(0); removeConsumer(1); }