mirror of https://github.com/apache/activemq.git
AMQ-9157 - Dispatched advisory should not fire for queue browsers
The dispatched advisory doesn't really make sense to send for queue browsers, just like we don't send a consumed advisory, as it's more of an admin type funtion to look at the contents of a queue but it's not a real consumer that is receiving and acking messages.
This commit is contained in:
parent
133c7025c8
commit
85502a526d
|
@ -492,7 +492,8 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
|
public void messageDispatched(ConnectionContext context, Subscription sub, MessageReference messageReference) {
|
||||||
super.messageDispatched(context, sub, messageReference);
|
super.messageDispatched(context, sub, messageReference);
|
||||||
try {
|
try {
|
||||||
if (!messageReference.isAdvisory()) {
|
//Don't dispatch for queue browsers
|
||||||
|
if (!messageReference.isAdvisory() && !sub.getConsumerInfo().isBrowser()) {
|
||||||
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
||||||
ActiveMQTopic topic = AdvisorySupport.getMessageDispatchedAdvisoryTopic(baseDestination.getActiveMQDestination());
|
ActiveMQTopic topic = AdvisorySupport.getMessageDispatchedAdvisoryTopic(baseDestination.getActiveMQDestination());
|
||||||
Message payload = messageReference.getMessage().copy();
|
Message payload = messageReference.getMessage().copy();
|
||||||
|
|
|
@ -208,7 +208,7 @@ public class AdvisoryTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testQueueBrowserDispatchedAdvisory() throws Exception {
|
public void testQueueBrowserDispatchedAdvisoryNotSent() throws Exception {
|
||||||
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
Queue queue = s.createQueue(getClass().getName());
|
Queue queue = s.createQueue(getClass().getName());
|
||||||
|
|
||||||
|
@ -232,19 +232,9 @@ public class AdvisoryTests {
|
||||||
assertTrue(enumeration.hasMoreElements());
|
assertTrue(enumeration.hasMoreElements());
|
||||||
assertNotNull(enumeration.nextElement());
|
assertNotNull(enumeration.nextElement());
|
||||||
|
|
||||||
|
//We should not be sending an advisory for dispatching to a browser
|
||||||
Message msg = advisoryConsumer.receive(1000);
|
Message msg = advisoryConsumer.receive(1000);
|
||||||
assertNotNull(msg);
|
assertNull(msg);
|
||||||
ActiveMQMessage message = (ActiveMQMessage) msg;
|
|
||||||
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
|
|
||||||
|
|
||||||
//This should always be tcp:// because that is the transport that is used to connect even though
|
|
||||||
//the nio transport is the first one in the list
|
|
||||||
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
|
|
||||||
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), ((ActiveMQDestination) queue).getQualifiedName());
|
|
||||||
|
|
||||||
//Add assertion to make sure body is included for advisory topics
|
|
||||||
//when includeBodyForAdvisory is true
|
|
||||||
assertIncludeBodyForAdvisory(payload);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception {
|
private void testMessageConsumerAdvisory(ActiveMQDestination dest, Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue