From 4a21edc8d544474c477e2b38ac3f939f9873bfee Mon Sep 17 00:00:00 2001 From: jgoodyear Date: Mon, 7 Jan 2019 21:22:34 -0330 Subject: [PATCH] [AMQ-7126] Improvement to perf of 5266Test --- .../region/cursors/AbstractPendingMessageCursor.java | 10 ++++++++++ .../broker/region/cursors/AbstractStoreCursor.java | 6 ++++++ .../apache/activemq/store/MessageRecoveryListener.java | 3 +++ .../org/apache/activemq/store/kahadb/KahaDBStore.java | 2 +- .../scala/org/apache/activemq/leveldb/DBManager.scala | 2 +- .../org/apache/activemq/usecases/MemoryLimitTest.java | 2 +- 6 files changed, 22 insertions(+), 3 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 83465a1f16..e0250f28db 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -177,6 +177,16 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; } + boolean parentHasSpace(int waterMark) { + boolean result = true; + if (systemUsage != null) { + if (systemUsage.getMemoryUsage().getParent() != null) { + return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark; + } + } + return result; + } + private boolean isParentFull() { boolean result = false; if (systemUsage != null) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 2dd934f08d..71a83acaef 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -287,6 +287,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return useCache && size==0 && hasSpace() && isStarted(); } + @Override + public boolean canRecoveryNextMessage() { + // Should be safe to recovery messages if the overall memory usage if < 90% + return parentHasSpace(90); + } + private void syncWithStore(Message currentAdd) throws Exception { pruneLastCached(); for (ListIterator it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java index 5cbeac9bd1..8d9bf62818 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java @@ -26,6 +26,9 @@ public interface MessageRecoveryListener { boolean recoverMessage(Message message) throws Exception; boolean recoverMessageReference(MessageId ref) throws Exception; boolean hasSpace(); + default boolean canRecoveryNextMessage() { + return true; + } /** * check if ref is a duplicate but do not record the reference * @param ref diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index d36bff21a3..0351d06349 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -679,7 +679,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); listener.recoverMessage(msg); counter++; - if (counter >= maxReturned || !listener.hasSpace()) { + if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { break; } } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index f9ce9e76d4..8119a9b9a8 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) { lastmsgid = msg.getMessageId count += 1 } - count < max && listener.hasSpace + count < max && listener.canRecoveryNextMessage } if( lastmsgid==null ) { startPos diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index fa27175c59..4c0747f05a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -136,7 +136,7 @@ public class MemoryLimitTest extends TestSupport { assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71); LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); - assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71); + assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 91); // let's make sure we can consume all messages for (int i = 1; i < 2000; i++) {