This commit is contained in:
Clebert Suconic 2018-02-26 17:39:04 -05:00
commit 706abe8901
2 changed files with 15 additions and 14 deletions

View File

@ -64,6 +64,11 @@
<artifactId>artemis-server</artifactId> <artifactId>artemis-server</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-junit</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -2285,7 +2286,6 @@ public class MessageConsumerTest extends JMSTestCase {
} }
} }
// This is commented out until http:// jira.jboss.com/jira/browse/JBMESSAGING-983 is complete
@Test @Test
public void testStopConnectionDuringOnMessage() throws Exception { public void testStopConnectionDuringOnMessage() throws Exception {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
@ -2294,12 +2294,13 @@ public class MessageConsumerTest extends JMSTestCase {
final AtomicInteger messagesReceived = new AtomicInteger(0); final AtomicInteger messagesReceived = new AtomicInteger(0);
MessageListener myListener = new MessageListener() { CountDownLatch messagesReceivedLatch = new CountDownLatch(1);
@Override
public void onMessage(final Message message) { MessageListener myListener = message -> {
messagesReceived.incrementAndGet(); if (messagesReceived.incrementAndGet() == 10) {
messagesReceivedLatch.countDown();
try { try {
Thread.sleep(100L); Thread.sleep(200L);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Ignore // Ignore
} }
@ -2336,7 +2337,7 @@ public class MessageConsumerTest extends JMSTestCase {
queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i))); queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
} }
Thread.sleep(500L); messagesReceivedLatch.await(500, TimeUnit.MILLISECONDS);
log.trace("Stopping consumer connection"); log.trace("Stopping consumer connection");
consumerConnection.stop(); consumerConnection.stop();
@ -2349,17 +2350,12 @@ public class MessageConsumerTest extends JMSTestCase {
queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i))); queueProducer.send(producerSession.createTextMessage("Message #" + Integer.toString(i)));
} }
log.trace("Sleeping a bit to check that no messages are received"); ProxyAssertSupport.assertFalse("Should not receive any messages after the connection has been stopped", Wait.waitFor(() -> messagesReceived.get() > countAfterStop, 2000, 1000));
Thread.sleep(2000);
ProxyAssertSupport.assertEquals("Should not receive any messages after the connection has been stopped", countAfterStop, messagesReceived.get());
log.trace("Restarting consumer connection"); log.trace("Restarting consumer connection");
consumerConnection.start(); consumerConnection.start();
log.trace("Sleeping to allow remaining messages to arrive"); ProxyAssertSupport.assertTrue("Should have received all messages after restarting", Wait.waitFor(() -> messagesReceived.get() == MESSAGE_COUNT, 15000, 100));
Thread.sleep(15000);
ProxyAssertSupport.assertEquals("Should have received all messages after restarting", MESSAGE_COUNT, messagesReceived.get());
} finally { } finally {
if (producerConnection != null) { if (producerConnection != null) {
producerConnection.close(); producerConnection.close();