diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java index 32f30e8c93..3df0b7863c 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -425,28 +425,7 @@ public class PageCursorProviderImpl implements PageCursorProvider // on that case we need to move to verify it in a different way if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) { - boolean complete = true; - - for (PageSubscription cursor : cursorList) - { - if (!cursor.isComplete(minPage)) - { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) - { - ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage); - } - - complete = false; - break; - } - else - { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) - { - ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage); - } - } - } + boolean complete = checkPageCompletion(cursorList, minPage); if (!pagingStore.isStarted()) { @@ -475,6 +454,10 @@ public class PageCursorProviderImpl implements PageCursorProvider for (long i = pagingStore.getFirstPage(); i < minPage; i++) { + if (!checkPageCompletion(cursorList, i)) + { + break; + } Page page = pagingStore.depage(); if (page == null) { @@ -577,6 +560,33 @@ public class PageCursorProviderImpl implements PageCursorProvider } + + private boolean checkPageCompletion(ArrayList cursorList, long minPage) + { + boolean complete = true; + + for (PageSubscription cursor : cursorList) + { + if (!cursor.isComplete(minPage)) + { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage); + } + + complete = false; + break; + } + else + { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage); + } + } + } + return complete; + } /** * @return */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java index ff333abf16..0e4751d8e0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java @@ -112,6 +112,120 @@ public class PagingTest extends ServiceTestBase locator = createInVMNonHALocator(); } + @Test + public void testPageOnLargeMessageMultipleQueues() throws Exception + { + Configuration config = createDefaultConfig(); + + final int PAGE_MAX = 20 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + HashMap map = new HashMap(); + + AddressSettings value = new AddressSettings(); + map.put(ADDRESS.toString(), value); + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + server.start(); + + final int numberOfBytes = 1024; + + locator.setBlockOnNonDurableSend(true); + locator.setBlockOnDurableSend(true); + locator.setBlockOnAcknowledge(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, true, true, false, 0); + + session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true); + session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true); + + ClientProducer producer = session.createProducer(ADDRESS); + + ClientMessage message = null; + + for (int i = 0; i < 201; i++) + { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[numberOfBytes]); + + for (int j = 1; j <= numberOfBytes; j++) + { + message.getBodyBuffer().writeInt(j); + } + + producer.send(message); + } + + + session.close(); + + server.stop(); + + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + server.start(); + + sf = createSessionFactory(locator); + + for (int ad = 0; ad < 2; ad++) + { + session = sf.createSession(false, false, false); + + ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + ad)); + + session.start(); + + for (int i = 0; i < 201; i++) + { + ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME); + + Assert.assertNotNull(message2); + + message2.acknowledge(); + + Assert.assertNotNull(message2); + } + + try + { + if (ad > -1) + { + session.commit(); + } + else + { + session.rollback(); + for (int i = 0; i < 100; i++) + { + ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME); + + Assert.assertNotNull(message2); + + message2.acknowledge(); + + Assert.assertNotNull(message2); + } + session.commit(); + + } + } + catch (Throwable e) + { + System.err.println("here!!!!!!!"); + e.printStackTrace(); + System.exit(-1); + } + + consumer.close(); + + session.close(); + } + } + @Test public void testPageCleanup() throws Exception {