AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock

(cherry picked from commit a311139bfe)
This commit is contained in:
gtully 2018-10-09 12:55:11 +01:00 committed by Christopher L. Shannon (cshannon)
parent 7fa85185aa
commit 29fbeb511f
1 changed files with 19 additions and 8 deletions

View File

@ -1401,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) { for (Operation op : messagingTx) {
op.execute(tx); op.execute(tx);
recordAckMessageReferenceLocation(location, op.getLocation());
} }
} }
}); });
@ -1408,21 +1409,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} finally { } finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
for (Operation op: inflightTx) {
recordAckMessageReferenceLocation(location, op.getLocation());
}
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
protected void process(KahaPrepareCommand command, Location location) { protected void process(KahaPrepareCommand command, Location location) {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
List<Operation> tx = null;
synchronized (inflightTransactions) { synchronized (inflightTransactions) {
List<Operation> tx = inflightTransactions.remove(key); tx = inflightTransactions.remove(key);
if (tx != null) { if (tx != null) {
preparedTransactions.put(key, tx); preparedTransactions.put(key, tx);
for (Operation op: tx) { }
}
if (tx != null && !tx.isEmpty()) {
indexLock.writeLock().lock();
try {
for (Operation op : tx) {
recordAckMessageReferenceLocation(location, op.getLocation()); recordAckMessageReferenceLocation(location, op.getLocation());
} }
} finally {
indexLock.writeLock().unlock();
} }
} }
} }
@ -1437,9 +1443,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
updates = preparedTransactions.remove(key); updates = preparedTransactions.remove(key);
} }
} }
if (key.isXATransaction() && updates != null) { if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
for(Operation op : updates) { indexLock.writeLock().lock();
recordAckMessageReferenceLocation(location, op.getLocation()); try {
for (Operation op : updates) {
recordAckMessageReferenceLocation(location, op.getLocation());
}
} finally {
indexLock.writeLock().unlock();
} }
} }
} }