diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 9535a8317f..de9e4d453e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1754,7 +1754,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar Math.max(rc.orderIndex.nextMessageId, nextMessageId); } } - } if (metadata.version < VERSION) { @@ -1769,10 +1768,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (sequences == null) { sequences = new SequenceSet(); sequences.add(messageSequence); - sd.ackPositions.put(tx, subscriptionKey, sequences); + sd.ackPositions.add(tx, subscriptionKey, sequences); } else { sequences.add(messageSequence); - sd.ackPositions.add(tx, subscriptionKey, sequences); + sd.ackPositions.put(tx, subscriptionKey, sequences); } Long count = sd.messageReferences.get(messageSequence); @@ -1789,10 +1788,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (sequences == null) { sequences = new SequenceSet(); sequences.add(messageSequence); - sd.ackPositions.put(tx, subscriptionKey, sequences); + sd.ackPositions.add(tx, subscriptionKey, sequences); } else { sequences.add(messageSequence); - sd.ackPositions.add(tx, subscriptionKey, sequences); + sd.ackPositions.put(tx, subscriptionKey, sequences); } Long count = sd.messageReferences.get(messageSequence); @@ -1811,10 +1810,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (sequences == null) { sequences = new SequenceSet(); sequences.add(new Sequence(messageSequence, messageSequence + 1)); - sd.ackPositions.put(tx, subscriptionKey, sequences); + sd.ackPositions.add(tx, subscriptionKey, sequences); } else { sequences.add(new Sequence(messageSequence, messageSequence + 1)); - sd.ackPositions.add(tx, subscriptionKey, sequences); + sd.ackPositions.put(tx, subscriptionKey, sequences); } Long count = sd.messageReferences.get(messageSequence); @@ -1875,12 +1874,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar * @param sequenceId * @throws IOException */ - private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { + private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { // Remove the sub from the previous location set.. - if (sequenceId != null) { + if (messageSequence != null) { SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); if (range != null && !range.isEmpty()) { - range.remove(sequenceId); + range.remove(messageSequence); if (!range.isEmpty()) { sd.ackPositions.put(tx, subscriptionKey, range); } else { @@ -1888,18 +1887,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } // Check if the message is reference by any other subscription. - Long count = sd.messageReferences.get(sequenceId); + Long count = sd.messageReferences.get(messageSequence); long references = count.longValue() - 1; if (references > 0) { - sd.messageReferences.put(sequenceId, Long.valueOf(references)); + sd.messageReferences.put(messageSequence, Long.valueOf(references)); return; } else { - sd.messageReferences.remove(sequenceId); + sd.messageReferences.remove(messageSequence); } // Find all the entries that need to get deleted. ArrayList> deletes = new ArrayList>(); - sd.orderIndex.getDeleteList(tx, deletes, sequenceId); + sd.orderIndex.getDeleteList(tx, deletes, messageSequence); // Do the actual deletes. for (Entry entry : deletes) {