diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 0e37c9d3d0..9992afe404 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -63,16 +63,16 @@ abstract public class AbstractRegion implements Region { synchronized(destinationsMutex){ destinations.put(destination,dest); destinationMap.put(destination,dest); - } - - // Add all consumers that are interested in the destination. - for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { - Subscription sub = (Subscription) iter.next(); - if( sub.matches(destination) ) { - dest.addSubscription(context, sub); + + // Add all consumers that are interested in the destination. + for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { + Subscription sub = (Subscription) iter.next(); + if( sub.matches(destination) ) { + dest.addSubscription(context, sub); + } } + return dest; } - return dest; } public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 41a87617ce..ab44db4a99 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -20,12 +20,13 @@ import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; -import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; @@ -55,7 +56,7 @@ public class Queue implements Destination { private final Log log; protected final ActiveMQDestination destination; - protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); + protected final List consumers = new CopyOnWriteArrayList(); protected final LinkedList messages = new LinkedList(); protected final Valve dispatchValve = new Valve(true); protected final UsageManager usageManager; @@ -123,7 +124,9 @@ public class Queue implements Destination { MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); try { - consumers.add(sub); + synchronized (consumers) { + consumers.add(sub); + } highestSubscriptionPriority = calcHighestSubscriptionPriority(); msgContext.setDestination(destination); @@ -167,7 +170,9 @@ public class Queue implements Destination { dispatchValve.turnOff(); try { - consumers.remove(sub); + synchronized (consumers) { + consumers.remove(sub); + } sub.remove(context, this); highestSubscriptionPriority = calcHighestSubscriptionPriority(); @@ -350,6 +355,7 @@ public class Queue implements Destination { } private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Throwable { + dispatchValve.increment(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); try { @@ -358,8 +364,12 @@ public class Queue implements Destination { messages.add(node); } - if (consumers.isEmpty()) - return; + synchronized(consumers) { + if (consumers.isEmpty()) { + log.debug("No subscriptions registered, will not dispatch message at this time."); + return; + } + } msgContext.setDestination(destination); msgContext.setMessageReference(node); @@ -374,10 +384,12 @@ public class Queue implements Destination { private int calcHighestSubscriptionPriority() { int rc = Integer.MIN_VALUE; - for (Iterator iter = consumers.iterator(); iter.hasNext();) { - Subscription sub = (Subscription) iter.next(); - if (sub.getConsumerInfo().getPriority() > rc) { - rc = sub.getConsumerInfo().getPriority(); + synchronized (consumers) { + for (Iterator iter = consumers.iterator(); iter.hasNext();) { + Subscription sub = (Subscription) iter.next(); + if (sub.getConsumerInfo().getPriority() > rc) { + rc = sub.getConsumerInfo().getPriority(); + } } } return rc; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 311e0543d0..386315eef3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -37,6 +37,7 @@ public class QueueSubscription extends PrefetchSubscription { public void add(MessageReference node) throws Throwable { super.add(node); } + /** * In the queue case, mark the node as dropped and then a gc cycle will remove it from * the queue. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 3e9577fc34..408156df99 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -96,7 +96,9 @@ public class Topic implements Destination { if (sub.getConsumerInfo().isRetroactive()) { subscriptionRecoveryPolicy.recover(context, this, sub); } - consumers.add(sub); + synchronized(consumers) { + consumers.add(sub); + } } } @@ -108,8 +110,11 @@ public class Topic implements Destination { dispatchValve.turnOff(); try { - if (initialActivation) - consumers.add(sub); + if (initialActivation) { + synchronized(consumers) { + consumers.add(sub); + } + } if (store != null) { String clientId = sub.getClientId(); @@ -166,7 +171,9 @@ public class Topic implements Destination { public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable { destinationStatistics.getConsumers().decrement(); - consumers.remove(sub); + synchronized(consumers) { + consumers.remove(sub); + } sub.remove(context, this); } @@ -302,9 +309,11 @@ public class Topic implements Destination { if (!subscriptionRecoveryPolicy.add(context, message)) { return; } - if (consumers.isEmpty()) { - onMessageWithNoConsumers(context, message); - return; + synchronized(consumers) { + if (consumers.isEmpty()) { + onMessageWithNoConsumers(context, message); + return; + } } msgContext.setDestination(destination); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java index 37578be39a..0dba083b8c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.broker.region.policy; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.util.List; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; @@ -44,6 +44,6 @@ public interface DispatchPolicy { * * @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched */ - boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable; + boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java index d4eacd7537..5d486d1a36 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.region.policy; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; @@ -24,6 +23,7 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.filter.MessageEvaluationContext; import java.util.Iterator; +import java.util.List; /** * Simple dispatch policy that sends a message to every subscription that @@ -35,14 +35,12 @@ import java.util.Iterator; */ public class RoundRobinDispatchPolicy implements DispatchPolicy { - private final Object mutex = new Object(); - - public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable { + public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable { // Big synch here so that only 1 message gets dispatched at a time. Ensures // Everyone sees the same order and that the consumer list is not used while // it's being rotated. - synchronized(mutex) { + synchronized(consumers) { int count = 0; for (Iterator iter = consumers.iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java index b26e73f04a..90cf63b16b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.region.policy; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; @@ -24,6 +23,7 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.filter.MessageEvaluationContext; import java.util.Iterator; +import java.util.List; /** * Simple dispatch policy that sends a message to every subscription that @@ -35,7 +35,7 @@ import java.util.Iterator; */ public class SimpleDispatchPolicy implements DispatchPolicy { - public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable { + public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable { int count = 0; for (Iterator iter = consumers.iterator(); iter.hasNext();) { Subscription sub = (Subscription) iter.next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java index 14ec450b55..1f3c604b7d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.region.policy; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; @@ -24,6 +23,7 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.filter.MessageEvaluationContext; import java.util.Iterator; +import java.util.List; /** * Dispatch policy that causes every subscription to see messages in the same order. @@ -33,15 +33,12 @@ import java.util.Iterator; * @version $Revision$ */ public class StrictOrderDispatchPolicy implements DispatchPolicy { - int i=0; - private final Object mutex = new Object(); - public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable { + public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable { // Big synch here so that only 1 message gets dispatched at a time. Ensures // Everyone sees the same order. - synchronized(mutex) { + synchronized(consumers) { int count = 0; - i++; for (Iterator iter = consumers.iterator(); iter.hasNext();) { Subscription sub = (Subscription) iter.next(); diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java index 804d4681d2..153e7a66ac 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java @@ -196,7 +196,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport { } protected BrokerService createBroker() throws Exception { - return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true")); } protected void setUp() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java index 44117cc4f0..6d6515b8eb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java @@ -30,6 +30,18 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { topic = false; } + public void testManyProducersOneConsumer() throws Exception { + consumerCount = 1; + producerCount = 10; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + } + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { consumerCount = 2; producerCount = 1; @@ -102,18 +114,6 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { assertTotalMessagesReceived(messageCount * producerCount); } - public void testManyProducersOneConsumer() throws Exception { - consumerCount = 1; - producerCount = 50; - messageCount = 100; - messageSize = 1; // 1 byte - prefetchCount = 10; - - doMultipleClientsTest(); - - assertTotalMessagesReceived(messageCount * producerCount); - } - public void testManyProducersManyConsumers() throws Exception { consumerCount = 50; producerCount = 50; @@ -137,7 +137,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { startConsumers(consumerFactory, dest); // Wait for consumers to setup - Thread.sleep(1000); +// Thread.sleep(1000); startProducers(dest, messageCount);