diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 01fce263f5..5646751670 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -109,7 +109,11 @@ public final class PagingManagerImpl implements PagingManager { @Override public PagingManagerImpl addSize(int size) { - globalSizeBytes.addAndGet(size); + long newSize = globalSizeBytes.addAndGet(size); + + if (newSize < 0) { + ActiveMQServerLogger.LOGGER.negativeGlobalAddressSize(newSize); + } if (size < 0) { checkMemoryRelease(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 88cfe46576..ce21081ba7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -705,6 +705,10 @@ public class PagingStoreImpl implements PagingStore { boolean globalFull = pagingManager.addSize(size).isGlobalFull(); long newSize = sizeInBytes.addAndGet(size); + if (newSize < 0) { + ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString()); + } + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { if (usingGlobalMaxSize && !globalFull || maxSize != -1) { checkReleaseMemory(globalFull, newSize); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 9b9570c25d..7e219c688e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1271,6 +1271,18 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void serverIsolatedOnNetwork(); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222214, + value = "Destination {1} has an inconsistent and negative address size={0}.", + format = Message.Format.MESSAGE_FORMAT) + void negativeAddressSize(long size, String destination); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222215, + value = "Global Address Size has negative and inconsistent value as {0}", + format = Message.Format.MESSAGE_FORMAT) + void negativeGlobalAddressSize(long size); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java index 984b682b4a..39e77ca533 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java @@ -126,6 +126,11 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { public int decrementRefCount() throws Exception { int count = refCount.decrementAndGet(); + if (count < 0) { + // this could happen on paged messages since they are not routed and incrementRefCount is never called + return count; + } + if (pagingStore != null) { if (count == 0) { pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java index 0707297664..886ab5fc05 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCountSyncOnNonTXTest.java @@ -28,8 +28,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -42,6 +45,23 @@ public class PageCountSyncOnNonTXTest extends ActiveMQTestBase { Process process; + @Before + public void checkLoggerStart() throws Exception { + AssertionLoggerHandler.startCapture(); + } + + @After + public void checkLoggerEnd() throws Exception { + try { + // These are the message errors for the negative size address size + Assert.assertFalse(AssertionLoggerHandler.findText("222214")); + Assert.assertFalse(AssertionLoggerHandler.findText("222215")); + } finally { + AssertionLoggerHandler.stopCapture(); + } + } + + @Override @Before public void setUp() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java index 2d27fd0461..c938652cb2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java @@ -32,7 +32,10 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -52,6 +55,23 @@ public class PagingCounterTest extends ActiveMQTestBase { // Public -------------------------------------------------------- + @Before + public void checkLoggerStart() throws Exception { + AssertionLoggerHandler.startCapture(); + } + + @After + public void checkLoggerEnd() throws Exception { + try { + // These are the message errors for the negative size address size + Assert.assertFalse(AssertionLoggerHandler.findText("222214")); + Assert.assertFalse(AssertionLoggerHandler.findText("222215")); + } finally { + AssertionLoggerHandler.stopCapture(); + } + } + + @Test public void testCounter() throws Exception { ClientSessionFactory sf = createSessionFactory(sl); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 00c0bdf2bd..63097cfa0c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -80,6 +80,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.jboss.logging.Logger; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -103,6 +104,25 @@ public class PagingTest extends ActiveMQTestBase { static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); + + @Before + public void checkLoggerStart() throws Exception { + AssertionLoggerHandler.startCapture(); + } + + @After + public void checkLoggerEnd() throws Exception { + try { + // These are the message errors for the negative size address size + Assert.assertFalse(AssertionLoggerHandler.findText("222214")); + Assert.assertFalse(AssertionLoggerHandler.findText("222215")); + } finally { + AssertionLoggerHandler.stopCapture(); + } + } + + + @Override @Before public void setUp() throws Exception {