[AMQ-7126] Prevent OOM when recovering KahaDB and memory space is insufficient to load full page

This commit is contained in:
jgoodyear 2019-01-03 15:41:17 -03:30
parent 540999654a
commit 2ed15d563c
5 changed files with 6 additions and 14 deletions

View File

@ -595,7 +595,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg); listener.recoverMessage(msg);
counter++; counter++;
if (counter >= maxReturned) { if (counter >= maxReturned || !listener.hasSpace()) {
break; break;
} }
} }

View File

@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
lastmsgid = msg.getMessageId lastmsgid = msg.getMessageId
count += 1 count += 1
} }
count < max count < max && listener.hasSpace
} }
if( lastmsgid==null ) { if( lastmsgid==null ) {
startPos startPos

View File

@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase {
protected void configureBroker() throws Exception { protected void configureBroker() throws Exception {
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
broker.setAdvisorySupport(false); broker.setAdvisorySupport(false);
broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024); broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();

View File

@ -133,18 +133,10 @@ public class MemoryLimitTest extends TestSupport {
Message msg = consumer.receive(5000); Message msg = consumer.receive(5000);
msg.acknowledge(); msg.acknowledge();
// this should free some space and allow us to get new batch of messages in the memory assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
// exceeding the limit
assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Destination usage: " + dest.getMemoryUsage());
return dest.getMemoryUsage().getPercentUsage() >= 200;
}
}));
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 200); assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
// let's make sure we can consume all messages // let's make sure we can consume all messages
for (int i = 1; i < 2000; i++) { for (int i = 1; i < 2000; i++) {

View File

@ -182,7 +182,7 @@ public class QueueBrowsingTest {
@Test @Test
public void testMemoryLimit() throws Exception { public void testMemoryLimit() throws Exception {
broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024); broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 4 * 1024);
int messageToSend = 370; int messageToSend = 370;