This closes #903

This commit is contained in:
Clebert Suconic 2016-12-01 21:38:19 -05:00
commit 18419863af
7 changed files with 86 additions and 1 deletions

View File

@ -109,7 +109,11 @@ public final class PagingManagerImpl implements PagingManager {
@Override
public PagingManagerImpl addSize(int size) {
globalSizeBytes.addAndGet(size);
long newSize = globalSizeBytes.addAndGet(size);
if (newSize < 0) {
ActiveMQServerLogger.LOGGER.negativeGlobalAddressSize(newSize);
}
if (size < 0) {
checkMemoryRelease();

View File

@ -705,6 +705,10 @@ public class PagingStoreImpl implements PagingStore {
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
long newSize = sizeInBytes.addAndGet(size);
if (newSize < 0) {
ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString());
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
checkReleaseMemory(globalFull, newSize);

View File

@ -1271,6 +1271,18 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void serverIsolatedOnNetwork();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222214,
value = "Destination {1} has an inconsistent and negative address size={0}.",
format = Message.Format.MESSAGE_FORMAT)
void negativeAddressSize(long size, String destination);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222215,
value = "Global Address Size has negative and inconsistent value as {0}",
format = Message.Format.MESSAGE_FORMAT)
void negativeGlobalAddressSize(long size);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)

View File

@ -126,6 +126,11 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
public int decrementRefCount() throws Exception {
int count = refCount.decrementAndGet();
if (count < 0) {
// this could happen on paged messages since they are not routed and incrementRefCount is never called
return count;
}
if (pagingStore != null) {
if (count == 0) {
pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());

View File

@ -28,8 +28,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -42,6 +45,23 @@ public class PageCountSyncOnNonTXTest extends ActiveMQTestBase {
Process process;
@Before
public void checkLoggerStart() throws Exception {
AssertionLoggerHandler.startCapture();
}
@After
public void checkLoggerEnd() throws Exception {
try {
// These are the message errors for the negative size address size
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -32,7 +32,10 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -52,6 +55,23 @@ public class PagingCounterTest extends ActiveMQTestBase {
// Public --------------------------------------------------------
@Before
public void checkLoggerStart() throws Exception {
AssertionLoggerHandler.startCapture();
}
@After
public void checkLoggerEnd() throws Exception {
try {
// These are the message errors for the negative size address size
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Test
public void testCounter() throws Exception {
ClientSessionFactory sf = createSessionFactory(sl);

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -103,6 +104,25 @@ public class PagingTest extends ActiveMQTestBase {
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@Before
public void checkLoggerStart() throws Exception {
AssertionLoggerHandler.startCapture();
}
@After
public void checkLoggerEnd() throws Exception {
try {
// These are the message errors for the negative size address size
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Override
@Before
public void setUp() throws Exception {