diff --git a/tests/jms-tests/pom.xml b/tests/jms-tests/pom.xml
index 54563eca15..bf1878de03 100644
--- a/tests/jms-tests/pom.xml
+++ b/tests/jms-tests/pom.xml
@@ -64,6 +64,11 @@
artemis-server
${project.version}
+
+ org.apache.activemq
+ artemis-junit
+ ${project.version}
+
junit
junit
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java
index f4c99e7cd6..c2f0f588bc 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
+import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
@@ -2285,7 +2286,6 @@ public class MessageConsumerTest extends JMSTestCase {
}
}
- // This is commented out until http:// jira.jboss.com/jira/browse/JBMESSAGING-983 is complete
@Test
public void testStopConnectionDuringOnMessage() throws Exception {
if (log.isTraceEnabled()) {
@@ -2294,12 +2294,13 @@ public class MessageConsumerTest extends JMSTestCase {
final AtomicInteger messagesReceived = new AtomicInteger(0);
- MessageListener myListener = new MessageListener() {
- @Override
- public void onMessage(final Message message) {
- messagesReceived.incrementAndGet();
+ CountDownLatch messagesReceivedLatch = new CountDownLatch(1);
+
+ MessageListener myListener = message -> {
+ if (messagesReceived.incrementAndGet() == 10) {
+ messagesReceivedLatch.countDown();
try {
- Thread.sleep(100L);
+ Thread.sleep(200L);
} catch (InterruptedException e) {
// Ignore
}
@@ -2336,7 +2337,7 @@ public class MessageConsumerTest extends JMSTestCase {
queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
}
- Thread.sleep(500L);
+ messagesReceivedLatch.await(500, TimeUnit.MILLISECONDS);
log.trace("Stopping consumer connection");
consumerConnection.stop();
@@ -2349,17 +2350,12 @@ public class MessageConsumerTest extends JMSTestCase {
queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
}
- log.trace("Sleeping a bit to check that no messages are received");
- Thread.sleep(2000);
-
- ProxyAssertSupport.assertEquals("Should not receive any messages after the connection has been stopped", countAfterStop, messagesReceived.get());
+ ProxyAssertSupport.assertFalse("Should not receive any messages after the connection has been stopped", Wait.waitFor(() -> messagesReceived.get() > countAfterStop, 2000, 1000));
log.trace("Restarting consumer connection");
consumerConnection.start();
- log.trace("Sleeping to allow remaining messages to arrive");
- Thread.sleep(15000);
- ProxyAssertSupport.assertEquals("Should have received all messages after restarting", MESSAGE_COUNT, messagesReceived.get());
+ ProxyAssertSupport.assertTrue("Should have received all messages after restarting", Wait.waitFor(() -> messagesReceived.get() == MESSAGE_COUNT, 15000, 100));
} finally {
if (producerConnection != null) {
producerConnection.close();