mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3519 - fix regression in org.apache.activemq.bugs.AMQ2983Test, transacted ack needs to use async remove to ensure a concurrent dispatch does not preceed an ack
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1180884 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8cf318788d
commit
2398a3a377
|
@ -454,7 +454,7 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
if (ack.isInTransaction()) {
|
||||
if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
|
||||
destination.removeMessage(context, ack);
|
||||
destination.removeAsyncMessage(context, ack);
|
||||
} else {
|
||||
Tx tx = getTx(ack.getTransactionId());
|
||||
tx.add(new RemoveMessageCommand(context) {
|
||||
|
|
|
@ -1148,7 +1148,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
// store a DUP
|
||||
// message. Bad BOY! Don't do it, and log a warning.
|
||||
LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
|
||||
// TODO: consider just rolling back the tx.
|
||||
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
||||
sd.locationIndex.remove(tx, location);
|
||||
}
|
||||
|
@ -1159,7 +1158,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
// indexes would
|
||||
// be wrong..
|
||||
//
|
||||
// TODO: consider just rolling back the tx.
|
||||
sd.locationIndex.put(tx, location, previous);
|
||||
}
|
||||
// record this id in any event, initial send or recovery
|
||||
|
@ -1178,7 +1176,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
if (keys != null) {
|
||||
sd.locationIndex.remove(tx, keys.location);
|
||||
recordAckMessageReferenceLocation(ackLocation, keys.location);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
|
||||
}
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("message not found in sequence id index: " + command.getMessageId());
|
||||
}
|
||||
} else {
|
||||
// In the topic case we need remove the message once it's been acked
|
||||
|
|
Loading…
Reference in New Issue