ensure that the messageReferences are calculated correctly when loading a stored destination. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1311513 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-04-09 22:56:15 +00:00
parent ccaf0839da
commit 34af42f444
1 changed files with 70 additions and 3 deletions

View File

@ -1337,6 +1337,64 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.clear(tx); sd.messageIdIndex.clear(tx);
sd.locationIndex.clear(tx); sd.locationIndex.clear(tx);
sd.orderIndex.clear(tx); sd.orderIndex.clear(tx);
} else {
if (sd.subscriptionCache.size() == 1) {
int messageIdCount = 0;
TreeSet<Long> msgIdContents = new TreeSet<Long>();
Iterator<Entry<String, Long>> iterator1 = sd.messageIdIndex.iterator(tx);
while (iterator1.hasNext()) {
Entry<String, Long> entry = iterator1.next();
messageIdCount++;
msgIdContents.add(entry.getValue());
}
int locationCount = 0;
TreeSet<Long> locationContents = new TreeSet<Long>();
Iterator<Entry<Location, Long>> iterator2 = sd.locationIndex.iterator(tx);
while (iterator2.hasNext()) {
Entry<Location, Long> entry = iterator2.next();
locationCount++;
locationContents.add(entry.getValue());
}
LOG.info("Size of sd.messageIdIndex = " + messageIdCount);
LOG.info("Size of sd.locationIndex = " + locationCount);
LOG.info("Size of sd.ackPositions = " + sd.ackPositions.size());
LOG.info("Size of sd.messageReferences = " + sd.messageReferences.size());
Iterator<Entry<String, SequenceSet>> iterator3 = sd.ackPositions.iterator(tx);
while (iterator3.hasNext()) {
Entry<String, SequenceSet> entry = iterator3.next();
StringBuilder logEntry = new StringBuilder();
logEntry.append("Subscription["+entry.getKey()+"] references: ");
for (Long sequenceId : entry.getValue()) {
logEntry.append(sequenceId + " ");
}
LOG.info(logEntry.toString());
}
StringBuilder msgIdLog = new StringBuilder();
msgIdLog.append("sd.messageIdIndex contains [");
for(Long sequenceId : msgIdContents) {
msgIdLog.append(sequenceId + " ");
}
msgIdLog.append("]");
LOG.info(msgIdLog.toString());
StringBuilder locationLog = new StringBuilder();
locationLog.append("sd.locationIndex contains [");
for(Long sequenceId : locationContents) {
locationLog.append(sequenceId + " ");
}
locationLog.append("]");
LOG.info(locationLog.toString());
LOG.info("Order index last default key: " + sd.orderIndex.lastDefaultKey);
LOG.info("Order index last high key: " + sd.orderIndex.lastHighKey);
LOG.info("Order index last low key: " + sd.orderIndex.lastLowKey);
}
} }
} }
} }
@ -1829,13 +1887,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
while (subscriptions.hasNext()) { while (subscriptions.hasNext()) {
Entry<String, SequenceSet> subscription = subscriptions.next(); Entry<String, SequenceSet> subscription = subscriptions.next();
if (subscription.getValue() != null) { SequenceSet pendingAcks = subscription.getValue();
for(Long sequenceId : subscription.getValue()) { if (pendingAcks != null && !pendingAcks.isEmpty()) {
Long lastPendingAck = pendingAcks.getTail().getLast();
for(Long sequenceId : pendingAcks) {
Long current = rc.messageReferences.get(sequenceId); Long current = rc.messageReferences.get(sequenceId);
if (current == null) { if (current == null) {
current = new Long(0); current = new Long(0);
} }
rc.messageReferences.put(sequenceId, Long.valueOf(current.longValue() + 1));
// We always add a trailing empty entry for the next position to start from
// so we need to ensure we don't count that as a message reference on reload.
if (!sequenceId.equals(lastPendingAck)) {
current = current.longValue() + 1;
}
rc.messageReferences.put(sequenceId, current);
} }
} }
} }