diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 39cd2fe303..7a6915d77e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.MessageReference; @@ -350,7 +351,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); fireAdvisory(context, topic, payload, null, advisoryMessage); @@ -367,7 +370,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); ActiveMQDestination destination = payload.getDestination(); @@ -388,7 +393,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); ActiveMQDestination destination = payload.getDestination(); @@ -409,7 +416,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); if (sub instanceof TopicSubscription) { advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); @@ -498,7 +507,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } fireAdvisory(context, topic, payload); } } catch (Exception e) { @@ -551,6 +562,12 @@ public class AdvisoryBroker extends BrokerFilter { } } + protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) { + Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination); + return (destination instanceof BaseDestination && + ((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false; + } + private void handleFireFailure(String message, Throwable cause) { LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); LOG.debug("{} detail: {}", message, cause); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5d51b24300..da6ca4182e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -84,6 +84,7 @@ public abstract class BaseDestination implements Destination { private boolean advisoryForDelivery; private boolean advisoryForConsumed; private boolean sendAdvisoryIfNoConsumers; + private boolean includeBodyForAdvisory; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final BrokerService brokerService; protected final Broker regionBroker; @@ -466,6 +467,14 @@ public abstract class BaseDestination implements Destination { this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } + public boolean isIncludeBodyForAdvisory() { + return includeBodyForAdvisory; + } + + public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { + this.includeBodyForAdvisory = includeBodyForAdvisory; + } + /** * @return the dead letter strategy */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 26cfa6b156..2e8c2a7f9e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -81,6 +81,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean advisoryWhenFull; private boolean advisoryForDelivery; private boolean advisoryForConsumed; + private boolean includeBodyForAdvisory; private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; @@ -200,6 +201,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); + destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); } @@ -740,6 +742,26 @@ public class PolicyEntry extends DestinationMapEntry { this.advisoryForFastProducers = advisoryForFastProducers; } + /** + * Returns true if the original message body should be included when applicable + * for advisory messages + * + * @return + */ + public boolean isIncludeBodyForAdvisory() { + return includeBodyForAdvisory; + } + + /** + * Sets if the original message body should be included when applicable + * for advisory messages + * + * @param includeBodyForAdvisory + */ + public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { + this.includeBodyForAdvisory = includeBodyForAdvisory; + } + public void setMaxExpirePageSize(int maxExpirePageSize) { this.maxExpirePageSize = maxExpirePageSize; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 1ad1ef4070..ce072aa5fd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.util.Arrays; +import java.util.Collection; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -44,10 +47,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Test for advisory messages sent under the right circumstances. */ +@RunWith(Parameterized.class) public class AdvisoryTests { protected static final int MESSAGE_COUNT = 2000; @@ -55,9 +62,25 @@ public class AdvisoryTests { protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected int topicCount; - + protected final boolean includeBodyForAdvisory; protected final int EXPIRE_MESSAGE_PERIOD = 10000; + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + // Include the full body of the message + {true}, + // Don't include the full body of the message + {false} + }); + } + + public AdvisoryTests(boolean includeBodyForAdvisory) { + super(); + this.includeBodyForAdvisory = includeBodyForAdvisory; + } + @Test(timeout = 60000) public void testNoSlowConsumerAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -122,6 +145,11 @@ public class AdvisoryTests { Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Test(timeout = 60000) @@ -149,6 +177,9 @@ public class AdvisoryTests { ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); String originalId = payload.getJMSMessageID(); assertEquals(originalId, id); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Test(timeout = 60000) @@ -171,6 +202,11 @@ public class AdvisoryTests { Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD); assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Test(timeout = 60000) @@ -185,14 +221,24 @@ public class AdvisoryTests { for (int i = 0; i < 100; i++) { s.createConsumer(advisoryTopic); } + MessageConsumer advisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic((ActiveMQDestination) topic)); MessageProducer producer = s.createProducer(topic); int count = 10; for (int i = 0; i < count; i++) { BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); producer.send(m); } + Message msg = advisoryConsumer.receive(1000); + assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for DLQ advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); + // we should get here without StackOverflow } @@ -211,11 +257,17 @@ public class AdvisoryTests { int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); for (int i = 0; i < count; i++) { BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); producer.send(m); } Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Before @@ -258,6 +310,7 @@ public class AdvisoryTests { policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryWhenFull(true); + policy.setIncludeBodyForAdvisory(includeBodyForAdvisory); policy.setProducerFlowControl(false); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); @@ -269,4 +322,12 @@ public class AdvisoryTests { answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); } + + protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) { + if (includeBodyForAdvisory) { + assertNotNull(payload.getContent()); + } else { + assertNull(payload.getContent()); + } + } }