mirror of https://github.com/apache/activemq.git
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/activemq
This commit is contained in:
commit
179d7c0580
|
@ -1319,7 +1319,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
|
|
||||||
// Add the message.
|
// Add the message.
|
||||||
int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
|
int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
|
||||||
long id = sd.orderIndex.getNextMessageId(priority);
|
long id = sd.orderIndex.getNextMessageId();
|
||||||
Long previous = sd.locationIndex.put(tx, location, id);
|
Long previous = sd.locationIndex.put(tx, location, id);
|
||||||
if (previous == null) {
|
if (previous == null) {
|
||||||
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||||
|
@ -1346,16 +1346,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
// added message. We don't want to assign it a new id as the other indexes would
|
// added message. We don't want to assign it a new id as the other indexes would
|
||||||
// be wrong..
|
// be wrong..
|
||||||
sd.locationIndex.put(tx, location, previous);
|
sd.locationIndex.put(tx, location, previous);
|
||||||
|
// ensure sequence is not broken
|
||||||
|
sd.orderIndex.revertNextMessageId();
|
||||||
metadata.lastUpdate = location;
|
metadata.lastUpdate = location;
|
||||||
// remove ack positions
|
|
||||||
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
|
|
||||||
Iterator<Entry<String, SequenceSet>> it = sd.ackPositions.iterator(tx);
|
|
||||||
while (it.hasNext()) {
|
|
||||||
Entry<String, SequenceSet> entry = it.next();
|
|
||||||
entry.getValue().remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
// record this id in any event, initial send or recovery
|
// record this id in any event, initial send or recovery
|
||||||
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
||||||
|
@ -1443,7 +1436,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
removeAckLocation(command, tx, sd, subscriptionKey, sequence);
|
removeAckLocation(command, tx, sd, subscriptionKey, sequence);
|
||||||
metadata.lastUpdate = ackLocation;
|
metadata.lastUpdate = ackLocation;
|
||||||
} else if (LOG.isDebugEnabled()) {
|
} else if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
|
LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -3183,10 +3176,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
deletes.add(iterator.next());
|
deletes.add(iterator.next());
|
||||||
}
|
}
|
||||||
|
|
||||||
long getNextMessageId(int priority) {
|
long getNextMessageId() {
|
||||||
return nextMessageId++;
|
return nextMessageId++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void revertNextMessageId() {
|
||||||
|
nextMessageId--;
|
||||||
|
}
|
||||||
|
|
||||||
MessageKeys get(Transaction tx, Long key) throws IOException {
|
MessageKeys get(Transaction tx, Long key) throws IOException {
|
||||||
MessageKeys result = defaultPriorityIndex.get(tx, key);
|
MessageKeys result = defaultPriorityIndex.get(tx, key);
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
|
|
6
pom.xml
6
pom.xml
|
@ -68,10 +68,10 @@
|
||||||
<hawtdispatch-version>1.21</hawtdispatch-version>
|
<hawtdispatch-version>1.21</hawtdispatch-version>
|
||||||
<howl-version>0.1.8</howl-version>
|
<howl-version>0.1.8</howl-version>
|
||||||
<hsqldb-version>1.8.0.12</hsqldb-version>
|
<hsqldb-version>1.8.0.12</hsqldb-version>
|
||||||
<httpclient-version>4.5</httpclient-version>
|
<httpclient-version>4.5.1</httpclient-version>
|
||||||
<httpcore-version>4.4.3</httpcore-version>
|
<httpcore-version>4.4.3</httpcore-version>
|
||||||
<insight-version>1.2.0.Beta4</insight-version>
|
<insight-version>1.2.0.Beta4</insight-version>
|
||||||
<jackson-version>2.6.1</jackson-version>
|
<jackson-version>2.6.2</jackson-version>
|
||||||
<jasypt-version>1.9.2</jasypt-version>
|
<jasypt-version>1.9.2</jasypt-version>
|
||||||
<jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
|
<jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
|
||||||
<jdom-version>1.0</jdom-version>
|
<jdom-version>1.0</jdom-version>
|
||||||
|
@ -103,7 +103,7 @@
|
||||||
<zookeeper-version>3.4.6</zookeeper-version>
|
<zookeeper-version>3.4.6</zookeeper-version>
|
||||||
<qpid-proton-version>0.10</qpid-proton-version>
|
<qpid-proton-version>0.10</qpid-proton-version>
|
||||||
<qpid-jms-version>0.5.0</qpid-jms-version>
|
<qpid-jms-version>0.5.0</qpid-jms-version>
|
||||||
<netty-all-version>4.0.29.Final</netty-all-version>
|
<netty-all-version>4.0.31.Final</netty-all-version>
|
||||||
<regexp-version>1.3</regexp-version>
|
<regexp-version>1.3</regexp-version>
|
||||||
<rome-version>1.0</rome-version>
|
<rome-version>1.0</rome-version>
|
||||||
<saxon-version>9.5.1-2</saxon-version>
|
<saxon-version>9.5.1-2</saxon-version>
|
||||||
|
|
Loading…
Reference in New Issue