NO-JIRA Fixing some Wait clauses that are too short

Using a small amount of wait on the Wait clause
was leading to a few test intermittent failing
as part of CI
most likely because of major GC events.
This commit is contained in:
Clebert Suconic 2020-04-16 11:34:10 -04:00
parent d9aacec8bb
commit 41f62b9de2
3 changed files with 93 additions and 93 deletions

View File

@ -78,11 +78,11 @@ public class RetroactiveAddressFailoverTest extends FailoverTestBase {
producer.send(message); producer.send(message);
} }
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> live.locateQueue(divertQueue).getMessageCount() == OFFSET, 3000, 50); org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> live.locateQueue(divertQueue).getMessageCount() == OFFSET);
crash(session); crash(session);
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(divertQueue).getMessageCount() == OFFSET, 3000, 50); org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(divertQueue).getMessageCount() == OFFSET);
for (int j = OFFSET; j < MESSAGE_COUNT + OFFSET; j++) { for (int j = OFFSET; j < MESSAGE_COUNT + OFFSET; j++) {
ClientMessage message = session.createMessage(true); ClientMessage message = session.createMessage(true);
@ -90,11 +90,11 @@ public class RetroactiveAddressFailoverTest extends FailoverTestBase {
producer.send(message); producer.send(message);
} }
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT, 3000, 50); org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT);
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST)); session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(queueName) != null); org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(queueName) != null);
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(queueName).getMessageCount() == MESSAGE_COUNT, 500, 50); org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> backup.locateQueue(queueName).getMessageCount() == MESSAGE_COUNT);
ClientConsumer consumer = session.createConsumer(queueName); ClientConsumer consumer = session.createConsumer(queueName);
for (int j = OFFSET; j < MESSAGE_COUNT + OFFSET; j++) { for (int j = OFFSET; j < MESSAGE_COUNT + OFFSET; j++) {

View File

@ -153,12 +153,12 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
producer.close(); producer.close();
final int finalI = i; final int finalI = i;
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessagesReplaced() == (COUNT * finalI), 3000, 50); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessagesReplaced() == (COUNT * finalI));
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT, 3000, 50); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT);
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST)); session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(queueName) != null); Wait.assertTrue(() -> server.locateQueue(queueName) != null);
Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == COUNT, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == COUNT);
ClientConsumer consumer = session.createConsumer(queueName); ClientConsumer consumer = session.createConsumer(queueName);
for (int j = 0; j < COUNT; j++) { for (int j = 0; j < COUNT; j++) {
session.start(); session.start();
@ -186,12 +186,12 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.getBodyBuffer().writeString(data + "1"); message.getBodyBuffer().writeString(data + "1");
producer.send(message); producer.send(message);
producer.close(); producer.close();
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1);
server.stop(); server.stop();
server.start(); server.start();
assertNotNull(server.locateQueue(divertMulticastQueue)); assertNotNull(server.locateQueue(divertMulticastQueue));
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(10));
locator = createInVMNonHALocator(); locator = createInVMNonHALocator();
sf = createSessionFactory(locator); sf = createSessionFactory(locator);
@ -205,7 +205,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName).setRoutingType(RoutingType.ANYCAST)); session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(queueName1) != null); Wait.assertTrue(() -> server.locateQueue(queueName1) != null);
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 2, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 2);
ClientConsumer consumer = session.createConsumer(queueName1); ClientConsumer consumer = session.createConsumer(queueName1);
session.start(); session.start();
@ -218,9 +218,9 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
assertEquals(data + "2", message.getBodyBuffer().readString()); assertEquals(data + "2", message.getBodyBuffer().readString());
consumer.close(); consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 2, 2000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 2);
} }
@Test @Test
@ -231,16 +231,16 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST); final SimpleString divertMulticastQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName)); server.addAddressInfo(new AddressInfo(addressName));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT);
server.stop(); server.stop();
server.start(); server.start();
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT * 2)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT * 2));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT * 2, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT * 2);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT * 2, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT * 2);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT);
} }
@Test @Test
@ -258,11 +258,11 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.setRoutingType(RoutingType.MULTICAST); message.setRoutingType(RoutingType.MULTICAST);
producer.send(message); producer.send(message);
producer.close(); producer.close();
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 1);
session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName)); session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName));
Wait.assertTrue(() -> server.locateQueue(queueName1) != null); Wait.assertTrue(() -> server.locateQueue(queueName1) != null);
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 1);
ClientConsumer consumer = session.createConsumer(queueName1); ClientConsumer consumer = session.createConsumer(queueName1);
session.start(); session.start();
@ -271,9 +271,9 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
assertEquals(data, message.getBodyBuffer().readString()); assertEquals(data, message.getBodyBuffer().readString());
consumer.close(); consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 1);
} }
@Test @Test
@ -296,7 +296,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
producer.send(m); producer.send(m);
} }
producer.close(); producer.close();
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == COUNT);
MessageConsumer consumer = s.createConsumer(t); MessageConsumer consumer = s.createConsumer(t);
c.start(); c.start();
@ -317,11 +317,11 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName)); server.addAddressInfo(new AddressInfo(addressName));
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT * 2)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT * 2));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT * 2, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT * 2);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT * 2, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT * 2);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT)); server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getRingSize() == COUNT);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT, 1000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getRingSize() == COUNT);
} }
@Test @Test
@ -340,25 +340,25 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.getBodyBuffer().writeString(data + RoutingType.MULTICAST.toString()); message.getBodyBuffer().writeString(data + RoutingType.MULTICAST.toString());
message.setRoutingType(RoutingType.MULTICAST); message.setRoutingType(RoutingType.MULTICAST);
producer.send(message); producer.send(message);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1);
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 0);
message = session.createMessage(false); message = session.createMessage(false);
message.getBodyBuffer().writeString(data + RoutingType.ANYCAST.toString()); message.getBodyBuffer().writeString(data + RoutingType.ANYCAST.toString());
message.setRoutingType(RoutingType.ANYCAST); message.setRoutingType(RoutingType.ANYCAST);
producer.send(message); producer.send(message);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1);
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 1);
producer.close(); producer.close();
session.createQueue(new QueueConfiguration(multicastQueue).setAddress(addressName)); session.createQueue(new QueueConfiguration(multicastQueue).setAddress(addressName));
Wait.assertTrue(() -> server.locateQueue(multicastQueue) != null); Wait.assertTrue(() -> server.locateQueue(multicastQueue) != null);
Wait.assertTrue(() -> server.locateQueue(multicastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(multicastQueue).getMessageCount() == 1);
session.createQueue(new QueueConfiguration(anycastQueue).setAddress(addressName).setRoutingType(RoutingType.ANYCAST)); session.createQueue(new QueueConfiguration(anycastQueue).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(anycastQueue) != null); Wait.assertTrue(() -> server.locateQueue(anycastQueue) != null);
Wait.assertTrue(() -> server.locateQueue(anycastQueue).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(anycastQueue).getMessageCount() == 1);
ClientConsumer consumer = session.createConsumer(multicastQueue); ClientConsumer consumer = session.createConsumer(multicastQueue);
session.start(); session.start();
@ -367,8 +367,8 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
assertEquals(data + RoutingType.MULTICAST.toString(), message.getBodyBuffer().readString()); assertEquals(data + RoutingType.MULTICAST.toString(), message.getBodyBuffer().readString());
consumer.close(); consumer.close();
Wait.assertTrue(() -> server.locateQueue(multicastQueue).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(multicastQueue).getMessageCount() == 0);
Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> server.locateQueue(divertMulticastQueue).getMessageCount() == 1);
consumer.close(); consumer.close();
@ -379,8 +379,8 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
assertEquals(data + RoutingType.ANYCAST.toString(), message.getBodyBuffer().readString()); assertEquals(data + RoutingType.ANYCAST.toString(), message.getBodyBuffer().readString());
consumer.close(); consumer.close();
Wait.assertTrue(() -> server.locateQueue(anycastQueue).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(anycastQueue).getMessageCount() == 0);
Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> server.locateQueue(divertAnycastQueue).getMessageCount() == 1);
} }
@Test @Test
@ -399,11 +399,11 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.putLongProperty("xxx", 15); message.putLongProperty("xxx", 15);
producer.send(message); producer.send(message);
producer.close(); producer.close();
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 2, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 2);
server.createQueue(new QueueConfiguration(queueName1).setAddress(addressName).setFilterString("xxx > 10").setDurable(false)); server.createQueue(new QueueConfiguration(queueName1).setAddress(addressName).setFilterString("xxx > 10").setDurable(false));
Wait.assertTrue(() -> server.locateQueue(queueName1) != null); Wait.assertTrue(() -> server.locateQueue(queueName1) != null);
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 1, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 1);
ClientConsumer consumer = session.createConsumer(queueName1); ClientConsumer consumer = session.createConsumer(queueName1);
session.start(); session.start();
@ -412,8 +412,8 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
assertEquals(15, (long) message.getLongProperty("xxx")); assertEquals(15, (long) message.getLongProperty("xxx"));
consumer.close(); consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName1).getMessageCount() == 0);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 2, 2000, 100); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == 2);
} }
@Test @Test
@ -454,13 +454,13 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
producer.send(message); producer.send(message);
} }
producer.close(); producer.close();
Wait.assertTrue(() -> server.locateQueue(randomQueueName).getMessageCount() == MESSAGE_COUNT * 2, 500, 50); Wait.assertTrue(() -> server.locateQueue(randomQueueName).getMessageCount() == MESSAGE_COUNT * 2);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT, 500, 50); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT);
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName)); session.createQueue(new QueueConfiguration(queueName).setAddress(addressName));
Wait.assertTrue(() -> server.locateQueue(queueName) != null); Wait.assertTrue(() -> server.locateQueue(queueName) != null);
Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == MESSAGE_COUNT, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == MESSAGE_COUNT);
ClientConsumer consumer = session.createConsumer(queueName); ClientConsumer consumer = session.createConsumer(queueName);
session.start(); session.start();
@ -471,7 +471,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
} }
consumer.close(); consumer.close();
Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == 0, 500, 50); Wait.assertTrue(() -> server.locateQueue(queueName).getMessageCount() == 0);
Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT, 2000, 100); Wait.assertTrue(() -> server.locateQueue(divertQueue).getMessageCount() == MESSAGE_COUNT);
} }
} }

