first cut at resolution to: https://issues.apache.org/activemq/browse/AMQ-2753 - add priority network consumer dispatch policy that will ignore duplicates such that there can be redundancy in the network with the addition of suppressDuplicateTopicSubscriptions attribute on a network bridge

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@948911 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-05-27 17:02:41 +00:00
parent b50ede668d
commit ecf068e23b
4 changed files with 175 additions and 13 deletions

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* dispatch policy that ignores lower priority duplicate network consumers,
* used in conjunction with network bridge suppresDuplicateTopicSubscriptions
*
* @org.apache.xbean.XBean
*/
public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy {
private static final Log LOG = LogFactory.getLog(PriorityNetworkDispatchPolicy.class);
@Override
public boolean dispatch(MessageReference node,
MessageEvaluationContext msgContext,
List<Subscription> consumers) throws Exception {
List<Subscription> duplicateFreeSubs = new ArrayList<Subscription>();
synchronized (consumers) {
for (Subscription sub: consumers) {
ConsumerInfo info = sub.getConsumerInfo();
if (info.isNetworkSubscription()) {
boolean highestPrioritySub = true;
for (Subscription candidate: duplicateFreeSubs) {
if (matches(candidate, info)) {
if (hasLowerPriority(candidate, info)) {
duplicateFreeSubs.remove(candidate);
} else {
// higher priority matching sub exists
highestPrioritySub = false;
if (LOG.isDebugEnabled()) {
LOG.debug("ignoring lower priority: " + candidate
+ "[" +candidate.getConsumerInfo().getNetworkConsumerIds() +", "
+ candidate.getConsumerInfo().getNetworkConsumerIds() +"] in favour of: "
+ sub
+ "[" +sub.getConsumerInfo().getNetworkConsumerIds() +", "
+ sub.getConsumerInfo().getNetworkConsumerIds() +"]");
}
}
}
}
if (highestPrioritySub) {
duplicateFreeSubs.add(sub);
}
} else {
duplicateFreeSubs.add(sub);
}
}
}
return super.dispatch(node, msgContext, duplicateFreeSubs);
}
private boolean hasLowerPriority(Subscription candidate,
ConsumerInfo info) {
return candidate.getConsumerInfo().getPriority() < info.getPriority();
}
private boolean matches(Subscription candidate, ConsumerInfo info) {
boolean matched = false;
for (ConsumerId candidateId: candidate.getConsumerInfo().getNetworkConsumerIds()) {
for (ConsumerId subId: info.getNetworkConsumerIds()) {
if (candidateId.equals(subId)) {
matched = true;
break;
}
}
}
return matched;
}
}

View File

@ -1020,7 +1020,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
boolean suppress = false; boolean suppress = false;
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) { if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
return suppress; return suppress;
} }

View File

