ARTEMIS-748 Negative Address Size after paging
This commit is contained in:
parent
895ffb5b3c
commit
9c5a91b481
|
@ -109,7 +109,11 @@ public final class PagingManagerImpl implements PagingManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PagingManagerImpl addSize(int size) {
|
public PagingManagerImpl addSize(int size) {
|
||||||
globalSizeBytes.addAndGet(size);
|
long newSize = globalSizeBytes.addAndGet(size);
|
||||||
|
|
||||||
|
if (newSize < 0) {
|
||||||
|
ActiveMQServerLogger.LOGGER.negativeGlobalAddressSize(newSize);
|
||||||
|
}
|
||||||
|
|
||||||
if (size < 0) {
|
if (size < 0) {
|
||||||
checkMemoryRelease();
|
checkMemoryRelease();
|
||||||
|
|
|
@ -705,6 +705,10 @@ public class PagingStoreImpl implements PagingStore {
|
||||||
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
|
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
|
||||||
long newSize = sizeInBytes.addAndGet(size);
|
long newSize = sizeInBytes.addAndGet(size);
|
||||||
|
|
||||||
|
if (newSize < 0) {
|
||||||
|
ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString());
|
||||||
|
}
|
||||||
|
|
||||||
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
|
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
|
||||||
if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
|
if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
|
||||||
checkReleaseMemory(globalFull, newSize);
|
checkReleaseMemory(globalFull, newSize);
|
||||||
|
|
|
@ -1271,6 +1271,18 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
format = Message.Format.MESSAGE_FORMAT)
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
void serverIsolatedOnNetwork();
|
void serverIsolatedOnNetwork();
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222214,
|
||||||
|
value = "Destination {1} has an inconsistent and negative address size={0}.",
|
||||||
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void negativeAddressSize(long size, String destination);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222215,
|
||||||
|
value = "Global Address Size has negative and inconsistent value as {0}",
|
||||||
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void negativeGlobalAddressSize(long size);
|
||||||
|
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
|
|
@ -126,6 +126,11 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage {
|
||||||
public int decrementRefCount() throws Exception {
|
public int decrementRefCount() throws Exception {
|
||||||
int count = refCount.decrementAndGet();
|
int count = refCount.decrementAndGet();
|
||||||
|
|
||||||
|
if (count < 0) {
|
||||||
|
// this could happen on paged messages since they are not routed and incrementRefCount is never called
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
if (pagingStore != null) {
|
if (pagingStore != null) {
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
|
pagingStore.addSize(-getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
|
||||||
|
|
|
@ -28,8 +28,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -42,6 +45,23 @@ public class PageCountSyncOnNonTXTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
Process process;
|
Process process;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void checkLoggerStart() throws Exception {
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void checkLoggerEnd() throws Exception {
|
||||||
|
try {
|
||||||
|
// These are the message errors for the negative size address size
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
|
||||||
|
} finally {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
|
|
@ -32,7 +32,10 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -52,6 +55,23 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void checkLoggerStart() throws Exception {
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void checkLoggerEnd() throws Exception {
|
||||||
|
try {
|
||||||
|
// These are the message errors for the negative size address size
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
|
||||||
|
} finally {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCounter() throws Exception {
|
public void testCounter() throws Exception {
|
||||||
ClientSessionFactory sf = createSessionFactory(sl);
|
ClientSessionFactory sf = createSessionFactory(sl);
|
||||||
|
|
|
@ -80,6 +80,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -103,6 +104,25 @@ public class PagingTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
|
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void checkLoggerStart() throws Exception {
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void checkLoggerEnd() throws Exception {
|
||||||
|
try {
|
||||||
|
// These are the message errors for the negative size address size
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
|
||||||
|
} finally {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue