diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index b814d9b2c2..93869d7c81 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -364,6 +364,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { ArrayList cursorList = cloneSubscriptions(); long minPage = checkMinPage(cursorList); + deliverIfNecessary(cursorList, minPage); logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage); @@ -599,6 +600,24 @@ public class PageCursorProviderImpl implements PageCursorProvider { } + private void deliverIfNecessary(Collection cursorList, long minPage) { + boolean currentWriting = minPage == pagingStore.getCurrentWritingPage() ? true : false; + for (PageSubscription cursor : cursorList) { + long firstPage = cursor.getFirstPage(); + if (firstPage == minPage) { + /** + * if first page is current writing page and it's not complete, or + * first page is before the current writing page, we need to trigger + * deliverAsync to delete messages in the pages. + */ + if (cursor.getQueue().getMessageCount() == 0 && (!currentWriting || !cursor.isComplete(firstPage))) { + cursor.getQueue().deliverAsync(); + break; + } + } + } + } + // Inner classes ------------------------------------------------- } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 65b5892722..180993fe85 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -6289,6 +6289,170 @@ public class PagingTest extends ActiveMQTestBase { server.stop(); } + @Test + public void testStopPagingWithoutConsumersIfTwoPages() throws Exception { + testStopPagingWithoutConsumersOnOneQueue(true); + } + + @Test + public void testStopPagingWithoutConsumersIfOnePage() throws Exception { + testStopPagingWithoutConsumersOnOneQueue(false); + } + + private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception { + boolean persistentMessages = true; + + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + try { + ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(false, false, false); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1 or both=true"), true); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2 or both=true"), true); + PagingStore store = server.getPagingManager().getPageStore(ADDRESS); + Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1")); + queue.getPageSubscription().getPagingStore().startPaging(); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + ClientMessage message = session.createMessage(persistentMessages); + message.putBooleanProperty("both", true); + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + bodyLocal.writeBytes(new byte[1024]); + producer.send(message); + session.commit(); + session.start(); + ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2")); + message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + assertNull(consumer.receiveImmediate()); + consumer.close(); + session.commit(); + + if (forceAnotherPage) { + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + + message = session.createMessage(persistentMessages); + message.putIntProperty("destQ", 1); + bodyLocal = message.getBodyBuffer(); + bodyLocal.writeBytes(new byte[1024]); + producer.send(message); + session.commit(); + + consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1")); + for (int i = 0; i < 2; i++) { + message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + session.commit(); + } + assertNull(consumer.receiveImmediate()); + consumer.close(); + session.close(); + + store.getCursorProvider().cleanup(); + waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); + sf.close(); + locator.close(); + } finally { + try { + server.stop(); + } catch (Throwable ignored) { + } + } + } + + @Test + public void testStopPagingWithoutMsgsOnOneQueue() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 500; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1"), true); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2"), true); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + ClientConsumer consumer1 = session.createConsumer(PagingTest.ADDRESS.concat("=1")); + session.start(); + ClientSession session2 = sf.createSession(false, false, false); + ClientConsumer consumer2 = session2.createConsumer(PagingTest.ADDRESS.concat("=2")); + session2.start(); + + ClientMessage message = null; + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + /** + * Here we first send messages and consume them to move every subscription to the next bookmarked page. + * Then we send messages and consume them again, expecting paging is stopped normally. + */ + for (int x = 0; x < 2; x++) { + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + message.putIntProperty("destQ", 1); + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + bodyLocal.writeBytes(body); + producer.send(message); + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); + assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); + for (int i = 0; i < numberOfMessages; i++) { + ClientMessage msg = consumer1.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + if (i % 500 == 0) { + session.commit(); + } + } + session.commit(); + assertNull(consumer1.receiveImmediate()); + waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); + } + + producer.close(); + consumer1.close(); + consumer2.close(); + session.close(); + session2.close(); + sf.close(); + locator.close(); + locator = null; + sf = null; + server.stop(); + } + + @Override protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { Configuration configuration = super.createDefaultConfig(serverID, netty);