@ -46,6 +46,7 @@ public class NetworkBridgeConfiguration {
private List<ActiveMQDestination> staticallyIncludedDestinations; private List<ActiveMQDestination> staticallyIncludedDestinations;
private boolean suppressDuplicateQueueSubscriptions = false; private boolean suppressDuplicateQueueSubscriptions = false;
private boolean suppressDuplicateTopicSubscriptions = true;
/** /**
@ -275,6 +276,18 @@ public class NetworkBridgeConfiguration {
suppressDuplicateQueueSubscriptions = val; suppressDuplicateQueueSubscriptions = val;
} }
public boolean isSuppressDuplicateTopicSubscriptions() {
return suppressDuplicateTopicSubscriptions;
}
/**
*
* @param val if true, duplicate network topic subscriptions (in a cyclic network) will be suppressed
*/
public void setSuppressDuplicateTopicSubscriptions(boolean val) {
suppressDuplicateTopicSubscriptions = val;
}
/** /**
* @return the brokerURL * @return the brokerURL
*/ */

View File

@ -18,6 +18,7 @@ package org.apache.activemq.usecases;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -34,16 +35,23 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import junit.framework.TestCase; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
public class NoDuplicateOnTopicNetworkTest extends TestCase { public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(NoDuplicateOnTopicNetworkTest.class); .getLog(NoDuplicateOnTopicNetworkTest.class);
@ -51,17 +59,23 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
private static final String BROKER_1 = "tcp://localhost:61626"; private static final String BROKER_1 = "tcp://localhost:61626";
private static final String BROKER_2 = "tcp://localhost:61636"; private static final String BROKER_2 = "tcp://localhost:61636";
private static final String BROKER_3 = "tcp://localhost:61646"; private static final String BROKER_3 = "tcp://localhost:61646";
private final static String TOPIC_NAME = "broadcast";
private BrokerService broker1; private BrokerService broker1;
private BrokerService broker2; private BrokerService broker2;
private BrokerService broker3; private BrokerService broker3;
public boolean suppressDuplicateTopicSubs = false;
public DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private boolean dynamicOnly = false; private boolean dynamicOnly = false;
// no duplicates in cyclic network if networkTTL <=1 // no duplicates in cyclic network if networkTTL <=1
// when > 1, subscriptions perculate around resulting in duplicates as there is no // when > 1, subscriptions perculate around resulting in duplicates as there is no
// memory of the original subscription. // memory of the original subscription.
// solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds() // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds()
private int ttl = 3; private int ttl = 3;
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
@ -76,6 +90,10 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
waitForBridgeFormation(); waitForBridgeFormation();
} }
public static Test suite() {
return suite(NoDuplicateOnTopicNetworkTest.class);
}
protected void waitForBridgeFormation() throws Exception { protected void waitForBridgeFormation() throws Exception {
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
@ -105,7 +123,17 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
networkConnector.setDecreaseNetworkConsumerPriority(true); networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDynamicOnly(dynamicOnly); networkConnector.setDynamicOnly(dynamicOnly);
networkConnector.setNetworkTTL(ttl); networkConnector.setNetworkTTL(ttl);
networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setDispatchPolicy(dispatchPolicy);
// the audit will suppress the duplicates as it defaults to true so this test
// checking for dups will fail. it is good to have it on in practice.
policy.setEnableAudit(false);
policyMap.put(new ActiveMQTopic(TOPIC_NAME), policy);
broker.setDestinationPolicy(policyMap);
broker.start(); broker.start();
return broker; return broker;
@ -119,13 +147,20 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
super.tearDown(); super.tearDown();
} }
public void initCombosForTestProducerConsumerTopic() {
this.addCombinationValues("suppresDuplicateTopicSubs", new Object[]{Boolean.TRUE, Boolean.FALSE});
this.addCombinationValues("dispatchPolicy", new Object[]{new PriorityNetworkDispatchPolicy(), new SimpleDispatchPolicy()});
}
public void testProducerConsumerTopic() throws Exception { public void testProducerConsumerTopic() throws Exception {
final String topicName = "broadcast";
final CountDownLatch consumerStarted = new CountDownLatch(1);
Thread producerThread = new Thread(new Runnable() { Thread producerThread = new Thread(new Runnable() {
public void run() { public void run() {
TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages(); TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages();
producer.setBrokerURL(BROKER_1); producer.setBrokerURL(BROKER_1);
producer.setTopicName(topicName); producer.setTopicName(TOPIC_NAME);
try { try {
producer.produce(); producer.produce();
} catch (JMSException e) { } catch (JMSException e) {
@ -138,9 +173,10 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
Thread consumerThread = new Thread(new Runnable() { Thread consumerThread = new Thread(new Runnable() {
public void run() { public void run() {
consumer.setBrokerURL(BROKER_2); consumer.setBrokerURL(BROKER_2);
consumer.setTopicName(topicName); consumer.setTopicName(TOPIC_NAME);
try { try {
consumer.consumer(); consumer.consumer();
consumerStarted.countDown();
consumer.getLatch().await(60, TimeUnit.SECONDS); consumer.getLatch().await(60, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
fail("Unexpected " + e); fail("Unexpected " + e);
@ -151,20 +187,32 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
consumerThread.start(); consumerThread.start();
LOG.info("Started Consumer"); LOG.info("Started Consumer");
assertTrue("consumer started eventually", consumerStarted.await(10, TimeUnit.SECONDS));
// ensure subscription has percolated though the network // ensure subscription has percolated though the network
Thread.sleep(2000); Thread.sleep(2000);
producerThread.start(); producerThread.start();
LOG.info("Started Producer"); LOG.info("Started Producer");
producerThread.join(); producerThread.join();
consumerThread.join(); consumerThread.join();
int duplicateCount = 0;
Map<String, String> map = new HashMap<String, String>(); Map<String, String> map = new HashMap<String, String>();
for (String msg : consumer.getMessageStrings()) { for (String msg : consumer.getMessageStrings()) {
assertTrue("is not a duplicate: " + msg, !map.containsKey(msg)); if (map.containsKey(msg)) {
LOG.info("got duplicate: " + msg);
duplicateCount++;
}
map.put(msg, msg); map.put(msg, msg);
} }
assertEquals("got all required messages: " + map.size(), consumer if (suppressDuplicateTopicSubs || dispatchPolicy instanceof PriorityNetworkDispatchPolicy) {
.getNumMessages(), map.size()); assertEquals("no duplicates", 0, duplicateCount);
assertEquals("got all required messages: " + map.size(), consumer
.getNumMessages(), map.size());
} else {
assertTrue("we got some duplicates", duplicateCount > 0);
}
} }
class TopicWithDuplicateMessages { class TopicWithDuplicateMessages {
@ -176,16 +224,18 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
private MessageProducer producer; private MessageProducer producer;
private MessageConsumer consumer; private MessageConsumer consumer;
private List<String> receivedStrings = new ArrayList<String>(); private List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>());
private int numMessages = 10; private int numMessages = 10;
private CountDownLatch recievedLatch = new CountDownLatch(numMessages); private CountDownLatch recievedLatch = new CountDownLatch(numMessages);
public CountDownLatch getLatch() { public CountDownLatch getLatch() {
return recievedLatch; return recievedLatch;
} }
public List<String> getMessageStrings() { public List<String> getMessageStrings() {
return receivedStrings; synchronized(receivedStrings) {
return new ArrayList<String>(receivedStrings);
}
} }
public String getBrokerURL() { public String getBrokerURL() {