resolve https://issues.apache.org/activemq/browse/AMQ-2704 - add message audit to topic sub so that a regular sub can behave like store backed subscriptions which already suppress duplicates. Dup ocurrs from ring network topology where there are two equal and valid routes for a message, see test case

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@936390 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-04-21 16:45:30 +00:00
parent 6a73c40c18
commit eb983f7c08
5 changed files with 189 additions and 15 deletions

View File

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@ -62,6 +63,12 @@ public class TopicSubscription extends AbstractSubscription {
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
private boolean slowConsumer;
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
protected int maxAuditDepth = 1000;
protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@ -78,9 +85,15 @@ public class TopicSubscription extends AbstractSubscription {
this.matched.setSystemUsage(usageManager);
this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.matched.start();
if (enableAudit) {
audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
}
}
public void add(MessageReference node) throws Exception {
if (isDuplicate(node)) {
return;
}
enqueueCounter.incrementAndGet();
if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages which
@ -158,6 +171,19 @@ public class TopicSubscription extends AbstractSubscription {
}
}
private boolean isDuplicate(MessageReference node) {
boolean duplicate = false;
if (enableAudit && audit != null) {
duplicate = audit.isDuplicate(node);
if (LOG.isDebugEnabled()) {
if (duplicate) {
LOG.debug("ignoring duplicate add: " + node.getMessageId());
}
}
}
return duplicate;
}
/**
* Discard any expired messages from the matched list. Called from a
* synchronized block.
@ -313,6 +339,39 @@ public class TopicSubscription extends AbstractSubscription {
this.messageEvictionStrategy = messageEvictionStrategy;
}
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
if (audit != null) {
audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
}
}
public int getMaxAuditDepth() {
return maxAuditDepth;
}
public synchronized void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
if (audit != null) {
audit.setAuditDepth(maxAuditDepth);
}
}
public boolean isEnableAudit() {
return enableAudit;
}
public synchronized void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
if (enableAudit && audit==null) {
audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
}
}
// Implementation methods
// -------------------------------------------------------------------------
public boolean isFull() {

View File

@ -91,7 +91,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
* the cache. If subsequently, we pull out that message from the store (before its deleted)
* it will be a duplicate - but should be ignored
*/
//LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
}
storeHasMessages = true;
}
return recovered;

View File

@ -178,6 +178,11 @@ public class PolicyEntry extends DestinationMapEntry {
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
}
if (enableAudit) {
subscription.setEnableAudit(enableAudit);
subscription.setMaxProducersToAudit(maxProducersToAudit);
subscription.setMaxAuditDepth(maxAuditDepth);
}
}
public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
@ -680,9 +681,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
if (originallyCameFromRemote(md, sub)) {
if (suppressMessageDispatch(md, sub)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL: " + md.getMessage());
LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
}
// still ack as it may be durable
try {
@ -695,7 +696,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Message message = configureMessage(md);
if (LOG.isDebugEnabled()) {
LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
}
if (!message.isResponseRequired()) {
@ -776,25 +777,25 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
private boolean originallyCameFromRemote(MessageDispatch md, DemandSubscription sub) throws Exception {
private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
// See if this consumer's brokerPath tells us it came from the broker at the other end
// of the bridge. I think we should be making this decision based on the message's
// broker bread crumbs and not the consumer's? However, the message's broker bread
// crumbs are null, which is another matter.
boolean cameFromRemote = false;
boolean suppress = false;
Object consumerInfo = md.getMessage().getDataStructure();
if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
}
// for durable subs, suppression via filter leaves dangling acks so we need to
// check here and allow the ack irrespective
if (!cameFromRemote && sub.getLocalInfo().isDurable()) {
if (!suppress && sub.getLocalInfo().isDurable()) {
MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
cameFromRemote = !createNetworkBridgeFilter(null).matches(messageEvalContext);
suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
}
return cameFromRemote;
return suppress;
}
/**
@ -1154,7 +1155,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
} else {
// need to ack this message if it is ignored as it is durable so
// we check before we send. see: originallyCameFromRemote()
// we check before we send. see: suppressMessageDispatch()
}
}

View File

@ -17,15 +17,21 @@
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Topic;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.transport.failover.FailoverUriTest;
import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.util.MessageIdList;
/**
@ -242,6 +248,106 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
}
public void testAllConnectedBrokerNetworkSingleProducerTTL() throws Exception {
// duplicates are expected with ttl of 2 as each broker is connected to the next
// but the dups are suppressed by the store and now also by the topic sub when enableAudit
// default (true) is present in a matching destination policy entry
int networkTTL = 2;
boolean conduitSubs = true;
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerC", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
PolicyMap policyMap = new PolicyMap();
// enable audit is on by default just need to give it matching policy entry
// so it will be applied to the topic subscription
policyMap.setDefaultEntry(new PolicyEntry());
Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
broker.setDestinationPolicy(policyMap);
broker.setDeleteAllMessagesOnStartup(true);
}
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", true);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageConsumer clientB = createConsumer("BrokerB", dest);
MessageConsumer clientC = createConsumer("BrokerC", dest);
//let consumers propogate around the network
Thread.sleep(2000);
// Send messages
sendMessages("BrokerA", dest, 1);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(1);
msgsB.waitForMessagesToArrive(1);
msgsC.waitForMessagesToArrive(1);
// ensure we don't get any more messages
Thread.sleep(2000);
assertEquals(1, msgsA.getMessageCount());
assertEquals(1, msgsB.getMessageCount());
assertEquals(1, msgsC.getMessageCount());
}
public void testAllConnectedBrokerNetworkDurableSubTTL() throws Exception {
int networkTTL = 2;
boolean conduitSubs = true;
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerC", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", true);
// Setup consumers
MessageConsumer clientA = createDurableSubscriber("BrokerA", (Topic)dest, "clientA");
MessageConsumer clientB = createDurableSubscriber("BrokerB", (Topic)dest, "clientB");
MessageConsumer clientC = createDurableSubscriber("BrokerC", (Topic)dest, "clientC");
//let consumers propogate around the network
Thread.sleep(2000);
// Send messages
sendMessages("BrokerA", dest, 1);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(1);
msgsB.waitForMessagesToArrive(1);
msgsC.waitForMessagesToArrive(1);
// ensure we don't get any more messages
Thread.sleep(2000);
assertEquals(1, msgsA.getMessageCount());
assertEquals(1, msgsB.getMessageCount());
assertEquals(1, msgsC.getMessageCount());
}
/**
* BrokerA <-> BrokerB <-> BrokerC
*/
@ -284,9 +390,10 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
String options = new String("?persistent=false&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
}
public static Test suite() {