diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 6a485fcc59..d7c9aa8fb3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -426,9 +426,10 @@ public class AdvisoryBroker extends BrokerFilter { super.messageExpired(context, messageReference, subscription); try { if (!messageReference.isAdvisory()) { - ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); + BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); + ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination()); Message payload = messageReference.getMessage().copy(); - if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + if (!baseDestination.isIncludeBodyForAdvisory()) { payload.clearBody(); } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); @@ -445,17 +446,15 @@ public class AdvisoryBroker extends BrokerFilter { super.messageConsumed(context, messageReference); try { if (!messageReference.isAdvisory()) { - ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); + BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); + ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination()); Message payload = messageReference.getMessage().copy(); - if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + if (!baseDestination.isIncludeBodyForAdvisory()) { payload.clearBody(); } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); - ActiveMQDestination destination = payload.getDestination(); - if (destination != null) { - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName()); - } + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); fireAdvisory(context, topic, payload, null, advisoryMessage); } } catch (Exception e) { @@ -468,17 +467,15 @@ public class AdvisoryBroker extends BrokerFilter { super.messageDelivered(context, messageReference); try { if (!messageReference.isAdvisory()) { - ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); + BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); + ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination()); Message payload = messageReference.getMessage().copy(); - if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + if (!baseDestination.isIncludeBodyForAdvisory()) { payload.clearBody(); } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); - ActiveMQDestination destination = payload.getDestination(); - if (destination != null) { - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName()); - } + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); fireAdvisory(context, topic, payload, null, advisoryMessage); } } catch (Exception e) { @@ -491,9 +488,10 @@ public class AdvisoryBroker extends BrokerFilter { super.messageDiscarded(context, sub, messageReference); try { if (!messageReference.isAdvisory()) { - ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); + BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); + ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination()); Message payload = messageReference.getMessage().copy(); - if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + if (!baseDestination.isIncludeBodyForAdvisory()) { payload.clearBody(); } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); @@ -502,10 +500,8 @@ public class AdvisoryBroker extends BrokerFilter { } advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); - ActiveMQDestination destination = payload.getDestination(); - if (destination != null) { - advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName()); - } + advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); + fireAdvisory(context, topic, payload, null, advisoryMessage); } } catch (Exception e) { @@ -716,9 +712,10 @@ public class AdvisoryBroker extends BrokerFilter { if (wasDLQd) { try { if (!messageReference.isAdvisory()) { - ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); + BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); + ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination()); Message payload = messageReference.getMessage().copy(); - if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + if (!baseDestination.isIncludeBodyForAdvisory()) { payload.clearBody(); } fireAdvisory(context, topic, payload); @@ -773,12 +770,6 @@ public class AdvisoryBroker extends BrokerFilter { } } - protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) { - Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination); - return (destination instanceof BaseDestination && - ((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false; - } - private void handleFireFailure(String message, Throwable cause) { LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); LOG.debug("{} detail: {}", message, cause); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 02c5fbe7e3..8c63c02fc9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -356,6 +356,8 @@ public class Topic extends BaseDestination implements Task { final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); + message.setRegionDestination(this); + // There is delay between the client sending it and it arriving at the // destination.. it may have expired. if (message.isExpired()) { @@ -494,7 +496,6 @@ public class Topic extends BaseDestination implements Task { synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); - message.setRegionDestination(this); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); Future result = null; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index cdd8d997bd..9acf7bfe73 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -44,9 +45,10 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -67,7 +69,7 @@ public class AdvisoryTests { protected final int EXPIRE_MESSAGE_PERIOD = 10000; - @Parameters + @Parameters(name = "includeBodyForAdvisory={0}") public static Collection data() { return Arrays.asList(new Object[][] { // Include the full body of the message @@ -293,6 +295,47 @@ public class AdvisoryTests { assertIncludeBodyForAdvisory(payload); } + @Test(timeout = 60000) + public void testMessageDeliveryVTAdvisory() throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic vt = new ActiveMQTopic("VirtualTopic.TEST"); + + ActiveMQQueue a = new ActiveMQQueue("Consumer.A.VirtualTopic.TEST"); + MessageConsumer consumer = s.createConsumer(a); + + ActiveMQQueue b = new ActiveMQQueue("Consumer.B.VirtualTopic.TEST"); + MessageConsumer consumerB = s.createConsumer(b); + + assertNotNull(consumer); + assertNotNull(consumerB); + + HashSet dests = new HashSet(); + dests.add(vt.getQualifiedName()); + dests.add(a.getQualifiedName()); + dests.add(b.getQualifiedName()); + + + Topic advisoryTopic = new ActiveMQTopic(AdvisorySupport.MESSAGE_DELIVERED_TOPIC_PREFIX + ">"); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + + // throw messages at the vt + MessageProducer producer = s.createProducer(vt); + + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + + Message msg = null; + while ((msg = advisoryConsumer.receive(1000)) != null) { + ActiveMQMessage message = (ActiveMQMessage) msg; + String dest = (String) message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION); + dests.remove(dest); + assertIncludeBodyForAdvisory((ActiveMQMessage) message.getDataStructure()); + } + + assertTrue("Got delivered for all: " + dests, dests.isEmpty()); + } + @Before public void setUp() throws Exception { if (broker == null) {