From a311139bfe2f2b3ffc0c84cfb1e9cec0c11830c7 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 9 Oct 2018 12:55:11 +0100 Subject: [PATCH] AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock --- .../store/kahadb/MessageDatabase.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 46696f45ea..d231a864c7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1401,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void execute(Transaction tx) throws IOException { for (Operation op : messagingTx) { op.execute(tx); + recordAckMessageReferenceLocation(location, op.getLocation()); } } }); @@ -1408,21 +1409,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } finally { indexLock.writeLock().unlock(); } - for (Operation op: inflightTx) { - recordAckMessageReferenceLocation(location, op.getLocation()); - } } @SuppressWarnings("rawtypes") protected void process(KahaPrepareCommand command, Location location) { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); + List tx = null; synchronized (inflightTransactions) { - List tx = inflightTransactions.remove(key); + tx = inflightTransactions.remove(key); if (tx != null) { preparedTransactions.put(key, tx); - for (Operation op: tx) { + } + } + if (tx != null && !tx.isEmpty()) { + indexLock.writeLock().lock(); + try { + for (Operation op : tx) { recordAckMessageReferenceLocation(location, op.getLocation()); } + } finally { + indexLock.writeLock().unlock(); } } } @@ -1437,9 +1443,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe updates = preparedTransactions.remove(key); } } - if (key.isXATransaction() && updates != null) { - for(Operation op : updates) { - recordAckMessageReferenceLocation(location, op.getLocation()); + if (key.isXATransaction() && updates != null && !updates.isEmpty()) { + indexLock.writeLock().lock(); + try { + for (Operation op : updates) { + recordAckMessageReferenceLocation(location, op.getLocation()); + } + } finally { + indexLock.writeLock().unlock(); } } }