AMQ-9168 - Send message expired advisory and not discard advisory when

Topic subscriptions expire a message

This fixes topic subs to send the right advisory type, if enabled, when
the server discards a message on dispatch to a topic sub. Also add some
more expiration tests for other subscription types
This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-15 13:21:00 -05:00
parent 8554a1464c
commit 757a712890
3 changed files with 178 additions and 21 deletions

View File

@ -599,10 +599,13 @@ public class Topic extends BaseDestination implements Task {
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
if (sub instanceof DurableTopicSubscription) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(),
node.getMessageId(),
convertToNonRangedAck(ack, node));
}
}
messageConsumed(context, node);
}

View File

@ -187,7 +187,8 @@ public class TopicSubscription extends AbstractSubscription {
messagesToEvict = oldMessages.length;
for (int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage = oldMessages[i];
discard(oldMessage);
//Expired here is false as we are discarding due to the messageEvictingStrategy
discard(oldMessage, false);
}
}
// lets avoid an infinite loop if we are given a bad eviction strategy
@ -233,8 +234,7 @@ public class TopicSubscription extends AbstractSubscription {
matched.remove();
node.decrementReferenceCount();
if (broker.isExpired(node)) {
((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
broker.messageExpired(getContext(), node, this);
((Destination) node.getRegionDestination()).messageExpired(getContext(), this, node);
}
break;
}
@ -654,7 +654,7 @@ public class TopicSubscription extends AbstractSubscription {
// Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message.
if (message.isExpired()) {
discard(message);
discard(message, true);
continue; // just drop it.
}
dispatch(message);
@ -739,19 +739,25 @@ public class TopicSubscription extends AbstractSubscription {
}
}
private void discard(MessageReference message) {
private void discard(MessageReference message, boolean expired) {
discarding = true;
try {
message.decrementReferenceCount();
matched.remove(message);
discarded.incrementAndGet();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}
LOG.debug("{}, discarding message {}", this, message);
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
dest.messageDiscarded(getContext(), this, message);
//If discard is due to expiration then use the messageExpired() callback
if (expired) {
LOG.debug("{}, expiring message {}", this, message);
dest.messageExpired(getContext(), this, message);
} else {
LOG.debug("{}, discarding message {}", this, message);
discarded.incrementAndGet();
dest.messageDiscarded(getContext(), this, message);
}
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
} finally {

View File

@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -44,7 +45,11 @@ import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.NullMessageReference;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -52,6 +57,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageDispatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -70,7 +76,7 @@ public class AdvisoryTests {
protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected final boolean includeBodyForAdvisory;
protected final int EXPIRE_MESSAGE_PERIOD = 10000;
protected final int EXPIRE_MESSAGE_PERIOD = 3000;
@Parameters(name = "includeBodyForAdvisory={0}")
public static Collection<Object[]> data() {
@ -338,24 +344,143 @@ public class AdvisoryTests {
}
@Test(timeout = 60000)
public void testMessageExpiredAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
assertNotNull(consumer);
public void testMessageExpiredAdvisoryQueueSubClient() throws Exception {
testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName() + "client.timeout"),
300000, true, 500);
}
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
@Test(timeout = 60000)
public void testMessageExpiredAdvisoryQueueSubServer() throws Exception {
testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1,true, 500);
}
@Test(timeout = 60000)
public void testMessageExpiredAdvisoryQueueSubServerTask() throws Exception {
testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1000,false,
EXPIRE_MESSAGE_PERIOD * 2);
}
private void testMessageExpiredAdvisoryQueue(ActiveMQQueue dest, int ttl, boolean createConsumer, int receiveTimeout) throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
producer.setTimeToLive(1);
MessageProducer producer = s.createProducer(dest);
producer.setTimeToLive(ttl);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
MessageConsumer consumer = null;
if (createConsumer) {
consumer = s.createConsumer(dest);
assertNotNull(consumer);
}
Message msg = advisoryConsumer.receive(receiveTimeout);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
@Test(timeout = 60000)
public void testMessageExpiredAdvisoryTopicSub() throws Exception {
ActiveMQTopic dest = new ActiveMQTopic(getClass().getName());
//Set prefetch to 1 so acks will trigger expiration on dispatching more messages
broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(1);
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
MessageConsumer consumer = s.createConsumer(dest);
MessageConsumer expiredAdvisoryConsumer = s.createConsumer(AdvisorySupport.getExpiredMessageTopic(dest));
MessageConsumer discardedAdvisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDiscardedAdvisoryTopic(dest));
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
producer.setTimeToLive(10);
for (int i = 0; i < 10; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Thread.sleep(500);
//Receiving will trigger the server to discard on dispatch when acks are received
//Currently the advisory is only fired on dispatch from server or messages added to ta topic
//and not on expired acks from the client side as the original messages are not tracked in
//dispatch list so the advisory can't be fired
for (int i = 0; i < 10; i++) {
assertNull(consumer.receive(10));
}
//Should no longer receive discard advisories for expiration
assertNull(discardedAdvisoryConsumer.receive(1000));
Message msg = expiredAdvisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
@Test(timeout = 60000)
public void testMessageExpiredAdvisoryDurableClient() throws Exception {
testMessageExpiredDurableAdvisory(getClass().getName() + "client.timeout",
300000, true, 500);
}
@Test(timeout = 60000)
public void testMessageExpiredAdvisoryDurableServer() throws Exception {
testMessageExpiredDurableAdvisory(getClass().getName(), 1,true, 500);
}
@Test(timeout = 60000)
public void testMessageExpiredAdvisoryDurableServerTask() throws Exception {
testMessageExpiredDurableAdvisory(getClass().getName(), 2000,false, EXPIRE_MESSAGE_PERIOD * 2);
}
private void testMessageExpiredDurableAdvisory(String topic, int ttl, boolean bringDurableOnline,
int receiveTimeout) throws Exception {
ActiveMQTopic dest = new ActiveMQTopic(topic);
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
//create durable and send offline messages
MessageConsumer consumer = s.createDurableSubscriber(dest, "sub1");
consumer.close();
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
producer.setTimeToLive(ttl);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
//if flag is true then bring online to trigger expiration on dispatch
if (bringDurableOnline) {
consumer = s.createDurableSubscriber(dest, "sub1");
}
Message msg = advisoryConsumer.receive(receiveTimeout);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
@ -529,6 +654,29 @@ public class AdvisoryTests {
answer.addConnector("nio://localhost:0");
answer.addConnector("tcp://localhost:0").setName("OpenWire");
answer.setDeleteAllMessagesOnStartup(true);
// add a plugin to ensure the expiration happens on the client side rather
// than broker side.
answer.setPlugins(new BrokerPlugin[] { new BrokerPlugin() {
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new BrokerFilter(broker) {
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
ActiveMQDestination dest = messageDispatch.getDestination();
if (dest != null && !AdvisorySupport.isAdvisoryTopic(dest) && messageDispatch.getDestination()
.getPhysicalName().contains("client.timeout")) {
// Set the expiration to now
messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
}
super.preProcessDispatch(messageDispatch);
}
};
}
} });
}
protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {