NO-JIRA Making AmqpFlowControlFailTest more accurate

this test was relying on internal details such as number of credits on the link.
The test was flaky and eventually failing or hunging.
This commit is contained in:
Clebert Suconic 2022-04-05 18:18:27 -04:00 committed by clebertsuconic
parent e774e4fcfb
commit bad1c26582
2 changed files with 30 additions and 9 deletions

View File

@ -1846,13 +1846,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
}
volatile CountDownLatch inUseLatch;
@Override
public void stop() {
super.stop();
// this will do a best effort to stop the current latch.
// no big deal if it failed. this is just to optimize this component stop.
CountDownLatch latch = inUseLatch;
if (latch != null) {
latch.countDown();
}
}
@Override
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : iterableOf(getLocalQueues())) {
if (!isStarted()) {
break;
}
try {
CountDownLatch latch = new CountDownLatch(1);
this.inUseLatch = latch;
queue.expireReferences(latch::countDown);
// the idea is in fact to block the Reaper while the Queue is executing reaping.
// This would avoid another eventual expiry to be called if the period for reaping is too small

View File

@ -19,13 +19,13 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -77,10 +78,10 @@ public class AmqpFlowControlFailTest {
}
@Test(timeout = 60000)
@Test(timeout = 10_000)
public void testAddressFullDisposition() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes);
@ -95,6 +96,7 @@ public class AmqpFlowControlFailTest {
rejected = true;
assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), expectedMessage),
e.getMessage().contains(expectedMessage));
break;
}
}
@ -112,10 +114,10 @@ public class AmqpFlowControlFailTest {
AmqpFlowControlFailTest.configureAddressPolicy(server);
}
@Test(timeout = 60000)
@Test
public void testMesagesNotSent() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
AmqpConnection connection = client.connect();
int messagesSent = 0;
try {
AmqpSession session = connection.createSession();
@ -130,22 +132,22 @@ public class AmqpFlowControlFailTest {
messagesSent++;
} catch (IOException e) {
rejected = true;
break;
}
}
assertTrue(rejected);
rejected = false;
assertEquals(0, sender.getSender().getCredit());
AmqpSession session2 = connection.createSession();
AmqpReceiver receiver = session2.createReceiver(getQueueName());
receiver.flow(messagesSent);
for (int i = 0; i < messagesSent; i++) {
AmqpMessage receive = receiver.receive();
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(receive);
receive.accept();
}
receiver.close();
session2.close();
Wait.assertEquals(1000, sender.getSender()::getCredit);
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[100];
@ -154,10 +156,10 @@ public class AmqpFlowControlFailTest {
sender.send(message);
} catch (IOException e) {
rejected = true;
break;
}
}
assertTrue(rejected);
assertEquals(0, sender.getSender().getCredit());
} finally {
connection.close();
}