This closes #115

This commit is contained in:
Timothy Bish 2015-06-16 13:37:08 -04:00
commit 7b5c8be377
4 changed files with 115 additions and 6 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
@ -350,7 +351,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
fireAdvisory(context, topic, payload, null, advisoryMessage);
@ -367,7 +370,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
ActiveMQDestination destination = payload.getDestination();
@ -388,7 +393,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
ActiveMQDestination destination = payload.getDestination();
@ -409,7 +416,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody();
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
if (sub instanceof TopicSubscription) {
advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
@ -498,7 +507,9 @@ public class AdvisoryBroker extends BrokerFilter {
if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
payload.clearBody();
}
fireAdvisory(context, topic, payload);
}
} catch (Exception e) {
@ -551,6 +562,12 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) {
Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination);
return (destination instanceof BaseDestination &&
((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false;
}
private void handleFireFailure(String message, Throwable cause) {
LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
LOG.debug("{} detail: {}", message, cause);

View File

@ -84,6 +84,7 @@ public abstract class BaseDestination implements Destination {
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean sendAdvisoryIfNoConsumers;
private boolean includeBodyForAdvisory;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
protected final Broker regionBroker;
@ -466,6 +467,14 @@ public abstract class BaseDestination implements Destination {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
public boolean isIncludeBodyForAdvisory() {
return includeBodyForAdvisory;
}
public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
this.includeBodyForAdvisory = includeBodyForAdvisory;
}
/**
* @return the dead letter strategy
*/

View File

@ -81,6 +81,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean includeBodyForAdvisory;
private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
@ -200,6 +201,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
}
@ -740,6 +742,26 @@ public class PolicyEntry extends DestinationMapEntry {
this.advisoryForFastProducers = advisoryForFastProducers;
}
/**
* Returns true if the original message body should be included when applicable
* for advisory messages
*
* @return
*/
public boolean isIncludeBodyForAdvisory() {
return includeBodyForAdvisory;
}
/**
* Sets if the original message body should be included when applicable
* for advisory messages
*
* @param includeBodyForAdvisory
*/
public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
this.includeBodyForAdvisory = includeBodyForAdvisory;
}
public void setMaxExpirePageSize(int maxExpirePageSize) {
this.maxExpirePageSize = maxExpirePageSize;
}

View File

@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -44,10 +47,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Test for advisory messages sent under the right circumstances.
*/
@RunWith(Parameterized.class)
public class AdvisoryTests {
protected static final int MESSAGE_COUNT = 2000;
@ -55,9 +62,25 @@ public class AdvisoryTests {
protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount;
protected final boolean includeBodyForAdvisory;
protected final int EXPIRE_MESSAGE_PERIOD = 10000;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Include the full body of the message
{true},
// Don't include the full body of the message
{false}
});
}
public AdvisoryTests(boolean includeBodyForAdvisory) {
super();
this.includeBodyForAdvisory = includeBodyForAdvisory;
}
@Test(timeout = 60000)
public void testNoSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -122,6 +145,11 @@ public class AdvisoryTests {
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
@Test(timeout = 60000)
@ -149,6 +177,9 @@ public class AdvisoryTests {
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
String originalId = payload.getJMSMessageID();
assertEquals(originalId, id);
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
@Test(timeout = 60000)
@ -171,6 +202,11 @@ public class AdvisoryTests {
Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
@Test(timeout = 60000)
@ -185,14 +221,24 @@ public class AdvisoryTests {
for (int i = 0; i < 100; i++) {
s.createConsumer(advisoryTopic);
}
MessageConsumer advisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic((ActiveMQDestination) topic));
MessageProducer producer = s.createProducer(topic);
int count = 10;
for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for DLQ advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
// we should get here without StackOverflow
}
@ -211,11 +257,17 @@ public class AdvisoryTests {
int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
for (int i = 0; i < count; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
//Add assertion to make sure body is included for advisory topics
//when includeBodyForAdvisory is true
assertIncludeBodyForAdvisory(payload);
}
@Before
@ -258,6 +310,7 @@ public class AdvisoryTests {
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);
policy.setIncludeBodyForAdvisory(includeBodyForAdvisory);
policy.setProducerFlowControl(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
@ -269,4 +322,12 @@ public class AdvisoryTests {
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {
if (includeBodyForAdvisory) {
assertNotNull(payload.getContent());
} else {
assertNull(payload.getContent());
}
}
}