From 52d236d850aeca10a49aa54833141df2f51722a2 Mon Sep 17 00:00:00 2001 From: yang wei Date: Thu, 29 Nov 2018 22:52:16 +0800 Subject: [PATCH] ARTEMIS-2188 fix address size leak caused by large page message --- .../impl/journal/LargeServerMessageImpl.java | 23 +++++- .../integration/client/LargeMessageTest.java | 81 ++++++++++++++++++- 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 2824ff7f21..431c19de43 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; @@ -187,7 +188,15 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe public synchronized void incrementDelayDeletionCount() { delayDeletionCount.incrementAndGet(); try { - incrementRefCount(); + if (paged) { + RefCountMessageListener tmpContext = super.getContext(); + setContext(null); + incrementRefCount(); + setContext(tmpContext); + } else { + incrementRefCount(); + } + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorIncrementDelayDeletionCount(e); } @@ -226,7 +235,15 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe @Override public synchronized int decrementRefCount() throws Exception { - int currentRefCount = super.decrementRefCount(); + int currentRefCount; + if (paged) { + RefCountMessageListener tmpContext = super.getContext(); + setContext(null); + currentRefCount = super.decrementRefCount(); + setContext(tmpContext); + } else { + currentRefCount = super.decrementRefCount(); + } // We use <= as this could be used by load. // because of a failure, no references were loaded, so we have 0... and we still need to delete the associated @@ -234,7 +251,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe if (delayDeletionCount.get() <= 0) { checkDelete(); } - return currentRefCount; } @@ -534,5 +550,4 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe } } - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index 1d9075d213..eac724cddf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -2475,8 +2476,86 @@ public class LargeMessageTest extends LargeMessageTestBase { session.close(); } + @Test + public void testGlobalSizeBytesAndAddressSizeOnPage() throws Exception { + testGlobalSizeBytesAndAddressSize(true); + } + + @Test + public void testGlobalSizeBytesAndAddressSize() throws Exception { + testGlobalSizeBytesAndAddressSize(false); + } + + public void testGlobalSizeBytesAndAddressSize(boolean isPage) throws Exception { + ActiveMQServer server = createServer(true, isNetty(), storeType); + + server.start(); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(false, false); + + LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager()); + + fileMessage.setMessageID(1005); + + for (int i = 0; i < largeMessageSize; i++) { + fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)}); + } + + fileMessage.releaseResources(); + + session.createQueue(ADDRESS, ADDRESS, true); + + PagingStore store = server.getPagingManager().getPageStore(ADDRESS); + + if (isPage) { + store.startPaging(); + } + + ClientProducer prod = session.createProducer(ADDRESS); + + prod.send(fileMessage); + + fileMessage.deleteFile(); + + session.commit(); + + if (isPage) { + server.getPagingManager().getPageStore(ADDRESS).getCursorProvider().clearCache(); + } + + if (isPage) { + Assert.assertEquals(0, server.getPagingManager().getPageStore(ADDRESS).getAddressSize()); + Assert.assertEquals(0, server.getPagingManager().getGlobalSize()); + } else { + Assert.assertNotEquals(0, server.getPagingManager().getPageStore(ADDRESS).getAddressSize()); + Assert.assertNotEquals(0, server.getPagingManager().getGlobalSize()); + } + + session.start(); + + ClientConsumer cons = session.createConsumer(ADDRESS); + + ClientMessage msg = cons.receive(5000); + + Assert.assertNotNull(msg); + + msg.acknowledge(); + + session.commit(); + + Assert.assertEquals(0, server.getPagingManager().getPageStore(ADDRESS).getAddressSize()); + + Assert.assertEquals(0, server.getPagingManager().getGlobalSize()); + + session.close(); + + cons.close(); + } + // Private ------------------------------------------------------- // Inner classes ------------------------------------------------- -} +} \ No newline at end of file