From b02ca01826b297f833258968aefe004938bdd40e Mon Sep 17 00:00:00 2001 From: "Jonas B. Lim" Date: Mon, 20 Nov 2006 06:20:12 +0000 Subject: [PATCH] applied patch for http://issues.apache.org/activemq/browse/AMQ-1006 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@477066 13f79535-47bb-0310-9956-ffa450edef68 --- .../policy/RoundRobinDispatchPolicy.java | 15 ++++++++----- .../policy/RoundRobinDispatchPolicyTest.java | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java index 18039d119c..844bd4032d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java @@ -53,6 +53,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy { synchronized(consumers) { int count = 0; + Subscription firstMatchingConsumer = null; for (Iterator iter = consumers.iterator(); iter.hasNext();) { Subscription sub = (Subscription) iter.next(); @@ -60,17 +61,21 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy { if (!sub.matches(node, msgContext)) continue; + if (firstMatchingConsumer == null) { + firstMatchingConsumer = sub; + } + sub.add(node); count++; } + if (firstMatchingConsumer != null) { // Rotate the consumer list. try { - if (consumers.size() > 1) - consumers.add(consumers.remove(0)); - } catch (Throwable bestEffort) { - log.error("Caught error rotating consumers",bestEffort); - } + consumers.remove(firstMatchingConsumer); + consumers.add(firstMatchingConsumer); + } catch (Throwable bestEffort) { } + } return count > 0; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java index cf981c1b78..6324adc806 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java @@ -23,6 +23,11 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.PolicyMap; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest { protected BrokerService createBroker() throws Exception { @@ -81,6 +86,23 @@ public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest { super.testManyProducersManyConsumers(); assertMessagesDividedAmongConsumers(); } + + public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception { + // Create consumer that won't consume any message + createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1"); + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + assertMessagesDividedAmongConsumers(); + } + + protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception { + connections.add(conn); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(dest, selector); + conn.start(); + + return consumer; + } public void assertMessagesDividedAmongConsumers() { assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount);