From 1d1f0625db80538cc73d11f06c8989602d3e820a Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Thu, 7 Nov 2019 14:53:50 +0800 Subject: [PATCH] ARTEMIS-2544 Remove rolledback PageTransactionInfo to free up memory --- .../paging/impl/PageTransactionInfoImpl.java | 5 +- .../tests/integration/paging/PagingTest.java | 61 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java index 1b92b90e84..4684b8bad4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java @@ -113,7 +113,8 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { } catch (Exception e) { ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID); } - + } + if (pagingManager != null) { pagingManager.removeTransaction(this.transactionID); } return false; @@ -242,6 +243,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { if (lateDeliveries != null) { for (LateDelivery pos : lateDeliveries) { pos.getSubscription().lateDeliveryRollback(pos.getPagePosition()); + onUpdate(1, null, pos.getSubscription().getPagingStore().getPagingManager()); } lateDeliveries = null; } @@ -283,6 +285,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos); } cursor.positionIgnored(cursorPos); + onUpdate(1, null, cursor.getPagingStore().getPagingManager()); return true; } else { if (logger.isTraceEnabled()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index c020fbe760..cda5311f74 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; @@ -6838,6 +6839,66 @@ public class PagingTest extends ActiveMQTestBase { } } + @Test + public void testRollbackPageTransactionBeforeDelivery() throws Exception { + testRollbackPageTransaction(true); + } + + @Test + public void testRollbackPageTransactionAfterDelivery() throws Exception { + testRollbackPageTransaction(false); + } + + private void testRollbackPageTransaction(boolean rollbackBeforeDelivery) throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 2; + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + ClientSession session = sf.createSession(null, null, false, false, true, false, 0); + + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); + + Queue queue = server.locateQueue(PagingTest.ADDRESS); + + queue.getPageSubscription().getPagingStore().startPaging(); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + if (rollbackBeforeDelivery) { + sendMessages(session, producer, numberOfMessages); + session.rollback(); + assertEquals(server.getPagingManager().getTransactions().size(), 1); + PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next(); + // Make sure rollback happens before delivering messages + Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100); + ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); + session.start(); + Assert.assertNull(consumer.receiveImmediate()); + assertTrue(server.getPagingManager().getTransactions().isEmpty()); + } else { + ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); + session.start(); + sendMessages(session, producer, numberOfMessages); + Assert.assertNull(consumer.receiveImmediate()); + assertEquals(server.getPagingManager().getTransactions().size(), 1); + PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next(); + session.rollback(); + Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100); + assertTrue(server.getPagingManager().getTransactions().isEmpty()); + } + + session.close(); + } + @Override protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { Configuration configuration = super.createDefaultConfig(serverID, netty);