mirror of https://github.com/apache/activemq.git
fix and issue created when the new changes were merged. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1173188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5325cdb30b
commit
e23afb3391
|
@ -1754,7 +1754,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
Math.max(rc.orderIndex.nextMessageId, nextMessageId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (metadata.version < VERSION) {
|
||||
|
@ -1769,10 +1768,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
if (sequences == null) {
|
||||
sequences = new SequenceSet();
|
||||
sequences.add(messageSequence);
|
||||
sd.ackPositions.put(tx, subscriptionKey, sequences);
|
||||
sd.ackPositions.add(tx, subscriptionKey, sequences);
|
||||
} else {
|
||||
sequences.add(messageSequence);
|
||||
sd.ackPositions.add(tx, subscriptionKey, sequences);
|
||||
sd.ackPositions.put(tx, subscriptionKey, sequences);
|
||||
}
|
||||
|
||||
Long count = sd.messageReferences.get(messageSequence);
|
||||
|
@ -1789,10 +1788,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
if (sequences == null) {
|
||||
sequences = new SequenceSet();
|
||||
sequences.add(messageSequence);
|
||||
sd.ackPositions.put(tx, subscriptionKey, sequences);
|
||||
sd.ackPositions.add(tx, subscriptionKey, sequences);
|
||||
} else {
|
||||
sequences.add(messageSequence);
|
||||
sd.ackPositions.add(tx, subscriptionKey, sequences);
|
||||
sd.ackPositions.put(tx, subscriptionKey, sequences);
|
||||
}
|
||||
|
||||
Long count = sd.messageReferences.get(messageSequence);
|
||||
|
@ -1811,10 +1810,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
if (sequences == null) {
|
||||
sequences = new SequenceSet();
|
||||
sequences.add(new Sequence(messageSequence, messageSequence + 1));
|
||||
sd.ackPositions.put(tx, subscriptionKey, sequences);
|
||||
sd.ackPositions.add(tx, subscriptionKey, sequences);
|
||||
} else {
|
||||
sequences.add(new Sequence(messageSequence, messageSequence + 1));
|
||||
sd.ackPositions.add(tx, subscriptionKey, sequences);
|
||||
sd.ackPositions.put(tx, subscriptionKey, sequences);
|
||||
}
|
||||
|
||||
Long count = sd.messageReferences.get(messageSequence);
|
||||
|
@ -1875,12 +1874,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
* @param sequenceId
|
||||
* @throws IOException
|
||||
*/
|
||||
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
|
||||
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
|
||||
// Remove the sub from the previous location set..
|
||||
if (sequenceId != null) {
|
||||
if (messageSequence != null) {
|
||||
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
|
||||
if (range != null && !range.isEmpty()) {
|
||||
range.remove(sequenceId);
|
||||
range.remove(messageSequence);
|
||||
if (!range.isEmpty()) {
|
||||
sd.ackPositions.put(tx, subscriptionKey, range);
|
||||
} else {
|
||||
|
@ -1888,18 +1887,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
|
||||
// Check if the message is reference by any other subscription.
|
||||
Long count = sd.messageReferences.get(sequenceId);
|
||||
Long count = sd.messageReferences.get(messageSequence);
|
||||
long references = count.longValue() - 1;
|
||||
if (references > 0) {
|
||||
sd.messageReferences.put(sequenceId, Long.valueOf(references));
|
||||
sd.messageReferences.put(messageSequence, Long.valueOf(references));
|
||||
return;
|
||||
} else {
|
||||
sd.messageReferences.remove(sequenceId);
|
||||
sd.messageReferences.remove(messageSequence);
|
||||
}
|
||||
|
||||
// Find all the entries that need to get deleted.
|
||||
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
|
||||
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
|
||||
sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
|
||||
|
||||
// Do the actual deletes.
|
||||
for (Entry<Long, MessageKeys> entry : deletes) {
|
||||
|
|
Loading…
Reference in New Issue