View File

@ -67,12 +67,12 @@ public class RingQueueTest extends ActiveMQTestBase {
for (int i = 0, j = 0; i < 500; i += 2, j++) { for (int i = 0, j = 0; i < 500; i += 2, j++) {
ClientMessage m0 = createTextMessage(clientSession, "hello" + i); ClientMessage m0 = createTextMessage(clientSession, "hello" + i);
producer.send(m0); producer.send(m0);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
ClientMessage m1 = createTextMessage(clientSession, "hello" + (i + 1)); ClientMessage m1 = createTextMessage(clientSession, "hello" + (i + 1));
producer.send(m1); producer.send(m1);
int expectedMessagesReplaced = j + 1; int expectedMessagesReplaced = j + 1;
Wait.assertTrue(() -> queue.getMessagesReplaced() == expectedMessagesReplaced, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == expectedMessagesReplaced);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
ClientConsumer consumer = clientSession.createConsumer(qName); ClientConsumer consumer = clientSession.createConsumer(qName);
ClientMessage message = consumer.receiveImmediate(); ClientMessage message = consumer.receiveImmediate();
message.acknowledge(); message.acknowledge();
@ -96,39 +96,39 @@ public class RingQueueTest extends ActiveMQTestBase {
ClientMessage m0 = createTextMessage(clientSession, "hello0"); ClientMessage m0 = createTextMessage(clientSession, "hello0");
producer.send(m0); producer.send(m0);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
ClientConsumer consumer = clientSession.createConsumer(qName); ClientConsumer consumer = clientSession.createConsumer(qName);
ClientMessage message = consumer.receiveImmediate(); ClientMessage message = consumer.receiveImmediate();
assertNotNull(message); assertNotNull(message);
Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
message.acknowledge(); message.acknowledge();
assertEquals("hello0", message.getBodyBuffer().readString()); assertEquals("hello0", message.getBodyBuffer().readString());
ClientMessage m1 = createTextMessage(clientSession, "hello1"); ClientMessage m1 = createTextMessage(clientSession, "hello1");
producer.send(m1); producer.send(m1);
Wait.assertTrue(() -> queue.getDeliveringCount() == 2, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 0, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 0);
Wait.assertTrue(() -> queue.getMessageCount() == 2, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 2);
clientSession.rollback(); clientSession.rollback();
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
consumer = clientSession.createConsumer(qName); consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate(); message = consumer.receiveImmediate();
assertNotNull(message); assertNotNull(message);
Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
message.acknowledge(); message.acknowledge();
clientSession.commit(); clientSession.commit();
Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1);
assertEquals("hello1", message.getBodyBuffer().readString()); assertEquals("hello1", message.getBodyBuffer().readString());
} }
@ -149,24 +149,24 @@ public class RingQueueTest extends ActiveMQTestBase {
producer.send(message); producer.send(message);
message = createTextMessage(clientSession, "hello1"); message = createTextMessage(clientSession, "hello1");
producer.send(message); producer.send(message);
Wait.assertTrue(() -> queue.getMessageCount() == 2, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 2);
Wait.assertTrue(() -> queue.getDeliveringCount() == 2, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
consumer = clientSession.createConsumer(qName); consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate(); message = consumer.receiveImmediate();
assertNotNull(message); assertNotNull(message);
Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
message.acknowledge(); message.acknowledge();
clientSession.commit(); clientSession.commit();
Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1);
assertEquals("hello1", message.getBodyBuffer().readString()); assertEquals("hello1", message.getBodyBuffer().readString());
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 0, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 0);
Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
} }
@Test @Test
@ -186,16 +186,16 @@ public class RingQueueTest extends ActiveMQTestBase {
time += 500; time += 500;
m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
producer.send(m0); producer.send(m0);
Wait.assertTrue(() -> queue.getScheduledCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getScheduledCount() == 1);
Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0, 2000, 100); Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0);
time = System.currentTimeMillis(); time = System.currentTimeMillis();
time += 500; time += 500;
m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
producer.send(m0); producer.send(m0);
Wait.assertTrue(() -> queue.getScheduledCount() == 2, 2000, 100); Wait.assertTrue(() -> queue.getScheduledCount() == 2);
Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0, 2000, 100); Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 5000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 1, 3000, 100); Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 1);
} }
@Test @Test
@ -231,7 +231,7 @@ public class RingQueueTest extends ActiveMQTestBase {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
producer.send(clientSession.createMessage(true)); producer.send(clientSession.createMessage(true));
} }
Wait.assertTrue(() -> queue.getMessageCount() == 100, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 100);
queue.setRingSize(10); queue.setRingSize(10);
@ -242,19 +242,19 @@ public class RingQueueTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
} }
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 5, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 5);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(clientSession.createMessage(true)); producer.send(clientSession.createMessage(true));
} }
Wait.assertTrue(() -> queue.getMessageCount() == 10, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 10);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 5, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 5);
consumer = clientSession.createConsumer(qName); consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate(); message = consumer.receiveImmediate();
assertNotNull(message); assertNotNull(message);
message.acknowledge(); message.acknowledge();
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 9, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 9);
queue.setRingSize(5); queue.setRingSize(5);
@ -264,19 +264,19 @@ public class RingQueueTest extends ActiveMQTestBase {
message.acknowledge(); message.acknowledge();
} }
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 5, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 5);
producer.send(clientSession.createMessage(true)); producer.send(clientSession.createMessage(true));
Wait.assertTrue(() -> queue.getMessagesReplaced() == 6, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 6);
queue.setRingSize(10); queue.setRingSize(10);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
producer.send(clientSession.createMessage(true)); producer.send(clientSession.createMessage(true));
} }
Wait.assertTrue(() -> queue.getMessageCount() == 10, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 10);
producer.send(clientSession.createMessage(true)); producer.send(clientSession.createMessage(true));
Wait.assertTrue(() -> queue.getMessagesReplaced() == 7, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == 7);
Wait.assertTrue(() -> queue.getMessageCount() == 10, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 10);
} }
@Test @Test
@ -294,12 +294,12 @@ public class RingQueueTest extends ActiveMQTestBase {
ClientMessage message = createTextMessage(clientSession, "hello" + 0); ClientMessage message = createTextMessage(clientSession, "hello" + 0);
producer.send(message); producer.send(message);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
message = createTextMessage(clientSession, "hello" + (i + 1)); message = createTextMessage(clientSession, "hello" + (i + 1));
producer.send(message); producer.send(message);
final int finalI = i + 1; final int finalI = i + 1;
Wait.assertTrue(() -> queue.getMessagesReplaced() == finalI, 2000, 100); Wait.assertTrue(() -> queue.getMessagesReplaced() == finalI);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
ClientConsumer consumer = clientSession.createConsumer(qName); ClientConsumer consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate(); message = consumer.receiveImmediate();
assertNotNull(message); assertNotNull(message);
@ -323,12 +323,12 @@ public class RingQueueTest extends ActiveMQTestBase {
ClientMessage m0 = createTextMessage(clientSession, "hello" + 0); ClientMessage m0 = createTextMessage(clientSession, "hello" + 0);
producer.send(m0); producer.send(m0);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
ClientConsumer consumer = clientSession.createConsumer(qName); ClientConsumer consumer = clientSession.createConsumer(qName);
Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 1);
consumer.close(); consumer.close();
Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100); Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1);
} }
@Test @Test
@ -364,7 +364,7 @@ public class RingQueueTest extends ActiveMQTestBase {
fail(e.getMessage()); fail(e.getMessage());
} }
Wait.assertTrue("message count should be " + RING_SIZE + " but it's actually " + queue.getMessageCount(), () -> queue.getMessageCount() == RING_SIZE, 2000, 100); Wait.assertTrue("message count should be " + RING_SIZE + " but it's actually " + queue.getMessageCount(), () -> queue.getMessageCount() == RING_SIZE);
} }
@Override @Override