diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 815b9dfe2f..351219087f 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1319,7 +1319,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Add the message. int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; - long id = sd.orderIndex.getNextMessageId(priority); + long id = sd.orderIndex.getNextMessageId(); Long previous = sd.locationIndex.put(tx, location, id); if (previous == null) { previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); @@ -1346,16 +1346,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // added message. We don't want to assign it a new id as the other indexes would // be wrong.. sd.locationIndex.put(tx, location, previous); + // ensure sequence is not broken + sd.orderIndex.revertNextMessageId(); metadata.lastUpdate = location; - // remove ack positions - if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { - Iterator> it = sd.ackPositions.iterator(tx); - while (it.hasNext()) { - Entry entry = it.next(); - entry.getValue().remove(id); - } - } - } // record this id in any event, initial send or recovery metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); @@ -1443,7 +1436,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe removeAckLocation(command, tx, sd, subscriptionKey, sequence); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { - LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); + LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); } } @@ -3183,10 +3176,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe deletes.add(iterator.next()); } - long getNextMessageId(int priority) { + long getNextMessageId() { return nextMessageId++; } + void revertNextMessageId() { + nextMessageId--; + } + MessageKeys get(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.get(tx, key); if (result == null) { diff --git a/pom.xml b/pom.xml index d6fc6c1294..40132202cc 100755 --- a/pom.xml +++ b/pom.xml @@ -68,10 +68,10 @@ 1.21 0.1.8 1.8.0.12 - 4.5 + 4.5.1 4.4.3 1.2.0.Beta4 - 2.6.1 + 2.6.2 1.9.2 2.2.11_1 1.0 @@ -103,7 +103,7 @@ 3.4.6 0.10 0.5.0 - 4.0.29.Final + 4.0.31.Final 1.3 1.0 9.5.1-2