[AMQ-7126] Improvement to perf of 5266Test

This commit is contained in:
jgoodyear 2019-01-07 21:22:34 -03:30
parent 273afef47c
commit 4a21edc8d5
6 changed files with 22 additions and 3 deletions

View File

@ -177,6 +177,16 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
} }
boolean parentHasSpace(int waterMark) {
boolean result = true;
if (systemUsage != null) {
if (systemUsage.getMemoryUsage().getParent() != null) {
return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark;
}
}
return result;
}
private boolean isParentFull() { private boolean isParentFull() {
boolean result = false; boolean result = false;
if (systemUsage != null) { if (systemUsage != null) {

View File

@ -287,6 +287,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return useCache && size==0 && hasSpace() && isStarted(); return useCache && size==0 && hasSpace() && isStarted();
} }
@Override
public boolean canRecoveryNextMessage() {
// Should be safe to recovery messages if the overall memory usage if < 90%
return parentHasSpace(90);
}
private void syncWithStore(Message currentAdd) throws Exception { private void syncWithStore(Message currentAdd) throws Exception {
pruneLastCached(); pruneLastCached();
for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {

View File

@ -26,6 +26,9 @@ public interface MessageRecoveryListener {
boolean recoverMessage(Message message) throws Exception; boolean recoverMessage(Message message) throws Exception;
boolean recoverMessageReference(MessageId ref) throws Exception; boolean recoverMessageReference(MessageId ref) throws Exception;
boolean hasSpace(); boolean hasSpace();
default boolean canRecoveryNextMessage() {
return true;
}
/** /**
* check if ref is a duplicate but do not record the reference * check if ref is a duplicate but do not record the reference
* @param ref * @param ref

View File

@ -679,7 +679,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg); listener.recoverMessage(msg);
counter++; counter++;
if (counter >= maxReturned || !listener.hasSpace()) { if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
break; break;
} }
} }

View File

@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
lastmsgid = msg.getMessageId lastmsgid = msg.getMessageId
count += 1 count += 1
} }
count < max && listener.hasSpace count < max && listener.canRecoveryNextMessage
} }
if( lastmsgid==null ) { if( lastmsgid==null ) {
startPos startPos

View File

@ -136,7 +136,7 @@ public class MemoryLimitTest extends TestSupport {
assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71); assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71); assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 91);
// let's make sure we can consume all messages // let's make sure we can consume all messages
for (int i = 1; i < 2000; i++) { for (int i = 1; i < 2000; i++) {