diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index b6b2ca70be..21cac0ab85 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -669,6 +669,24 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth); } + /** + * Returns whether Ack compaction is enabled + * + * @return enableAckCompaction + */ + public boolean isEnableAckCompaction() { + return letter.isEnableAckCompaction(); + } + + /** + * Configure if the Ack compaction task should be enabled to run + * + * @param enableAckCompaction + */ + public void setEnableAckCompaction(boolean enableAckCompaction) { + letter.setEnableAckCompaction(enableAckCompaction); + } + public KahaDBStore getStore() { return letter; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 434e49ca13..8bb9491828 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -274,6 +274,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean enableIndexPageCaching = true; ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); + private boolean enableAckCompaction = true; private int compactAcksAfterNoGC = 10; private boolean compactAcksIgnoresStoreGrowth = false; private int checkPointCyclesWithNoGC; @@ -1817,7 +1818,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (ackMessageFileMapMod) { checkpointUpdate(tx, false); } - } else { + } else if (isEnableAckCompaction()) { if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { // First check length of journal to make sure it makes sense to even try. // @@ -3671,4 +3672,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; } + + /** + * Returns whether Ack compaction is enabled + * + * @return enableAckCompaction + */ + public boolean isEnableAckCompaction() { + return enableAckCompaction; + } + + /** + * Configure if the Ack compaction task should be enabled to run + * + * @param enableAckCompaction + */ + public void setEnableAckCompaction(boolean enableAckCompaction) { + this.enableAckCompaction = enableAckCompaction; + } }