diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java index 6703c91643..9523205296 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java @@ -664,6 +664,8 @@ public class QueueImpl implements Queue // no-op scheduledRunners.decrementAndGet(); } + + checkDepage(); } } @@ -2188,12 +2190,32 @@ public class QueueImpl implements Queue } } - if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending) + checkDepage(); + } + + private void checkDepage() + { + if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext()) { scheduleDepage(false); } } + + /** + * This is a common check we do before scheduling depaging.. or while depaging. + * Before scheduling a depage runnable we verify if it fits / needs depaging. + * We also check for while needsDepage While depaging. + * This is just to avoid a copy & paste dependency + * @return + */ + private boolean needsDepage() + { + return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize(); + } + + + private SimpleString extractGroupID(MessageReference ref) { if (internalQueue) @@ -2268,7 +2290,7 @@ public class QueueImpl implements Queue this.directDeliver = false; int depaged = 0; - while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext()) + while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext()) { depaged++; PagedReference reference = pageIterator.next(); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java index da8d09400e..aa12993182 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java @@ -213,7 +213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener if (browseOnly) { - browserDeliverer = new BrowserDeliverer(messageQueue.iterator()); + browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator()); } else { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java index 56369c830a..ff333abf16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java @@ -5184,6 +5184,8 @@ public class PagingTest extends ServiceTestBase * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly: * * -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:/tests/config/logging.properties + * + * Note: Idea should get these from the pom and you shouldn't need to do this. */ public void testFailMessagesNonDurable() throws Exception { @@ -5859,6 +5861,120 @@ public class PagingTest extends ServiceTestBase } + @Test + public void testMultiFiltersBrowsing() throws Throwable + { + internalTestMultiFilters(true); + } + + @Test + public void testMultiFiltersRegularConsumer() throws Throwable + { + internalTestMultiFilters(false); + } + + public void internalTestMultiFilters(boolean browsing) throws Throwable + { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultConfig() + .setJournalSyncNonTransactional(false); + + server = createServer(true, + config, + PagingTest.PAGE_SIZE, + PagingTest.PAGE_MAX, + new HashMap()); + + server.start(); + + try + { + ServerLocator locator = createInVMNonHALocator(); + locator.setBlockOnDurableSend(true); + ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(true, true, 0); + + session.createQueue(ADDRESS.toString(), "Q1", null, true); + + PagingStore store = server.getPagingManager().getPageStore(ADDRESS); + + ClientProducer prod = session.createProducer(ADDRESS); + + ClientMessage msg = null; + store.startPaging(); + + for (int i = 0; i < 100; i++) + { + msg = session.createMessage(true); + msg.putStringProperty("color", "red"); + msg.putIntProperty("count", i); + prod.send(msg); + + if (i > 0 && i % 10 == 0) + { + store.startPaging(); + store.forceAnotherPage(); + } + } + + for (int i = 0; i < 100; i++) + { + msg = session.createMessage(true); + msg.putStringProperty("color", "green"); + msg.putIntProperty("count", i); + prod.send(msg); + + if (i > 0 && i % 10 == 0) + { + store.startPaging(); + store.forceAnotherPage(); + } + } + + session.commit(); + + session.close(); + + session = sf.createSession(false, false, 0); + session.start(); + + + ClientConsumer cons1; + + if (browsing) + { + cons1 = session.createConsumer("Q1", "color='green'", true); + } + else + { + cons1 = session.createConsumer("Q1", "color='red'", false); + } + + for (int i = 0; i < 100; i++) + { + msg = cons1.receive(5000); + + System.out.println("Received " + msg); + assertNotNull(msg); + if (!browsing) + { + msg.acknowledge(); + } + } + + session.commit(); + + session.close(); + } + finally + { + server.stop(); + } + + } + + @Test public void testPendingACKOutOfOrder() throws Throwable {