diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 61e0a71c4a..557cd46d75 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -957,7 +957,7 @@ public class Queue extends BaseDestination implements Task { } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); - } + } } synchronized(messagesWaitingForSpace) { while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) { @@ -1122,50 +1122,54 @@ public class Queue extends BaseDestination implements Task { private void doDispatch(List list) throws Exception { if (list != null) { List consumers; - synchronized (this.consumers) { - consumers = new ArrayList(this.consumers); - } + dispatchLock.lock(); + try { + synchronized (this.consumers) { + consumers = new ArrayList(this.consumers); + } - for (MessageReference node : list) { - Subscription target = null; - List targets = null; - for (Subscription s : consumers) { - if (dispatchSelector.canSelect(s, node)) { - if (!s.isFull()) { - s.add(node); - target = s; - break; - } else { - if (targets == null) { - targets = new ArrayList(); + + for (MessageReference node : list) { + Subscription target = null; + List targets = null; + for (Subscription s : consumers) { + if (dispatchSelector.canSelect(s, node)) { + if (!s.isFull()) { + s.add(node); + target = s; + break; + } else { + if (targets == null) { + targets = new ArrayList(); + } + targets.add(s); + } + } + } + if (target == null && targets != null) { + // pick the least loaded to add the message too + for (Subscription s : targets) { + if (target == null + || target.getInFlightUsage() > s.getInFlightUsage()) { + target = s; + } + } + if (target != null) { + target.add(node); + } + } + if (target != null && !strictOrderDispatch && consumers.size() > 1 && + !dispatchSelector.isExclusiveConsumer(target)) { + synchronized (this.consumers) { + if( removeFromConsumerList(target) ) { + addToConsumerList(target); + consumers = new ArrayList(this.consumers); } - targets.add(s); } } } - if (target == null && targets != null) { - // pick the least loaded to add the message too - for (Subscription s : targets) { - if (target == null - || target.getInFlightUsage() > s - .getInFlightUsage()) { - target = s; - } - } - if (target != null) { - target.add(node); - } - } - if (target != null && !strictOrderDispatch && consumers.size() > 1 && - !dispatchSelector.isExclusiveConsumer(target)) { - synchronized (this.consumers) { - if( removeFromConsumerList(target) ) { - addToConsumerList(target); - consumers = new ArrayList(this.consumers); - } - } - } - + } finally { + dispatchLock.unlock(); } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java new file mode 100644 index 0000000000..51682fc612 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -0,0 +1,311 @@ +package org.apache.activemq.broker.region; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +import javax.jms.InvalidSelectorException; +import javax.management.ObjectName; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.Response; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.thread.TaskRunnerFactory; + +public class SubscriptionAddRemoveQueueTest extends TestCase { + + Queue queue; + Message msg = new ActiveMQMessage(); + ConsumerInfo info = new ConsumerInfo(); + List subs = new ArrayList(); + ConnectionContext context = new ConnectionContext(); + int numSubscriptions = 1000; + boolean working = true; + int senders = 20; + + + @Override + public void setUp() throws Exception { + BrokerService brokerService = new BrokerService(); + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + DestinationStatistics parentStats = new DestinationStatistics(); + parentStats.setEnabled(true); + + TaskRunnerFactory taskFactory = null; + MessageStore store = null; + + msg.setDestination(destination); + info.setDestination(destination); + info.setPrefetchSize(100); + + queue = new Queue(brokerService, destination, store, parentStats, taskFactory); + queue.initialize(); + } + + public void testNoDispatchToRemovedConsumers() throws Exception { + Runnable sender = new Runnable() { + public void run() { + while (working) { + try { + queue.sendMessage(context, msg); + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception in sendMessage, ex:" + e); + } + } + } + }; + + Runnable subRemover = new Runnable() { + public void run() { + for (Subscription sub : subs) { + try { + queue.removeSubscription(context, sub); + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception in removeSubscription, ex:" + e); + } + } + } + }; + + for (int i=0;i result = executor.submit(subRemover); + result.get(); + working = false; + assertEquals("there are no subscriptions", 0, queue.getDestinationStatistics().getConsumers().getCount()); + + for (SimpleImmediateDispatchSubscription sub : subs) { + assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched)); + } + + } + + private boolean hasSomeLocks(List dispatched) { + boolean hasLock = false; + for (MessageReference mr: dispatched) { + QueueMessageReference qmr = (QueueMessageReference) mr; + if (qmr.getLockOwner() != null) { + hasLock = true; + break; + } + } + return hasLock; + } + + public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner { + + List dispatched = + Collections.synchronizedList(new ArrayList()); + + public void acknowledge(ConnectionContext context, MessageAck ack) + throws Exception { + // TODO Auto-generated method stub + + } + + public void add(MessageReference node) throws Exception { + // immediate dispatch + QueueMessageReference qmr = (QueueMessageReference)node; + qmr.lock(this); + dispatched.add(qmr); + } + + public void add(ConnectionContext context, Destination destination) + throws Exception { + // TODO Auto-generated method stub + + } + + public void destroy() { + // TODO Auto-generated method stub + + } + + public void gc() { + // TODO Auto-generated method stub + + } + + public ConsumerInfo getConsumerInfo() { + return info; + } + + public long getDequeueCounter() { + // TODO Auto-generated method stub + return 0; + } + + public long getDispatchedCounter() { + // TODO Auto-generated method stub + return 0; + } + + public int getDispatchedQueueSize() { + // TODO Auto-generated method stub + return 0; + } + + public long getEnqueueCounter() { + // TODO Auto-generated method stub + return 0; + } + + public int getInFlightSize() { + // TODO Auto-generated method stub + return 0; + } + + public int getInFlightUsage() { + // TODO Auto-generated method stub + return 0; + } + + public ObjectName getObjectName() { + // TODO Auto-generated method stub + return null; + } + + public int getPendingQueueSize() { + // TODO Auto-generated method stub + return 0; + } + + public int getPrefetchSize() { + // TODO Auto-generated method stub + return 0; + } + + public String getSelector() { + // TODO Auto-generated method stub + return null; + } + + public boolean isBrowser() { + // TODO Auto-generated method stub + return false; + } + + public boolean isFull() { + // TODO Auto-generated method stub + return false; + } + + public boolean isHighWaterMark() { + // TODO Auto-generated method stub + return false; + } + + public boolean isLowWaterMark() { + // TODO Auto-generated method stub + return false; + } + + public boolean isRecoveryRequired() { + // TODO Auto-generated method stub + return false; + } + + public boolean isSlave() { + // TODO Auto-generated method stub + return false; + } + + public boolean matches(MessageReference node, + MessageEvaluationContext context) throws IOException { + return true; + } + + public boolean matches(ActiveMQDestination destination) { + // TODO Auto-generated method stub + return false; + } + + public void processMessageDispatchNotification( + MessageDispatchNotification mdn) throws Exception { + // TODO Auto-generated method stub + + } + + public Response pullMessage(ConnectionContext context, MessagePull pull) + throws Exception { + // TODO Auto-generated method stub + return null; + } + + public List remove(ConnectionContext context, + Destination destination) throws Exception { + return new ArrayList(dispatched); + } + + public void setObjectName(ObjectName objectName) { + // TODO Auto-generated method stub + + } + + public void setSelector(String selector) + throws InvalidSelectorException, UnsupportedOperationException { + // TODO Auto-generated method stub + + } + + public void updateConsumerPrefetch(int newPrefetch) { + // TODO Auto-generated method stub + + } + + public boolean addRecoveredMessage(ConnectionContext context, + MessageReference message) throws Exception { + // TODO Auto-generated method stub + return false; + } + + public ActiveMQDestination getActiveMQDestination() { + // TODO Auto-generated method stub + return null; + } + + public int getLockPriority() { + // TODO Auto-generated method stub + return 0; + } + + public boolean isLockExclusive() { + // TODO Auto-generated method stub + return false; + } + + } +}