From edacc2a8404d1a460fb08edd979285961802c0ac Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Mon, 15 Jun 2015 17:38:49 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5843 Adding a new property on PolicyEntry called includeBodyForAdvisory which will include the original message body when sending advisory messages that include the original message, instead of clearing it out. This is turned off by default. --- .../activemq/advisory/AdvisoryBroker.java | 27 ++++++-- .../broker/region/BaseDestination.java | 9 +++ .../broker/region/policy/PolicyEntry.java | 22 +++++++ .../activemq/advisory/AdvisoryTests.java | 63 ++++++++++++++++++- 4 files changed, 115 insertions(+), 6 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 39cd2fe303..7a6915d77e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.MessageReference; @@ -350,7 +351,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); fireAdvisory(context, topic, payload, null, advisoryMessage); @@ -367,7 +370,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); ActiveMQDestination destination = payload.getDestination(); @@ -388,7 +393,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); ActiveMQDestination destination = payload.getDestination(); @@ -409,7 +416,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } ActiveMQMessage advisoryMessage = new ActiveMQMessage(); if (sub instanceof TopicSubscription) { advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); @@ -498,7 +507,9 @@ public class AdvisoryBroker extends BrokerFilter { if (!messageReference.isAdvisory()) { ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); Message payload = messageReference.getMessage().copy(); - payload.clearBody(); + if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) { + payload.clearBody(); + } fireAdvisory(context, topic, payload); } } catch (Exception e) { @@ -551,6 +562,12 @@ public class AdvisoryBroker extends BrokerFilter { } } + protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) { + Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination); + return (destination instanceof BaseDestination && + ((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false; + } + private void handleFireFailure(String message, Throwable cause) { LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); LOG.debug("{} detail: {}", message, cause); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5d51b24300..da6ca4182e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -84,6 +84,7 @@ public abstract class BaseDestination implements Destination { private boolean advisoryForDelivery; private boolean advisoryForConsumed; private boolean sendAdvisoryIfNoConsumers; + private boolean includeBodyForAdvisory; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final BrokerService brokerService; protected final Broker regionBroker; @@ -466,6 +467,14 @@ public abstract class BaseDestination implements Destination { this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } + public boolean isIncludeBodyForAdvisory() { + return includeBodyForAdvisory; + } + + public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { + this.includeBodyForAdvisory = includeBodyForAdvisory; + } + /** * @return the dead letter strategy */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 26cfa6b156..2e8c2a7f9e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -81,6 +81,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean advisoryWhenFull; private boolean advisoryForDelivery; private boolean advisoryForConsumed; + private boolean includeBodyForAdvisory; private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; @@ -200,6 +201,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); + destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); } @@ -740,6 +742,26 @@ public class PolicyEntry extends DestinationMapEntry { this.advisoryForFastProducers = advisoryForFastProducers; } + /** + * Returns true if the original message body should be included when applicable + * for advisory messages + * + * @return + */ + public boolean isIncludeBodyForAdvisory() { + return includeBodyForAdvisory; + } + + /** + * Sets if the original message body should be included when applicable + * for advisory messages + * + * @param includeBodyForAdvisory + */ + public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { + this.includeBodyForAdvisory = includeBodyForAdvisory; + } + public void setMaxExpirePageSize(int maxExpirePageSize) { this.maxExpirePageSize = maxExpirePageSize; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 1ad1ef4070..ce072aa5fd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.util.Arrays; +import java.util.Collection; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -44,10 +47,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Test for advisory messages sent under the right circumstances. */ +@RunWith(Parameterized.class) public class AdvisoryTests { protected static final int MESSAGE_COUNT = 2000; @@ -55,9 +62,25 @@ public class AdvisoryTests { protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected int topicCount; - + protected final boolean includeBodyForAdvisory; protected final int EXPIRE_MESSAGE_PERIOD = 10000; + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + // Include the full body of the message + {true}, + // Don't include the full body of the message + {false} + }); + } + + public AdvisoryTests(boolean includeBodyForAdvisory) { + super(); + this.includeBodyForAdvisory = includeBodyForAdvisory; + } + @Test(timeout = 60000) public void testNoSlowConsumerAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -122,6 +145,11 @@ public class AdvisoryTests { Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Test(timeout = 60000) @@ -149,6 +177,9 @@ public class AdvisoryTests { ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); String originalId = payload.getJMSMessageID(); assertEquals(originalId, id); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Test(timeout = 60000) @@ -171,6 +202,11 @@ public class AdvisoryTests { Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD); assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Test(timeout = 60000) @@ -185,14 +221,24 @@ public class AdvisoryTests { for (int i = 0; i < 100; i++) { s.createConsumer(advisoryTopic); } + MessageConsumer advisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic((ActiveMQDestination) topic)); MessageProducer producer = s.createProducer(topic); int count = 10; for (int i = 0; i < count; i++) { BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); producer.send(m); } + Message msg = advisoryConsumer.receive(1000); + assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for DLQ advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); + // we should get here without StackOverflow } @@ -211,11 +257,17 @@ public class AdvisoryTests { int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); for (int i = 0; i < count; i++) { BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); producer.send(m); } Message msg = advisoryConsumer.receive(1000); assertNotNull(msg); + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + //Add assertion to make sure body is included for advisory topics + //when includeBodyForAdvisory is true + assertIncludeBodyForAdvisory(payload); } @Before @@ -258,6 +310,7 @@ public class AdvisoryTests { policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryWhenFull(true); + policy.setIncludeBodyForAdvisory(includeBodyForAdvisory); policy.setProducerFlowControl(false); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); @@ -269,4 +322,12 @@ public class AdvisoryTests { answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); } + + protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) { + if (includeBodyForAdvisory) { + assertNotNull(payload.getContent()); + } else { + assertNull(payload.getContent()); + } + } }