Adding a new property on PolicyEntry called includeBodyForAdvisory which will
include the original message body when sending advisory messages that include
the original message, instead of clearing it out.  This is turned off by
default.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-06-15 17:38:49 +00:00 committed by Timothy Bish
parent 540d8c7079
commit edacc2a840
4 changed files with 115 additions and 6 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -350,7 +351,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) { if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination()); ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy(); Message payload = messageReference.getMessage().copy();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody(); payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
fireAdvisory(context, topic, payload, null, advisoryMessage); fireAdvisory(context, topic, payload, null, advisoryMessage);
@ -367,7 +370,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) { if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination()); ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy(); Message payload = messageReference.getMessage().copy();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody(); payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
ActiveMQDestination destination = payload.getDestination(); ActiveMQDestination destination = payload.getDestination();
@ -388,7 +393,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) { if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination()); ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy(); Message payload = messageReference.getMessage().copy();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody(); payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
ActiveMQDestination destination = payload.getDestination(); ActiveMQDestination destination = payload.getDestination();
@ -409,7 +416,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) { if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination()); ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy(); Message payload = messageReference.getMessage().copy();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody(); payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
if (sub instanceof TopicSubscription) { if (sub instanceof TopicSubscription) {
advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
@ -498,7 +507,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) { if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy(); Message payload = messageReference.getMessage().copy();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody(); payload.clearBody();
}
fireAdvisory(context, topic, payload); fireAdvisory(context, topic, payload);
} }
} catch (Exception e) { } catch (Exception e) {
@ -551,6 +562,12 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) {
Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination);
return (destination instanceof BaseDestination &&
((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false;
}
private void handleFireFailure(String message, Throwable cause) { private void handleFireFailure(String message, Throwable cause) {
LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
LOG.debug("{} detail: {}", message, cause); LOG.debug("{} detail: {}", message, cause);

View File

@ -84,6 +84,7 @@ public abstract class BaseDestination implements Destination {
private boolean advisoryForDelivery; private boolean advisoryForDelivery;
private boolean advisoryForConsumed; private boolean advisoryForConsumed;
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
private boolean includeBodyForAdvisory;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService; protected final BrokerService brokerService;
protected final Broker regionBroker; protected final Broker regionBroker;
@ -466,6 +467,14 @@ public abstract class BaseDestination implements Destination {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
} }
public boolean isIncludeBodyForAdvisory() {
return includeBodyForAdvisory;
}
public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
this.includeBodyForAdvisory = includeBodyForAdvisory;
}
/** /**
* @return the dead letter strategy * @return the dead letter strategy
*/ */

View File

@ -81,6 +81,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean advisoryWhenFull; private boolean advisoryWhenFull;
private boolean advisoryForDelivery; private boolean advisoryForDelivery;
private boolean advisoryForConsumed; private boolean advisoryForConsumed;
private boolean includeBodyForAdvisory;
private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
@ -200,6 +201,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
} }
@ -740,6 +742,26 @@ public class PolicyEntry extends DestinationMapEntry {
this.advisoryForFastProducers = advisoryForFastProducers; this.advisoryForFastProducers = advisoryForFastProducers;
} }
/**
* Returns true if the original message body should be included when applicable
* for advisory messages
*
* @return
*/
public boolean isIncludeBodyForAdvisory() {
return includeBodyForAdvisory;
}
/**
* Sets if the original message body should be included when applicable
* for advisory messages
*
* @param includeBodyForAdvisory
*/
public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
this.includeBodyForAdvisory = includeBodyForAdvisory;
}
public void setMaxExpirePageSize(int maxExpirePageSize) { public void setMaxExpirePageSize(int maxExpirePageSize) {
this.maxExpirePageSize = maxExpirePageSize; this.maxExpirePageSize = maxExpirePageSize;
} }

View File

@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -44,10 +47,14 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/** /**
* Test for advisory messages sent under the right circumstances. * Test for advisory messages sent under the right circumstances.
*/ */
@RunWith(Parameterized.class)
public class AdvisoryTests { public class AdvisoryTests {
protected static final int MESSAGE_COUNT = 2000; protected static final int MESSAGE_COUNT = 2000;
@ -55,9 +62,25 @@ public class AdvisoryTests {
protected Connection connection; protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount; protected int topicCount;
protected final boolean includeBodyForAdvisory;
protected final int EXPIRE_MESSAGE_PERIOD = 10000; protected final int EXPIRE_MESSAGE_PERIOD = 10000;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Include the full body of the message
{true},
// Don't include the full body of the message
{false}
});
}
public AdvisoryTests(boolean includeBodyForAdvisory) {
super();
this.includeBodyForAdvisory = includeBodyForAdvisory;
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testNoSlowConsumerAdvisory() throws Exception { public void testNoSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -122,6 +145,11 @@ public class AdvisoryTests {
Message msg = advisoryConsumer.receive(1000); Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -149,6 +177,9 @@ public class AdvisoryTests {
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
String originalId = payload.getJMSMessageID(); String originalId = payload.getJMSMessageID();
assertEquals(originalId, id); assertEquals(originalId, id);
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -171,6 +202,11 @@ public class AdvisoryTests {
Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD); Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -185,14 +221,24 @@ public class AdvisoryTests {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
s.createConsumer(advisoryTopic); s.createConsumer(advisoryTopic);
} }
MessageConsumer advisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic((ActiveMQDestination) topic));
MessageProducer producer = s.createProducer(topic); MessageProducer producer = s.createProducer(topic);
int count = 10; int count = 10;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m); producer.send(m);
} }
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for DLQ advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
// we should get here without StackOverflow // we should get here without StackOverflow
} }
@ -211,11 +257,17 @@ public class AdvisoryTests {
int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage(); BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m); producer.send(m);
} }
Message msg = advisoryConsumer.receive(1000); Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg); assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
} }
@Before @Before
@ -258,6 +310,7 @@ public class AdvisoryTests {
policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true); policy.setAdvisoryWhenFull(true);
policy.setIncludeBodyForAdvisory(includeBodyForAdvisory);
policy.setProducerFlowControl(false); policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10); strategy.setLimit(10);
@ -269,4 +322,12 @@ public class AdvisoryTests {
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
} }
protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {
if (includeBodyForAdvisory) {
assertNotNull(payload.getContent());
} else {
assertNull(payload.getContent());
}
}
} }