Fixed the QueueSubscriptionTest. When multiple producers would pound all at once at a destionation. Some messages would not get dispatched. Due to timing issue with the consumer list held by the destination.

Added some more synchronization blocks and now everything is happy again.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360167 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-30 21:14:03 +00:00
parent 90e21b4e8f
commit 475925394a
10 changed files with 71 additions and 54 deletions

View File

@ -63,16 +63,16 @@ abstract public class AbstractRegion implements Region {
synchronized(destinationsMutex){ synchronized(destinationsMutex){
destinations.put(destination,dest); destinations.put(destination,dest);
destinationMap.put(destination,dest); destinationMap.put(destination,dest);
}
// Add all consumers that are interested in the destination.
// Add all consumers that are interested in the destination. for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = (Subscription) iter.next();
Subscription sub = (Subscription) iter.next(); if( sub.matches(destination) ) {
if( sub.matches(destination) ) { dest.addSubscription(context, sub);
dest.addSubscription(context, sub); }
} }
return dest;
} }
return dest;
} }
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)

View File

@ -20,12 +20,13 @@ import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -55,7 +56,7 @@ public class Queue implements Destination {
private final Log log; private final Log log;
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); protected final List consumers = new CopyOnWriteArrayList();
protected final LinkedList messages = new LinkedList(); protected final LinkedList messages = new LinkedList();
protected final Valve dispatchValve = new Valve(true); protected final Valve dispatchValve = new Valve(true);
protected final UsageManager usageManager; protected final UsageManager usageManager;
@ -123,7 +124,9 @@ public class Queue implements Destination {
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try { try {
consumers.add(sub); synchronized (consumers) {
consumers.add(sub);
}
highestSubscriptionPriority = calcHighestSubscriptionPriority(); highestSubscriptionPriority = calcHighestSubscriptionPriority();
msgContext.setDestination(destination); msgContext.setDestination(destination);
@ -167,7 +170,9 @@ public class Queue implements Destination {
dispatchValve.turnOff(); dispatchValve.turnOff();
try { try {
consumers.remove(sub); synchronized (consumers) {
consumers.remove(sub);
}
sub.remove(context, this); sub.remove(context, this);
highestSubscriptionPriority = calcHighestSubscriptionPriority(); highestSubscriptionPriority = calcHighestSubscriptionPriority();
@ -350,6 +355,7 @@ public class Queue implements Destination {
} }
private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Throwable { private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Throwable {
dispatchValve.increment(); dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try { try {
@ -358,8 +364,12 @@ public class Queue implements Destination {
messages.add(node); messages.add(node);
} }
if (consumers.isEmpty()) synchronized(consumers) {
return; if (consumers.isEmpty()) {
log.debug("No subscriptions registered, will not dispatch message at this time.");
return;
}
}
msgContext.setDestination(destination); msgContext.setDestination(destination);
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
@ -374,10 +384,12 @@ public class Queue implements Destination {
private int calcHighestSubscriptionPriority() { private int calcHighestSubscriptionPriority() {
int rc = Integer.MIN_VALUE; int rc = Integer.MIN_VALUE;
for (Iterator iter = consumers.iterator(); iter.hasNext();) { synchronized (consumers) {
Subscription sub = (Subscription) iter.next(); for (Iterator iter = consumers.iterator(); iter.hasNext();) {
if (sub.getConsumerInfo().getPriority() > rc) { Subscription sub = (Subscription) iter.next();
rc = sub.getConsumerInfo().getPriority(); if (sub.getConsumerInfo().getPriority() > rc) {
rc = sub.getConsumerInfo().getPriority();
}
} }
} }
return rc; return rc;

View File

@ -37,6 +37,7 @@ public class QueueSubscription extends PrefetchSubscription {
public void add(MessageReference node) throws Throwable { public void add(MessageReference node) throws Throwable {
super.add(node); super.add(node);
} }
/** /**
* In the queue case, mark the node as dropped and then a gc cycle will remove it from * In the queue case, mark the node as dropped and then a gc cycle will remove it from
* the queue. * the queue.

View File

@ -96,7 +96,9 @@ public class Topic implements Destination {
if (sub.getConsumerInfo().isRetroactive()) { if (sub.getConsumerInfo().isRetroactive()) {
subscriptionRecoveryPolicy.recover(context, this, sub); subscriptionRecoveryPolicy.recover(context, this, sub);
} }
consumers.add(sub); synchronized(consumers) {
consumers.add(sub);
}
} }
} }
@ -108,8 +110,11 @@ public class Topic implements Destination {
dispatchValve.turnOff(); dispatchValve.turnOff();
try { try {
if (initialActivation) if (initialActivation) {
consumers.add(sub); synchronized(consumers) {
consumers.add(sub);
}
}
if (store != null) { if (store != null) {
String clientId = sub.getClientId(); String clientId = sub.getClientId();
@ -166,7 +171,9 @@ public class Topic implements Destination {
public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable { public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
destinationStatistics.getConsumers().decrement(); destinationStatistics.getConsumers().decrement();
consumers.remove(sub); synchronized(consumers) {
consumers.remove(sub);
}
sub.remove(context, this); sub.remove(context, this);
} }
@ -302,9 +309,11 @@ public class Topic implements Destination {
if (!subscriptionRecoveryPolicy.add(context, message)) { if (!subscriptionRecoveryPolicy.add(context, message)) {
return; return;
} }
if (consumers.isEmpty()) { synchronized(consumers) {
onMessageWithNoConsumers(context, message); if (consumers.isEmpty()) {
return; onMessageWithNoConsumers(context, message);
return;
}
} }
msgContext.setDestination(destination); msgContext.setDestination(destination);

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -44,6 +44,6 @@ public interface DispatchPolicy {
* *
* @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched * @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched
*/ */
boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable; boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable;
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -24,6 +23,7 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
/** /**
* Simple dispatch policy that sends a message to every subscription that * Simple dispatch policy that sends a message to every subscription that
@ -35,14 +35,12 @@ import java.util.Iterator;
*/ */
public class RoundRobinDispatchPolicy implements DispatchPolicy { public class RoundRobinDispatchPolicy implements DispatchPolicy {
private final Object mutex = new Object(); public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable {
public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable {
// Big synch here so that only 1 message gets dispatched at a time. Ensures // Big synch here so that only 1 message gets dispatched at a time. Ensures
// Everyone sees the same order and that the consumer list is not used while // Everyone sees the same order and that the consumer list is not used while
// it's being rotated. // it's being rotated.
synchronized(mutex) { synchronized(consumers) {
int count = 0; int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) { for (Iterator iter = consumers.iterator(); iter.hasNext();) {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -24,6 +23,7 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
/** /**
* Simple dispatch policy that sends a message to every subscription that * Simple dispatch policy that sends a message to every subscription that
@ -35,7 +35,7 @@ import java.util.Iterator;
*/ */
public class SimpleDispatchPolicy implements DispatchPolicy { public class SimpleDispatchPolicy implements DispatchPolicy {
public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable { public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable {
int count = 0; int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) { for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next(); Subscription sub = (Subscription) iter.next();

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
@ -24,6 +23,7 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
/** /**
* Dispatch policy that causes every subscription to see messages in the same order. * Dispatch policy that causes every subscription to see messages in the same order.
@ -33,15 +33,12 @@ import java.util.Iterator;
* @version $Revision$ * @version $Revision$
*/ */
public class StrictOrderDispatchPolicy implements DispatchPolicy { public class StrictOrderDispatchPolicy implements DispatchPolicy {
int i=0;
private final Object mutex = new Object();
public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, CopyOnWriteArrayList consumers) throws Throwable { public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Throwable {
// Big synch here so that only 1 message gets dispatched at a time. Ensures // Big synch here so that only 1 message gets dispatched at a time. Ensures
// Everyone sees the same order. // Everyone sees the same order.
synchronized(mutex) { synchronized(consumers) {
int count = 0; int count = 0;
i++;
for (Iterator iter = consumers.iterator(); iter.hasNext();) { for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next(); Subscription sub = (Subscription) iter.next();

View File

@ -196,7 +196,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
} }
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {

View File

@ -30,6 +30,18 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
topic = false; topic = false;
} }
public void testManyProducersOneConsumer() throws Exception {
consumerCount = 1;
producerCount = 10;
messageCount = 100;
messageSize = 1; // 1 byte
prefetchCount = 10;
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
}
public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
consumerCount = 2; consumerCount = 2;
producerCount = 1; producerCount = 1;
@ -102,18 +114,6 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
assertTotalMessagesReceived(messageCount * producerCount); assertTotalMessagesReceived(messageCount * producerCount);
} }
public void testManyProducersOneConsumer() throws Exception {
consumerCount = 1;
producerCount = 50;
messageCount = 100;
messageSize = 1; // 1 byte
prefetchCount = 10;
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
}
public void testManyProducersManyConsumers() throws Exception { public void testManyProducersManyConsumers() throws Exception {
consumerCount = 50; consumerCount = 50;
producerCount = 50; producerCount = 50;
@ -137,7 +137,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
startConsumers(consumerFactory, dest); startConsumers(consumerFactory, dest);
// Wait for consumers to setup // Wait for consumers to setup
Thread.sleep(1000); // Thread.sleep(1000);
startProducers(dest, messageCount); startProducers(dest, messageCount);