mirror of
https://github.com/apache/activemq.git
synced 2025-02-22 18:04:46 +00:00
https://issues.apache.org/jira/browse/AMQ-5960 - rework fix to reset the next sequence so that the next ack position and message reference gets cleared up in normal operation
This commit is contained in:
parent
86c826c461
commit
fcabcd282d
@ -1319,7 +1319,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||
|
||||
// Add the message.
|
||||
int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
|
||||
long id = sd.orderIndex.getNextMessageId(priority);
|
||||
long id = sd.orderIndex.getNextMessageId();
|
||||
Long previous = sd.locationIndex.put(tx, location, id);
|
||||
if (previous == null) {
|
||||
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||
@ -1346,16 +1346,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||
// added message. We don't want to assign it a new id as the other indexes would
|
||||
// be wrong..
|
||||
sd.locationIndex.put(tx, location, previous);
|
||||
// ensure sequence is not broken
|
||||
sd.orderIndex.revertNextMessageId();
|
||||
metadata.lastUpdate = location;
|
||||
// remove ack positions
|
||||
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
|
||||
Iterator<Entry<String, SequenceSet>> it = sd.ackPositions.iterator(tx);
|
||||
while (it.hasNext()) {
|
||||
Entry<String, SequenceSet> entry = it.next();
|
||||
entry.getValue().remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
// record this id in any event, initial send or recovery
|
||||
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
||||
@ -1443,7 +1436,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||
removeAckLocation(command, tx, sd, subscriptionKey, sequence);
|
||||
metadata.lastUpdate = ackLocation;
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
|
||||
LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
|
||||
}
|
||||
|
||||
}
|
||||
@ -3183,10 +3176,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||
deletes.add(iterator.next());
|
||||
}
|
||||
|
||||
long getNextMessageId(int priority) {
|
||||
long getNextMessageId() {
|
||||
return nextMessageId++;
|
||||
}
|
||||
|
||||
void revertNextMessageId() {
|
||||
nextMessageId--;
|
||||
}
|
||||
|
||||
MessageKeys get(Transaction tx, Long key) throws IOException {
|
||||
MessageKeys result = defaultPriorityIndex.get(tx, key);
|
||||
if (result == null) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user