diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index f025998ba3..318f558db4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -410,14 +410,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index browser.incrementQueueRef(); } - void done() { - try { - browser.decrementQueueRef(); - } catch (Exception e) { - LOG.warn("decrement ref on browser: " + browser, e); - } - } - public QueueBrowserSubscription getBrowser() { return browser; } @@ -1602,12 +1594,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.readLock().unlock(); } - // Perhaps we should page always into the pagedInPendingDispatch - // list if - // !messages.isEmpty(), and then if - // !pagedInPendingDispatch.isEmpty() - // then we do a dispatch. - boolean hasBrowsers = browserDispatches.size() > 0; + boolean hasBrowsers = !browserDispatches.isEmpty(); if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { try { @@ -1618,12 +1605,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } if (hasBrowsers) { - PendingList alreadyDispatchedMessages = isPrioritizedMessages() ? + PendingList messagesInMemory = isPrioritizedMessages() ? new PrioritizedPendingList() : new OrderedPendingList(); pagedInMessagesLock.readLock().lock(); - try{ - alreadyDispatchedMessages.addAll(pagedInMessages); - }finally { + try { + messagesInMemory.addAll(pagedInMessages); + } finally { pagedInMessagesLock.readLock().unlock(); } @@ -1636,9 +1623,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index QueueBrowserSubscription browser = browserDispatch.getBrowser(); - LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); + LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size()); boolean added = false; - for (MessageReference node : alreadyDispatchedMessages) { + for (MessageReference node : messagesInMemory) { if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { msgContext.setMessageReference(node); if (browser.matches(node, msgContext)) { @@ -1902,7 +1889,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index List result = null; PendingList resultList = null; - int toPageIn = Math.min(maxPageSize, messages.size()); + int toPageIn = maxPageSize; + messagesLock.readLock().lock(); + try { + toPageIn = Math.min(toPageIn, messages.size()); + } finally { + messagesLock.readLock().unlock(); + } int pagedInPendingSize = 0; pagedInPendingDispatchLock.readLock().lock(); try { @@ -1913,7 +1906,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (isLazyDispatch() && !force) { // Only page in the minimum number of messages which can be // dispatched immediately. - toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); + toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull()); } if (LOG.isDebugEnabled()) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java index 769cdbf781..200d215ce0 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -265,7 +265,9 @@ public class BrokerTest extends BrokerTestSupport { messages.add(m1); } - for (int i = 0; i < 4; i++) { + // a browse is a snapshot - only guarantee to see messages produced before + // the browser + for (int i = 0; i < 1; i++) { Message m1 = messages.get(i); Message m2 = receiveMessage(connection2); assertNotNull("m2 is null for index: " + i, m2); @@ -275,6 +277,11 @@ public class BrokerTest extends BrokerTestSupport { assertNoMessagesLeft(connection1); assertNoMessagesLeft(connection2); + + connection1.request(closeConnectionInfo(connectionInfo1)); + connection1.stop(); + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); }