AMQ-9157 - Include consumer id as part of Dispatched advisory

This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-11 13:53:47 -05:00
parent 39da75abdb
commit c140d73fec
12 changed files with 27 additions and 17 deletions

View File

@ -489,8 +489,8 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
super.messageDispatched(context, messageReference);
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
super.messageDispatched(context, sub, messageReference);
try {
if (!messageReference.isAdvisory()) {
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
@ -502,6 +502,9 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
if (sub.getConsumerInfo() != null) {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
}
fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {

View File

@ -356,9 +356,10 @@ public interface Broker extends Region, Service {
/**
* Called when message is dispatched to a consumer
* @param context
* @param sub
* @param messageReference
*/
void messageDispatched(ConnectionContext context, MessageReference messageReference);
void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory

View File

@ -352,8 +352,8 @@ public class BrokerFilter implements Broker {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
getNext().messageDispatched(context, messageReference);
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
getNext().messageDispatched(context, sub, messageReference);
}
@Override

View File

@ -309,7 +309,7 @@ public class EmptyBroker implements Broker {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
}

View File

@ -349,7 +349,7 @@ public class ErrorBroker implements Broker {
}
@Override
public void messageDispatched(ConnectionContext context,MessageReference messageReference) {
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}

View File

@ -558,9 +558,9 @@ public abstract class BaseDestination implements Destination {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
if (advisoryForDispatched) {
broker.messageDispatched(context, messageReference);
broker.messageDispatched(context, sub, messageReference);
}
}

View File

@ -194,9 +194,10 @@ public interface Destination extends Service, Task, Message.MessageDestination {
* Called when message is dispatched to a consumer
*
* @param context
* @param sub
* @param messageReference
*/
void messageDispatched(ConnectionContext context, MessageReference messageReference);
void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory This will

View File

@ -325,8 +325,8 @@ public class DestinationFilter implements Destination {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
next.messageDispatched(context, messageReference);
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
next.messageDispatched(context, sub, messageReference);
}
@Override

View File

@ -760,7 +760,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
incrementPrefetchCounter(node);
nodeDest.messageDispatched(context, node);
nodeDest.messageDispatched(context, this, node);
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}",
info.getConsumerId(), message.getMessageId(), message.getDestination(),
getSubscriptionStatistics().getDispatched().getCount(), dispatched.size());

View File

@ -702,7 +702,7 @@ public class TopicSubscription extends AbstractSubscription {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
regionDestination.messageDispatched(context, node);
regionDestination.messageDispatched(context, TopicSubscription.this, node);
node.decrementReferenceCount();
}
@ -724,7 +724,7 @@ public class TopicSubscription extends AbstractSubscription {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
regionDestination.messageDispatched(context, node);
regionDestination.messageDispatched(context, this, node);
node.decrementReferenceCount();
}
}

View File

@ -542,12 +542,12 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
String msg = messageReference.getMessage().toString();
LOG.info("Message dispatched: {}", msg);
}
super.messageDispatched(context, messageReference);
super.messageDispatched(context, sub, messageReference);
}
@Override

View File

@ -271,6 +271,11 @@ public class AdvisoryTests {
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), dest.getQualifiedName());
//Make sure consumer id exists if dispatched advisory
if (AdvisorySupport.isMessageDispatchedAdvisoryTopic(advisoryTopic)) {
assertNotNull(message.getStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID));
}
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);