diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java new file mode 100644 index 0000000000..444e752a0f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PriorityNetworkDispatchPolicy.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * dispatch policy that ignores lower priority duplicate network consumers, + * used in conjunction with network bridge suppresDuplicateTopicSubscriptions + * + * @org.apache.xbean.XBean + */ +public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy { + + private static final Log LOG = LogFactory.getLog(PriorityNetworkDispatchPolicy.class); + @Override + public boolean dispatch(MessageReference node, + MessageEvaluationContext msgContext, + List consumers) throws Exception { + + List duplicateFreeSubs = new ArrayList(); + synchronized (consumers) { + for (Subscription sub: consumers) { + ConsumerInfo info = sub.getConsumerInfo(); + if (info.isNetworkSubscription()) { + boolean highestPrioritySub = true; + for (Subscription candidate: duplicateFreeSubs) { + if (matches(candidate, info)) { + if (hasLowerPriority(candidate, info)) { + duplicateFreeSubs.remove(candidate); + } else { + // higher priority matching sub exists + highestPrioritySub = false; + if (LOG.isDebugEnabled()) { + LOG.debug("ignoring lower priority: " + candidate + + "[" +candidate.getConsumerInfo().getNetworkConsumerIds() +", " + + candidate.getConsumerInfo().getNetworkConsumerIds() +"] in favour of: " + + sub + + "[" +sub.getConsumerInfo().getNetworkConsumerIds() +", " + + sub.getConsumerInfo().getNetworkConsumerIds() +"]"); + } + } + } + } + if (highestPrioritySub) { + duplicateFreeSubs.add(sub); + } + } else { + duplicateFreeSubs.add(sub); + } + } + } + + return super.dispatch(node, msgContext, duplicateFreeSubs); + } + + private boolean hasLowerPriority(Subscription candidate, + ConsumerInfo info) { + return candidate.getConsumerInfo().getPriority() < info.getPriority(); + } + + private boolean matches(Subscription candidate, ConsumerInfo info) { + boolean matched = false; + for (ConsumerId candidateId: candidate.getConsumerInfo().getNetworkConsumerIds()) { + for (ConsumerId subId: info.getNetworkConsumerIds()) { + if (candidateId.equals(subId)) { + matched = true; + break; + } + } + } + return matched; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index d4fa014418..3d7c123344 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1020,7 +1020,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); boolean suppress = false; - if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()) { + if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || + consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) { return suppress; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 68562ab994..95aefef753 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -46,6 +46,7 @@ public class NetworkBridgeConfiguration { private List staticallyIncludedDestinations; private boolean suppressDuplicateQueueSubscriptions = false; + private boolean suppressDuplicateTopicSubscriptions = true; /** @@ -275,6 +276,18 @@ public class NetworkBridgeConfiguration { suppressDuplicateQueueSubscriptions = val; } + public boolean isSuppressDuplicateTopicSubscriptions() { + return suppressDuplicateTopicSubscriptions; + } + + /** + * + * @param val if true, duplicate network topic subscriptions (in a cyclic network) will be suppressed + */ + public void setSuppressDuplicateTopicSubscriptions(boolean val) { + suppressDuplicateTopicSubscriptions = val; + } + /** * @return the brokerURL */ diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java index 176397c57b..62bc34f618 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.usecases; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,16 +35,23 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import junit.framework.TestCase; +import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.DispatchPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy; +import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class NoDuplicateOnTopicNetworkTest extends TestCase { +public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { private static final Log LOG = LogFactory .getLog(NoDuplicateOnTopicNetworkTest.class); @@ -51,17 +59,23 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { private static final String BROKER_1 = "tcp://localhost:61626"; private static final String BROKER_2 = "tcp://localhost:61636"; private static final String BROKER_3 = "tcp://localhost:61646"; + private final static String TOPIC_NAME = "broadcast"; private BrokerService broker1; private BrokerService broker2; private BrokerService broker3; + public boolean suppressDuplicateTopicSubs = false; + public DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); + private boolean dynamicOnly = false; // no duplicates in cyclic network if networkTTL <=1 // when > 1, subscriptions perculate around resulting in duplicates as there is no // memory of the original subscription. // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds() private int ttl = 3; - + + + @Override protected void setUp() throws Exception { super.setUp(); @@ -76,6 +90,10 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { waitForBridgeFormation(); } + public static Test suite() { + return suite(NoDuplicateOnTopicNetworkTest.class); + } + protected void waitForBridgeFormation() throws Exception { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { @@ -105,7 +123,17 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { networkConnector.setDecreaseNetworkConsumerPriority(true); networkConnector.setDynamicOnly(dynamicOnly); networkConnector.setNetworkTTL(ttl); + networkConnector.setSuppressDuplicateTopicSubscriptions(suppressDuplicateTopicSubs); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(dispatchPolicy); + // the audit will suppress the duplicates as it defaults to true so this test + // checking for dups will fail. it is good to have it on in practice. + policy.setEnableAudit(false); + policyMap.put(new ActiveMQTopic(TOPIC_NAME), policy); + broker.setDestinationPolicy(policyMap); broker.start(); return broker; @@ -119,13 +147,20 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { super.tearDown(); } + public void initCombosForTestProducerConsumerTopic() { + this.addCombinationValues("suppresDuplicateTopicSubs", new Object[]{Boolean.TRUE, Boolean.FALSE}); + this.addCombinationValues("dispatchPolicy", new Object[]{new PriorityNetworkDispatchPolicy(), new SimpleDispatchPolicy()}); + } + public void testProducerConsumerTopic() throws Exception { - final String topicName = "broadcast"; + + final CountDownLatch consumerStarted = new CountDownLatch(1); + Thread producerThread = new Thread(new Runnable() { public void run() { TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages(); producer.setBrokerURL(BROKER_1); - producer.setTopicName(topicName); + producer.setTopicName(TOPIC_NAME); try { producer.produce(); } catch (JMSException e) { @@ -138,9 +173,10 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { Thread consumerThread = new Thread(new Runnable() { public void run() { consumer.setBrokerURL(BROKER_2); - consumer.setTopicName(topicName); + consumer.setTopicName(TOPIC_NAME); try { consumer.consumer(); + consumerStarted.countDown(); consumer.getLatch().await(60, TimeUnit.SECONDS); } catch (Exception e) { fail("Unexpected " + e); @@ -151,20 +187,32 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { consumerThread.start(); LOG.info("Started Consumer"); + assertTrue("consumer started eventually", consumerStarted.await(10, TimeUnit.SECONDS)); + // ensure subscription has percolated though the network Thread.sleep(2000); + producerThread.start(); LOG.info("Started Producer"); producerThread.join(); consumerThread.join(); + int duplicateCount = 0; Map map = new HashMap(); for (String msg : consumer.getMessageStrings()) { - assertTrue("is not a duplicate: " + msg, !map.containsKey(msg)); + if (map.containsKey(msg)) { + LOG.info("got duplicate: " + msg); + duplicateCount++; + } map.put(msg, msg); } - assertEquals("got all required messages: " + map.size(), consumer - .getNumMessages(), map.size()); + if (suppressDuplicateTopicSubs || dispatchPolicy instanceof PriorityNetworkDispatchPolicy) { + assertEquals("no duplicates", 0, duplicateCount); + assertEquals("got all required messages: " + map.size(), consumer + .getNumMessages(), map.size()); + } else { + assertTrue("we got some duplicates", duplicateCount > 0); + } } class TopicWithDuplicateMessages { @@ -176,16 +224,18 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase { private MessageProducer producer; private MessageConsumer consumer; - private List receivedStrings = new ArrayList(); + private List receivedStrings = Collections.synchronizedList(new ArrayList()); private int numMessages = 10; private CountDownLatch recievedLatch = new CountDownLatch(numMessages); public CountDownLatch getLatch() { return recievedLatch; } - + public List getMessageStrings() { - return receivedStrings; + synchronized(receivedStrings) { + return new ArrayList(receivedStrings); + } } public String getBrokerURL() {