Merge pull request #951 from cshannon/AMQ-9192

AMQ-9192 - Fix flaky AdvisoryTests
This commit is contained in:
Christopher L. Shannon 2023-01-10 15:29:10 -05:00 committed by GitHub
commit 362d28cddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 5 deletions

View File

@ -66,13 +66,14 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class AdvisoryTests { public class AdvisoryTests {
protected static final int MESSAGE_COUNT = 2000; protected static final int MESSAGE_COUNT = 100;
protected BrokerService broker; protected BrokerService broker;
protected Connection connection; protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected final boolean includeBodyForAdvisory; protected final boolean includeBodyForAdvisory;
protected final boolean persistent; protected final boolean persistent;
protected final int EXPIRE_MESSAGE_PERIOD = 3000; protected final int EXPIRE_MESSAGE_PERIOD = 3000;
protected final int DEFAULT_PREFETCH = 50;
@Parameters(name = "includeBodyForAdvisory={0}, persistent={1}") @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
@ -126,7 +127,7 @@ public class AdvisoryTests {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTopicSlowConsumerAdvisory() throws Exception { public void testTopicSlowConsumerAdvisory() throws Exception {
broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(500); broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(10);
broker.getDestinationPolicy().getDefaultEntry().setPendingMessageLimitStrategy(null); broker.getDestinationPolicy().getDefaultEntry().setPendingMessageLimitStrategy(null);
testSlowConsumerAdvisory(new ActiveMQTopic(getClass().getName())); testSlowConsumerAdvisory(new ActiveMQTopic(getClass().getName()));
} }
@ -597,8 +598,7 @@ public class AdvisoryTests {
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer // start throwing messages at the consumer
MessageProducer producer = s.createProducer(topic); MessageProducer producer = s.createProducer(topic);
int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); for (int i = 0; i < MESSAGE_COUNT; i++) {
for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]); m.writeBytes(new byte[1024]);
producer.send(m); producer.send(m);
@ -672,7 +672,11 @@ public class AdvisoryTests {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
try {
connection.close(); connection.close();
} catch (Exception e) {
//swallow exception so we can still stop the broker even on error
}
if (broker != null) { if (broker != null) {
broker.stop(); broker.stop();
} }
@ -704,6 +708,10 @@ public class AdvisoryTests {
policy.setAdvisoryWhenFull(true); policy.setAdvisoryWhenFull(true);
policy.setIncludeBodyForAdvisory(includeBodyForAdvisory); policy.setIncludeBodyForAdvisory(includeBodyForAdvisory);
policy.setProducerFlowControl(false); policy.setProducerFlowControl(false);
policy.setDurableTopicPrefetch(DEFAULT_PREFETCH);
policy.setTopicPrefetch(DEFAULT_PREFETCH);
policy.setQueuePrefetch(DEFAULT_PREFETCH);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10); strategy.setLimit(10);
policy.setPendingMessageLimitStrategy(strategy); policy.setPendingMessageLimitStrategy(strategy);