diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 2ae22c6ba6..1fbea759a0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1337,6 +1337,64 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.messageIdIndex.clear(tx); sd.locationIndex.clear(tx); sd.orderIndex.clear(tx); + } else { + + if (sd.subscriptionCache.size() == 1) { + + int messageIdCount = 0; + TreeSet msgIdContents = new TreeSet(); + Iterator> iterator1 = sd.messageIdIndex.iterator(tx); + while (iterator1.hasNext()) { + Entry entry = iterator1.next(); + messageIdCount++; + msgIdContents.add(entry.getValue()); + } + + int locationCount = 0; + TreeSet locationContents = new TreeSet(); + Iterator> iterator2 = sd.locationIndex.iterator(tx); + while (iterator2.hasNext()) { + Entry entry = iterator2.next(); + locationCount++; + locationContents.add(entry.getValue()); + } + + LOG.info("Size of sd.messageIdIndex = " + messageIdCount); + LOG.info("Size of sd.locationIndex = " + locationCount); + LOG.info("Size of sd.ackPositions = " + sd.ackPositions.size()); + LOG.info("Size of sd.messageReferences = " + sd.messageReferences.size()); + + Iterator> iterator3 = sd.ackPositions.iterator(tx); + while (iterator3.hasNext()) { + Entry entry = iterator3.next(); + StringBuilder logEntry = new StringBuilder(); + logEntry.append("Subscription["+entry.getKey()+"] references: "); + for (Long sequenceId : entry.getValue()) { + logEntry.append(sequenceId + " "); + } + LOG.info(logEntry.toString()); + } + + StringBuilder msgIdLog = new StringBuilder(); + msgIdLog.append("sd.messageIdIndex contains ["); + for(Long sequenceId : msgIdContents) { + msgIdLog.append(sequenceId + " "); + } + msgIdLog.append("]"); + LOG.info(msgIdLog.toString()); + StringBuilder locationLog = new StringBuilder(); + locationLog.append("sd.locationIndex contains ["); + for(Long sequenceId : locationContents) { + locationLog.append(sequenceId + " "); + } + locationLog.append("]"); + LOG.info(locationLog.toString()); + + LOG.info("Order index last default key: " + sd.orderIndex.lastDefaultKey); + LOG.info("Order index last high key: " + sd.orderIndex.lastHighKey); + LOG.info("Order index last low key: " + sd.orderIndex.lastLowKey); + } + } } } @@ -1829,13 +1887,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Iterator> subscriptions = rc.ackPositions.iterator(tx); while (subscriptions.hasNext()) { Entry subscription = subscriptions.next(); - if (subscription.getValue() != null) { - for(Long sequenceId : subscription.getValue()) { + SequenceSet pendingAcks = subscription.getValue(); + if (pendingAcks != null && !pendingAcks.isEmpty()) { + Long lastPendingAck = pendingAcks.getTail().getLast(); + for(Long sequenceId : pendingAcks) { Long current = rc.messageReferences.get(sequenceId); if (current == null) { current = new Long(0); } - rc.messageReferences.put(sequenceId, Long.valueOf(current.longValue() + 1)); + + // We always add a trailing empty entry for the next position to start from + // so we need to ensure we don't count that as a message reference on reload. + if (!sequenceId.equals(lastPendingAck)) { + current = current.longValue() + 1; + } + + rc.messageReferences.put(sequenceId, current); } } }