diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 69314c75b7..752540e886 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -296,16 +296,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { } /* - * Holder for subscription and pagedInMessages as a browser needs access to - * existing messages in the queue that have already been dispatched + * Holder for subscription that needs attention on next iterate + * browser needs access to existing messages in the queue that have already been dispatched */ class BrowserDispatch { - ArrayList messages; QueueBrowserSubscription browser; - public BrowserDispatch(QueueBrowserSubscription browserSubscription, Collection values) { - - messages = new ArrayList(values); + public BrowserDispatch(QueueBrowserSubscription browserSubscription) { browser = browserSubscription; browser.incrementQueueRef(); } @@ -362,18 +359,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { } if (sub instanceof QueueBrowserSubscription) { + // tee up for dispatch in next iterate QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; - - // do again in iterate to ensure new messages are dispatched - pageInMessages(false); - synchronized (pagedInMessages) { - if (!pagedInMessages.isEmpty()) { - BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values()); - browserDispatches.addLast(browserDispatch); - } + BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); + browserDispatches.addLast(browserDispatch); } } + if (!(this.optimizedDispatch || isSlave())) { wakeup(); } @@ -1157,7 +1150,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { - boolean pageInMoreMessages = false; + boolean pageInMoreMessages = false; synchronized (iteratingMutex) { // do early to allow dispatch of these waiting messages @@ -1175,31 +1168,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - BrowserDispatch rd; - while ((rd = getNextBrowserDispatch()) != null) { - pageInMoreMessages = true; - - try { - MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); - msgContext.setDestination(destination); - - QueueBrowserSubscription browser = rd.getBrowser(); - for (QueueMessageReference node : rd.messages) { - if (!node.isAcked()) { - msgContext.setMessageReference(node); - if (browser.matches(node, msgContext)) { - browser.add(node); - } - } - } - - rd.done(); - - } catch (Exception e) { - LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e); - } - } - if (firstConsumer) { firstConsumer = false; try { @@ -1228,6 +1196,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { LOG.error(e); } } + + BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch(); synchronized (messages) { pageInMoreMessages |= !messages.isEmpty(); @@ -1242,14 +1212,46 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Perhaps we should page always into the pagedInPendingDispatch list if // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() // then we do a dispatch. - if (pageInMoreMessages) { + if (pageInMoreMessages || pendingBrowserDispatch != null) { try { - pageInMessages(false); + pageInMessages(pendingBrowserDispatch != null); } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } } + + if (pendingBrowserDispatch != null) { + ArrayList alreadyDispatchedMessages = null; + synchronized (pagedInMessages) { + alreadyDispatchedMessages = new ArrayList(pagedInMessages.values()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser() + + ", already dispatched/paged count: " + alreadyDispatchedMessages.size()); + } + do { + try { + MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); + msgContext.setDestination(destination); + + QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser(); + for (QueueMessageReference node : alreadyDispatchedMessages) { + if (!node.isAcked()) { + msgContext.setMessageReference(node); + if (browser.matches(node, msgContext)) { + browser.add(node); + } + } + } + pendingBrowserDispatch.done(); + } catch (Exception e) { + LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e); + } + + } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null); + } + if (pendingWakeups.get() > 0) { pendingWakeups.decrementAndGet(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java index a057034bcc..bc839e476f 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java @@ -16,7 +16,9 @@ */ package org.apache.activemq; +import java.util.ArrayList; import java.util.Enumeration; +import java.util.List; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -26,7 +28,14 @@ import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; /** * @version $Revision: 1.4 $ @@ -127,5 +136,50 @@ public class JmsQueueBrowserTest extends JmsTestSupport { browser.close(); producer.close(); - } + } + + public void testQueueBrowserWith2Consumers() throws Exception { + final int numMessages = 1000; + connection.setAlwaysSyncSend(false); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + ActiveMQQueue destinationPrefetch10 = new ActiveMQQueue("TEST?jms.prefetchSize=10"); + ActiveMQQueue destinationPrefetch1 = new ActiveMQQueue("TEST?jms.prefetchsize=1"); + connection.start(); + + ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection(userName, password); + connection2.start(); + connections.add(connection2); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destinationPrefetch10); + + for (int i=0; i browserView = browser.getEnumeration(); + + List messages = new ArrayList(); + for (int i = 0; i < numMessages; i++) { + Message m1 = consumer.receive(5000); + assertNotNull("m1 is null for index: " + i, m1); + messages.add(m1); + } + + int i = 0; + for (; i < numMessages && browserView.hasMoreElements(); i++) { + Message m1 = messages.get(i); + Message m2 = browserView.nextElement(); + assertNotNull("m2 is null for index: " + i, m2); + assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID()); + } + assertEquals("got all: ", numMessages, i); + + assertFalse("nothing left in the browser", browserView.hasMoreElements()); + assertNull("consumer finished", consumer.receiveNoWait()); + } }