From ba16efde647f2389ebc61fcfb11454d9baa4802c Mon Sep 17 00:00:00 2001
From: Gary Tully
Date: Fri, 29 Aug 2008 08:21:09 +0000
Subject: [PATCH] fix AMQ-1917

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@690144 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/activemq/broker/region/Queue.java  |  59 +++--
 .../org/apache/activemq/bugs/AMQ1917Test.java | 207 ++++++++++++++++++
 2 files changed, 236 insertions(+), 30 deletions(-)
 create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java

diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
index 7d544286f9..7cfd68abec 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -210,11 +210,13 @@ public class Queue extends BaseDestination implements Task {
     LinkedList recoveries = new LinkedList();
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
+        // synchronize with dispatch method so that no new messages are sent
+        // while setting up a subscription. avoid out of order messages,
+        // duplicates, etc.
         dispatchLock.lock();
         try {
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
-//            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
@@ -229,32 +231,28 @@ public class Queue extends BaseDestination implements Task {
                     dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
             }
-            // synchronize with dispatch method so that no new messages are sent
-            // while
-            // setting up a subscription. avoid out of order messages,
-            // duplicates
-            // etc.
+
+            // any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
             doPageIn(false);
-
+
             synchronized (pagedInMessages) {
                 RecoveryDispatch rd = new RecoveryDispatch();
                 rd.messages = new ArrayList(pagedInMessages.values());
                 rd.subscription = sub;
                 recoveries.addLast(rd);
             }
-
             if( sub instanceof QueueBrowserSubscription ) {
                 ((QueueBrowserSubscription)sub).incrementQueueRef();
             }
             if (!this.optimizedDispatch) {
-                wakeup();
+                wakeup();
             }
         }finally {
             dispatchLock.unlock();
         }
         if (this.optimizedDispatch) {
-            // Outside of dispatchLock() to maintain the lock hierarchy of
-            // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+            // Outside of dispatchLock() to maintain the lock hierarchy of
+            // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
             wakeup();
         }
     }
@@ -262,11 +260,10 @@ public class Queue extends BaseDestination implements Task {
     public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
         destinationStatistics.getConsumers().decrement();
 
+        // synchronize with dispatch method so that no new messages are sent
+        // while removing a subscription.
         dispatchLock.lock();
         try {
-            // synchronize with dispatch method so that no new messages are sent
-            // while
-            // removing up a subscription.
             synchronized (consumers) {
                 removeFromConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
@@ -324,7 +321,6 @@ public class Queue extends BaseDestination implements Task {
     }
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
-//        System.out.println(getName()+" send "+message.getMessageId());
         final ConnectionContext context = producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
@@ -934,9 +930,17 @@ public class Queue extends BaseDestination implements Task {
                 for (QueueMessageReference node : rd.messages) {
                     if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
                         msgContext.setMessageReference(node);
-                        if (rd.subscription.matches(node, msgContext)) {
-                            rd.subscription.add(node);
+                        if (rd.subscription.matches(node, msgContext)) {
+                            rd.subscription.add(node);
+                        } else {
+                            // make sure it gets queued for dispatch again
+                            dispatchLock.lock();
+                            try {
+                                pagedInPendingDispatch.add(node);
+                            } finally {
+                                dispatchLock.unlock();
                             }
+                        }
                     }
                 }
@@ -949,24 +953,24 @@ public class Queue extends BaseDestination implements Task {
             }
         }
 
-        boolean result = false;
+        boolean pageInMoreMessages = false;
         synchronized (messages) {
-            result = !messages.isEmpty();
+            pageInMoreMessages = !messages.isEmpty();
         }
 
         // Kinda ugly.. but I think dispatchLock is the only mutex protecting the
         // pagedInPendingDispatch variable.
         dispatchLock.lock();
         try {
-            result |= !pagedInPendingDispatch.isEmpty();
+            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
         } finally {
             dispatchLock.unlock();
         }
 
         // Perhaps we should page always into the pagedInPendingDispatch list is
-        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
-        // then we do a dispatch.
-        if (result) {
+        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+        // then we do a dispatch.
+        if (pageInMoreMessages) {
             try {
                 pageInMessages(false);
@@ -1116,8 +1120,8 @@ public class Queue extends BaseDestination implements Task {
         int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
         toPageIn = Math.min(toPageIn,getMaxPageSize());
         if (isLazyDispatch()&& !force) {
-            // Only page in the minimum number of messages which can be dispatched immediately.
-            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+            // Only page in the minimum number of messages which can be dispatched immediately.
+            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
         }
         if ((force || !consumers.isEmpty()) && toPageIn > 0) {
             messages.setMaxBatchSize(toPageIn);
@@ -1158,21 +1162,17 @@ public class Queue extends BaseDestination implements Task {
         dispatchLock.lock();
         try {
             if(!pagedInPendingDispatch.isEmpty()) {
-                // System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
                 // Try to first dispatch anything that had not been dispatched before.
                 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
-//                System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
             }
             // and now see if we can dispatch the new stuff.. and append to the pending
             // list anything that does not actually get dispatched.
             if (list != null && !list.isEmpty()) {
-//                System.out.println(getName()+": dispatching from paged in: "+list.size());
                 if (pagedInPendingDispatch.isEmpty()) {
                     pagedInPendingDispatch.addAll(doActualDispatch(list));
                 } else {
                     pagedInPendingDispatch.addAll(list);
                 }
-//                System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
             }
         } finally {
             dispatchLock.unlock();
@@ -1200,7 +1200,6 @@ public class Queue extends BaseDestination implements Task {
                 if (!s.isFull()) {
                     // Dispatch it.
                     s.add(node);
-                    //System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
                     target = s;
                     break;
                 } else {
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
new file mode 100644
index 0000000000..a1993d8a7a
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
@@ -0,0 +1,207 @@
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
+
+public class AMQ1917Test extends TestCase {
+
+    private static final int NUM_MESSAGES = 4000;
+    private static final int NUM_THREADS = 10;
+    public static final String REQUEST_QUEUE = "mock.in.queue";
+    public static final String REPLY_QUEUE = "mock.out.queue";
+
+    Destination requestDestination = ActiveMQDestination.createDestination(
+            REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+    Destination replyDestination = ActiveMQDestination.createDestination(
+            REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
+
+    CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
+    CountDownLatch errorLatch = new CountDownLatch(1);
+    ThreadPoolExecutor tpe;
+    final String BROKER_URL = "tcp://localhost:61616";
+    BrokerService broker = null;
+    private boolean working = true;
+
+    // trivial session/producer pool
+    final Session[] sessions = new Session[NUM_THREADS];
+    final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
+
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector(BROKER_URL);
+        broker.start();
+
+        BlockingQueue queue = new ArrayBlockingQueue(10000);
+        tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
+                TimeUnit.MILLISECONDS, queue);
+        ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
+        tpe.setThreadFactory(limitedthreadFactory);
+    }
+
+    public void tearDown() throws Exception {
+        broker.stop();
+        tpe.shutdown();
+    }
+
+    public void testLoadedSendRecieveWithCorrelationId() throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
+        connectionFactory.setBrokerURL(BROKER_URL);
+        Connection connection = connectionFactory.createConnection();
+        setupReceiver(connection);
+
+        connection = connectionFactory.createConnection();
+        connection.start();
+
+        // trivial session/producer pool
+        for (int i=0; i NUM_THREADS) {
+                errorLatch.countDown();
+                fail("too many threads requested");
+            }
+            return factory.newThread(arg0);
+        }
+    }
+}
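
Note on the Queue.java change above: the functional fix is the new else branch in the recovery-dispatch loop. When a newly added consumer's selector rejects an already paged-in message, that message is now re-added to pagedInPendingDispatch under dispatchLock instead of being skipped, so a later iterate() can still dispatch it to another consumer. Below is a minimal standalone sketch of only that lock-and-requeue pattern; SimpleQueue, Subscription, addMessage and recoveryDispatch are names invented here for illustration, not ActiveMQ classes or methods.

// Minimal sketch, assuming invented names (SimpleQueue, Subscription,
// addMessage, recoveryDispatch); only the locking/re-queue pattern is
// modelled on the Queue.java change above.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleQueue {

    public interface Subscription {
        boolean matches(String message);   // stands in for selector matching
        void add(String message);          // stands in for dispatch to the consumer
    }

    // plays the role of dispatchLock: the only mutex guarding the pending list
    private final ReentrantLock dispatchLock = new ReentrantLock();
    private final Deque<String> pagedInPendingDispatch = new ArrayDeque<String>();

    // messages already paged in from the store
    private final List<String> pagedInMessages = new ArrayList<String>();

    public void addMessage(String message) {
        pagedInMessages.add(message);
    }

    // Replays paged-in messages to a newly added subscription. A message the
    // subscription does not match is put back on the pending list under the
    // lock so another consumer can still receive it.
    public void recoveryDispatch(Subscription sub) {
        for (String message : new ArrayList<String>(pagedInMessages)) {
            if (sub.matches(message)) {
                sub.add(message);
            } else {
                // make sure it gets queued for dispatch again
                dispatchLock.lock();
                try {
                    pagedInPendingDispatch.add(message);
                } finally {
                    dispatchLock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.addMessage("colour=red");
        queue.addMessage("colour=blue");

        // a consumer whose selector only accepts the "red" message
        queue.recoveryDispatch(new Subscription() {
            public boolean matches(String m) { return m.endsWith("red"); }
            public void add(String m) { System.out.println("dispatched: " + m); }
        });

        // the unmatched message is still pending for the next consumer
        System.out.println("still pending: " + queue.pagedInPendingDispatch);
    }
}

Taking the lock only around the shared pending list keeps the selector evaluation itself outside the locked section, mirroring how the patch calls rd.subscription.matches() before acquiring dispatchLock.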
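
The test's LimitedThreadFactory (partly truncated above) wraps the pool's own ThreadFactory so that creating more worker threads than expected is reported as a failure. A rough sketch of that delegating-factory idea follows, assuming the invented name CountingThreadFactory and a plain exception in place of the test's errorLatch/fail() handling.

// Hedged sketch of a delegating ThreadFactory that counts the threads it
// creates and rejects creation beyond a limit; illustrative only.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingThreadFactory implements ThreadFactory {

    private final ThreadFactory delegate;
    private final int maxThreads;
    private final AtomicInteger created = new AtomicInteger();

    public CountingThreadFactory(ThreadFactory delegate, int maxThreads) {
        this.delegate = delegate;
        this.maxThreads = maxThreads;
    }

    public Thread newThread(Runnable runnable) {
        if (created.incrementAndGet() > maxThreads) {
            throw new IllegalStateException("too many threads requested");
        }
        return delegate.newThread(runnable);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        // wrap the pool's default factory, as the test wraps tpe.getThreadFactory()
        pool.setThreadFactory(new CountingThreadFactory(pool.getThreadFactory(), 2));

        for (int i = 0; i < 5; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("ran on " + Thread.currentThread().getName());
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}

With core and maximum pool size both set to 2, the executor never asks the factory for a third thread, so the five submitted tasks run without tripping the limit; a pool configured to grow past the limit would trigger the failure path, which is how the test detects runaway thread creation.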