git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@667752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-06-14 06:59:36 +00:00
parent 35706757e2
commit 2e4c688cdc
2 changed files with 10 additions and 7 deletions

View File

@ -95,7 +95,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private TaskRunnerFactory taskRunnerFactory; private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat(); private WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager; private SystemUsage usageManager;
private long checkpointInterval = 1000 * 60; private long checkpointInterval = 1000 * 20;
private int maxCheckpointMessageAddSize = 1024 * 4; private int maxCheckpointMessageAddSize = 1024 * 4;
private AMQTransactionStore transactionStore = new AMQTransactionStore(this); private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
private TaskRunner checkpointTask; private TaskRunner checkpointTask;
@ -375,12 +375,13 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
LOG.debug("Checkpoint started."); LOG.debug("Checkpoint started.");
} }
Location newMark = null; Location currentMark = asyncDataManager.getMark();
Location newMark = currentMark;
Iterator<AMQMessageStore> queueIterator = queues.values().iterator(); Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
while (queueIterator.hasNext()) { while (queueIterator.hasNext()) {
final AMQMessageStore ms = queueIterator.next(); final AMQMessageStore ms = queueIterator.next();
Location mark = (Location)ms.getMark(); Location mark = (Location)ms.getMark();
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
newMark = mark; newMark = mark;
} }
} }
@ -388,12 +389,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
while (topicIterator.hasNext()) { while (topicIterator.hasNext()) {
final AMQTopicMessageStore ms = topicIterator.next(); final AMQTopicMessageStore ms = topicIterator.next();
Location mark = (Location)ms.getMark(); Location mark = (Location)ms.getMark();
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
newMark = mark; newMark = mark;
} }
} }
try { try {
if (newMark != null) { if (newMark != currentMark) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Marking journal at: " + newMark); LOG.debug("Marking journal at: " + newMark);
} }

View File

@ -142,10 +142,10 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
if (container != null) { if (container != null) {
ConsumerMessageRef ref = null; ConsumerMessageRef ref = null;
if((ref = container.remove(messageId)) != null) { if((ref = container.remove(messageId)) != null) {
TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); StoreEntry entry = ref.getAckEntry();
TopicSubAck tsa = ackContainer.get(entry);
if (tsa != null) { if (tsa != null) {
if (tsa.decrementCount() <= 0) { if (tsa.decrementCount() <= 0) {
StoreEntry entry = ref.getAckEntry();
entry = ackContainer.refresh(entry); entry = ackContainer.refresh(entry);
ackContainer.remove(entry); ackContainer.remove(entry);
ReferenceRecord rr = messageContainer.get(messageId); ReferenceRecord rr = messageContainer.get(messageId);
@ -156,6 +156,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
removeInterest(rr); removeInterest(rr);
removeMessage = true; removeMessage = true;
} }
}else {
ackContainer.update(entry,tsa);
} }
} }
}else{ }else{