AMQ-9157 - Add a new optional advisory for dispatched messages

This commit is contained in:
Christopher L. Shannon (cshannon) 2022-10-31 18:27:03 -04:00 committed by Christopher L. Shannon
parent 2306a26aef
commit 76f612600d
15 changed files with 242 additions and 33 deletions

View File

@ -488,6 +488,27 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
super.messageDispatched(context, messageReference);
try {
if (!messageReference.isAdvisory()) {
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
ActiveMQTopic topic = AdvisorySupport.getMessageDispatchedAdvisoryTopic(baseDestination.getActiveMQDestination());
Message payload = messageReference.getMessage().copy();
if (!baseDestination.isIncludeBodyForAdvisory()) {
payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("delivered", e);
}
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
super.messageDiscarded(context, sub, messageReference);

View File

@ -353,6 +353,13 @@ public interface Broker extends Region, Service {
*/
void messageDelivered(ConnectionContext context, MessageReference messageReference);
/**
* Called when message is dispatched to a consumer
* @param context
* @param messageReference
*/
void messageDispatched(ConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics

View File

@ -351,6 +351,11 @@ public class BrokerFilter implements Broker {
getNext().messageDelivered(context, messageReference);
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
getNext().messageDispatched(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) {
getNext().messageDiscarded(context, sub, messageReference);

View File

@ -308,6 +308,11 @@ public class EmptyBroker implements Broker {
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
}

View File

@ -348,6 +348,11 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message);
}
@Override
public void messageDispatched(ConnectionContext context,MessageReference messageReference) {
throw new BrokerStoppedException(this.message);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
throw new BrokerStoppedException(this.message);

View File

@ -87,6 +87,7 @@ public abstract class BaseDestination implements Destination {
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean advisoryForDispatched;
private boolean sendAdvisoryIfNoConsumers;
private boolean sendDuplicateFromStoreToDLQ = false;
private boolean includeBodyForAdvisory;
@ -456,6 +457,14 @@ public abstract class BaseDestination implements Destination {
this.advisoryForConsumed = advisoryForConsumed;
}
public boolean isAdvisoryForDispatched() {
return advisoryForDispatched;
}
public void setAdvisoryForDispatched(boolean advisoryForDispatched) {
this.advisoryForDispatched = advisoryForDispatched;
}
/**
* @return the advisdoryForFastProducers
*/
@ -548,6 +557,13 @@ public abstract class BaseDestination implements Destination {
}
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
if (advisoryForDispatched) {
broker.messageDispatched(context, messageReference);
}
}
/**
* Called when a message is discarded - e.g. running low on memory This will
* happen only if the policy is enabled - e.g. non durable topics

View File

@ -190,6 +190,14 @@ public interface Destination extends Service, Task, Message.MessageDestination {
*/
void messageDelivered(ConnectionContext context, MessageReference messageReference);
/**
* Called when message is dispatched to a consumer
*
* @param context
* @param messageReference
*/
void messageDispatched(ConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory This will
* happen only if the policy is enabled - e.g. non durable topics

View File

@ -324,6 +324,11 @@ public class DestinationFilter implements Destination {
next.messageDelivered(context, messageReference);
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
next.messageDispatched(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
next.messageDiscarded(context, sub, messageReference);

View File

@ -760,6 +760,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
incrementPrefetchCounter(node);
nodeDest.messageDispatched(context, node);
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}",
info.getConsumerId(), message.getMessageId(), message.getDestination(),
getSubscriptionStatistics().getDispatched().getCount(), dispatched.size());

View File

@ -701,6 +701,7 @@ public class TopicSubscription extends AbstractSubscription {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
regionDestination.messageDispatched(context, node);
node.decrementReferenceCount();
}
@ -722,6 +723,7 @@ public class TopicSubscription extends AbstractSubscription {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
regionDestination.getDestinationStatistics().getInflight().increment();
regionDestination.messageDispatched(context, node);
node.decrementReferenceCount();
}
}

View File

@ -84,6 +84,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean advisoryForDispatched;
private boolean includeBodyForAdvisory;
private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
@ -275,6 +276,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("advisoryForConsumed", includedProperties)) {
destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
}
if (isUpdate("advisoryForDispatched", includedProperties)) {
destination.setAdvisoryForDispatched(isAdvisoryForDispatched());
}
if (isUpdate("advisoryForDelivery", includedProperties)) {
destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
}
@ -848,6 +852,14 @@ public class PolicyEntry extends DestinationMapEntry {
this.advisoryForConsumed = advisoryForConsumed;
}
public boolean isAdvisoryForDispatched() {
return advisoryForDispatched;
}
public void setAdvisoryForDispatched(boolean advisoryForDispatched) {
this.advisoryForDispatched = advisoryForDispatched;
}
/**
* @return the advisdoryForFastProducers
*/

View File

@ -526,10 +526,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
String msg = "Unable to display message.";
msg = messageReference.getMessage().toString();
String msg = messageReference.getMessage().toString();
LOG.info("Message consumed: {}", msg);
}
super.messageConsumed(context, messageReference);
@ -538,22 +535,25 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
String msg = "Unable to display message.";
msg = messageReference.getMessage().toString();
String msg = messageReference.getMessage().toString();
LOG.info("Message delivered: {}", msg);
}
super.messageDelivered(context, messageReference);
}
@Override
public void messageDispatched(ConnectionContext context, MessageReference messageReference) {
if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
String msg = messageReference.getMessage().toString();
LOG.info("Message dispatched: {}", msg);
}
super.messageDispatched(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
msg = messageReference.getMessage().toString();
String msg = messageReference.getMessage().toString();
LOG.info("Message discarded: {}", msg);
}
super.messageDiscarded(context, sub, messageReference);

View File

@ -53,6 +53,7 @@ public final class AdvisorySupport {
public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
public static final String MESSAGE_DISPATCHED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDispatched.";
public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
@ -247,6 +248,12 @@ public final class AdvisorySupport {
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getMessageDispatchedAdvisoryTopic(ActiveMQDestination destination) {
String name = MESSAGE_DISPATCHED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
}

View File

@ -663,7 +663,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
30, true, true, true, true, true, true, true, true, true);
setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
100, true, true);
@ -676,7 +676,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
//validate config
assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
30, true, true, true,true, true, true, true, true, true);
assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
100, true, true);
@ -684,7 +684,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
300, false, false, false,false, false, false, false, false, false);
setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
1000, false, false);
@ -693,14 +693,14 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
300, false, false, false,false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
1000, false, false);
//check new dest
assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
300, false, false, false, false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
1000, false, false);
}
@ -714,7 +714,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
30, true, true, true, true, true, true, true, true, true);
setAllTopicPolicyProperties(entry, 10000, true);
policyMap.setPolicyEntries(Arrays.asList(entry));
@ -726,14 +726,14 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
//validate config
assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
30, true, true, true, true, true, true, true, true, true);
assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);
//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
300, false, false, false, false, false, false, false, false, false);
setAllTopicPolicyProperties(entry, 100000, false);
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
@ -741,13 +741,13 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
300, false, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);
//check new dest
assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
300, false, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
}
@ -827,6 +827,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
properties.add("optimizeMessageStoreInFlightLimit");
properties.add("advisoryForConsumed");
properties.add("advisoryForDelivery");
properties.add("advisoryForDispatched");
properties.add("advisoryForDiscardingMessages");
properties.add("advisoryForSlowConsumers");
properties.add("advisoryForFastProducers");
@ -864,8 +865,9 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers,
boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
boolean sendAdvisoryIfNoConsumers) {
entry.setProducerFlowControl(producerFlowControl);
entry.setAlwaysRetroactive(alwaysRetroactive);
@ -884,6 +886,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
entry.setOptimizeMessageStoreInFlightLimit(optimizeMessageStoreInFlightLimit);
entry.setAdvisoryForConsumed(advisoryForConsumed);
entry.setAdvisoryForDelivery(advisoryForDelivery);
entry.setAdvisoryForDispatched(advisoryForDispatched);
entry.setAdvisoryForDiscardingMessages(advisoryForDiscardingMessages);
entry.setAdvisoryForSlowConsumers(advisoryForSlowConsumers);
entry.setAdvisoryForFastProducers(advisoryForFastProducers);
@ -920,8 +923,9 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers,
boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
boolean sendAdvisoryIfNoConsumers) {
assertEquals(producerFlowControl, dest.isProducerFlowControl());
@ -941,6 +945,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
assertEquals(optimizeMessageStoreInFlightLimit, dest.getOptimizeMessageStoreInFlightLimit());
assertEquals(advisoryForConsumed, dest.isAdvisoryForConsumed());
assertEquals(advisoryForDelivery, dest.isAdvisoryForDelivery());
assertEquals(advisoryForDispatched, dest.isAdvisoryForDispatched());
assertEquals(advisoryForDiscardingMessages, dest.isAdvisoryForDiscardingMessages());
assertEquals(advisoryForSlowConsumers, dest.isAdvisoryForSlowConsumers());
assertEquals(advisoryForFastProducers, dest.isAdvisoryForFastProducers());

View File

@ -23,8 +23,10 @@ import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -34,7 +36,9 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
@ -65,11 +69,9 @@ public class AdvisoryTests {
protected BrokerService broker;
protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount;
protected final boolean includeBodyForAdvisory;
protected final int EXPIRE_MESSAGE_PERIOD = 10000;
@Parameters(name = "includeBodyForAdvisory={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@ -163,13 +165,55 @@ public class AdvisoryTests {
}
@Test(timeout = 60000)
public void testMessageDeliveryAdvisory() throws Exception {
public void testQueueMessageDeliveryAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic);
}
@Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic);
}
@Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic);
}
@Test(timeout = 60000)
public void testTopicMessageDeliveryAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDeliveredAdvisoryTopic);
}
@Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisory() throws Exception {
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic);
}
@Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), AdvisorySupport::getMessageDispatchedAdvisoryTopic);
}
@Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisory() throws Exception {
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic);
}
@Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic);
}
@Test(timeout = 60000)
public void testQueueBrowserDispatchedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
assertNotNull(consumer);
Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
Topic advisoryTopic = AdvisorySupport.getMessageDispatchedAdvisoryTopic(
(ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
@ -178,6 +222,41 @@ public class AdvisoryTests {
m.writeBytes(new byte[1024]);
producer.send(m);
QueueBrowser browser = s.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
//Should have 1 message to browser
assertTrue(enumeration.hasMoreElements());
assertNotNull(enumeration.nextElement());
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should always be tcp:// because that is the transport that is used to connect even though
//the nio transport is the first one in the list
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), ((ActiveMQDestination) queue).getQualifiedName());
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(dest);
assertNotNull(consumer);
Topic advisoryTopic = advisoryTopicSupplier.apply(dest);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
@ -186,7 +265,37 @@ public class AdvisoryTests {
//This should always be tcp:// because that is the transport that is used to connect even though
//the nio transport is the first one in the list
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), ((ActiveMQDestination) queue).getQualifiedName());
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), dest.getQualifiedName());
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
private void testDurableSubscriptionAdvisory(Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName());
MessageConsumer consumer = s.createDurableSubscriber(topic, "sub");
assertNotNull(consumer);
Topic advisoryTopic = advisoryTopicSupplier.apply((ActiveMQDestination) topic);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(topic);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//This should always be tcp:// because that is the transport that is used to connect even though
//the nio transport is the first one in the list
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), ((ActiveMQDestination) topic).getQualifiedName());
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
@ -405,6 +514,7 @@ public class AdvisoryTests {
policy.setAdvisoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDispatched(true);
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);