NO-JIRA: tweaks to address some sporadic test failures

This commit is contained in:
Robbie Gemmell 2020-09-23 17:19:55 +01:00
parent d7ba252eed
commit acdd8b4dce
2 changed files with 30 additions and 37 deletions

View File

@ -52,10 +52,10 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getQueueName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); Wait.assertEquals(1L, queueView::getMessageCount, 5000, 10);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
@ -124,10 +124,10 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getQueueName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); Wait.assertEquals(1L, queueView::getMessageCount, 5000, 10);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
@ -156,11 +156,10 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getQueueName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); Wait.assertEquals(1L, queueView::getMessageCount, 5000, 10);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(receive); assertNotNull(receive);
@ -187,10 +186,10 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getQueueName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount()); Wait.assertEquals(1L, queueView::getMessageCount, 5000, 10);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
@ -220,10 +219,10 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
sender.close(); sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName());
Queue queueView = getProxyToQueue(getQueueName()); Queue queueView = getProxyToQueue(getQueueName());
assertEquals(10, queueView.getMessageCount()); Wait.assertEquals(10L, queueView::getMessageCount, 5000, 10);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(10); receiver.flow(10);
for (int i = 9; i >= 0; --i) { for (int i = 9; i >= 0; --i) {
@ -234,7 +233,7 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
} }
receiver.close(); receiver.close();
Wait.assertEquals(0, queueView::getMessageCount); Wait.assertEquals(0L, queueView::getMessageCount, 5000, 10);
connection.close(); connection.close();
} }

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp.interop; package org.apache.activemq.artemis.tests.integration.amqp.interop;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
@ -45,7 +48,7 @@ public class AmqpCoreTest extends JMSClientTestSupport {
super.setUp(); super.setUp();
} }
@Test @Test(timeout = 60000)
public void testMultipleCoreReceiving() throws Exception { public void testMultipleCoreReceiving() throws Exception {
Connection coreJmsConn = this.createCoreConnection(); Connection coreJmsConn = this.createCoreConnection();
@ -71,11 +74,9 @@ public class AmqpCoreTest extends JMSClientTestSupport {
sendAmqpMessages("exampleQueueAddress", total); sendAmqpMessages("exampleQueueAddress", total);
assertTrue("not enough message received: " + handler1.getNumMsg() + " expected: " + total, handler1.waitForMessages(total)); handler1.assertMessagesReceived(total);
assertTrue("not enough message received: " + handler2.getNumMsg() + " expected: " + total, handler2.waitForMessages(total)); handler2.assertMessagesReceived(total);
assertTrue("not enough message received: " + handler3.getNumMsg() + " expected: " + total, handler3.waitForMessages(total)); handler3.assertMessagesReceived(total);
} finally { } finally {
coreJmsConn.close(); coreJmsConn.close();
} }
@ -99,8 +100,8 @@ public class AmqpCoreTest extends JMSClientTestSupport {
private class CoreMessageHandler implements MessageHandler { private class CoreMessageHandler implements MessageHandler {
int id; int id;
int numMsg = 0; AtomicInteger numMsg = new AtomicInteger();
volatile boolean zeroLen = false; AtomicBoolean zeroLen = new AtomicBoolean();
CoreMessageHandler(int id) { CoreMessageHandler(int id) {
this.id = id; this.id = id;
@ -111,23 +112,16 @@ public class AmqpCoreTest extends JMSClientTestSupport {
instanceLog.debug("received: " + message.getBodySize()); instanceLog.debug("received: " + message.getBodySize());
if (message.getBodySize() == 0) { if (message.getBodySize() == 0) {
instanceLog.debug("xxx found zero len message!"); instanceLog.debug("xxx found zero len message!");
zeroLen = true; zeroLen.set(true);
} }
addMessage(message);
instanceLog.debug("[receiver " + id + "] recieved: " + numMsg.incrementAndGet());
} }
private synchronized void addMessage(ClientMessage message) { public void assertMessagesReceived(int num) throws Exception {
numMsg++; Wait.assertEquals(num, numMsg::get, 30000);
instanceLog.debug("[receiver " + id + "] recieved: " + numMsg);
}
public synchronized boolean waitForMessages(int num) throws Exception { assertFalse(zeroLen.get());
assertFalse(zeroLen);
return Wait.waitFor(() -> numMsg == num, 30000);
}
public int getNumMsg() {
return numMsg;
} }
} }