diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 33c5f3e1b6..c476e42dc7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -95,7 +95,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private TaskRunnerFactory taskRunnerFactory; private WireFormat wireFormat = new OpenWireFormat(); private SystemUsage usageManager; - private long checkpointInterval = 1000 * 60; + private long checkpointInterval = 1000 * 20; private int maxCheckpointMessageAddSize = 1024 * 4; private AMQTransactionStore transactionStore = new AMQTransactionStore(this); private TaskRunner checkpointTask; @@ -375,12 +375,13 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, LOG.debug("Checkpoint started."); } - Location newMark = null; + Location currentMark = asyncDataManager.getMark(); + Location newMark = currentMark; Iterator queueIterator = queues.values().iterator(); while (queueIterator.hasNext()) { final AMQMessageStore ms = queueIterator.next(); Location mark = (Location)ms.getMark(); - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { newMark = mark; } } @@ -388,12 +389,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, while (topicIterator.hasNext()) { final AMQTopicMessageStore ms = topicIterator.next(); Location mark = (Location)ms.getMark(); - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) { newMark = mark; } } try { - if (newMark != null) { + if (newMark != currentMark) { if (LOG.isDebugEnabled()) { LOG.debug("Marking journal at: " + newMark); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 30bf24a28b..1550436577 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -142,10 +142,10 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic if (container != null) { ConsumerMessageRef ref = null; if((ref = container.remove(messageId)) != null) { - TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); + StoreEntry entry = ref.getAckEntry(); + TopicSubAck tsa = ackContainer.get(entry); if (tsa != null) { if (tsa.decrementCount() <= 0) { - StoreEntry entry = ref.getAckEntry(); entry = ackContainer.refresh(entry); ackContainer.remove(entry); ReferenceRecord rr = messageContainer.get(messageId); @@ -156,6 +156,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic removeInterest(rr); removeMessage = true; } + }else { + ackContainer.update(entry,tsa); } } }else{