AMQ-9153: Fix slow consumer advisory for queue subscriptions

Due to changes with Queues to check if consumers are full before adding
more messages to the subscription, the Queue dispatch logic needed to be
updated to mark subscriptions as slow and send advisories if configured
instead of relying on the subscription itself to do it.

(cherry picked from commit 596ee31687)
This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-01 08:19:22 -04:00
parent fb89765fdf
commit bc9e728123
2 changed files with 48 additions and 5 deletions

View File

@ -2210,6 +2210,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// no further dispatch of list to a full consumer to
// avoid out of order message receipt
fullConsumers.add(s);
//For full consumers we need to mark that they are slow and
// then call the broker.slowConsumer() hook if implemented
if (s instanceof PrefetchSubscription) {
final PrefetchSubscription sub = (PrefetchSubscription) s;
if (!sub.isSlowConsumer()) {
sub.setSlowConsumer(true);
broker.slowConsumer(sub.getContext(), this, sub);
}
}
LOG.trace("Subscription full {}", s);
}
}

View File

@ -28,6 +28,7 @@ import java.util.HashSet;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@ -110,17 +111,48 @@ public class AdvisoryTests {
}
@Test(timeout = 60000)
public void testSlowConsumerAdvisory() throws Exception {
public void testQueueSlowConsumerAdvisory() throws Exception {
testSlowConsumerAdvisory(new ActiveMQQueue(getClass().getName()));
}
@Test(timeout = 60000)
public void testTopicSlowConsumerAdvisory() throws Exception {
broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(500);
broker.getDestinationPolicy().getDefaultEntry().setPendingMessageLimitStrategy(null);
testSlowConsumerAdvisory(new ActiveMQTopic(getClass().getName()));
}
@Test(timeout = 60000)
public void testDurableSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic topic = s.createTopic(getClass().getName());
MessageConsumer consumer = s.createDurableSubscriber(topic, "sub1");
assertNotNull(consumer);
Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) topic);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
MessageProducer producer = s.createProducer(topic);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
private void testSlowConsumerAdvisory(Destination dest) throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(dest);
assertNotNull(consumer);
Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) dest);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
@ -343,6 +375,7 @@ public class AdvisoryTests {
}
ConnectionFactory factory = createConnectionFactory();
connection = factory.createConnection();
connection.setClientID("clientId");
connection.start();
}