AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit

This commit is contained in:
gtully 2019-06-11 12:36:02 +01:00
parent 93e726d6a7
commit 28a0cc6e5a
2 changed files with 10 additions and 6 deletions

View File

@ -63,7 +63,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
private Journal journal;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
@ -294,10 +294,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
pendingCommit.put(txid, tx);
}
public void persistCompletion(TransactionId txid) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
pendingCommit.remove(txid);
}
private Location store(JournalCommand<?> data) throws IOException {
@ -355,6 +357,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
for (Tx tx : inflightTransactions.values()) {
knownDataFileIds.remove(tx.getPreparedLocationId());
}
for (Tx tx : pendingCommit.values()) {
knownDataFileIds.remove(tx.getPreparedLocationId());
}
try {
journal.removeDataFiles(knownDataFileIds);
} catch (Exception e) {
@ -380,8 +385,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
process(location, load(location));
location = journal.getNextLocation(location);
}
recoveredPendingCommit.addAll(inflightTransactions.keySet());
LOG.info("pending local transactions: " + recoveredPendingCommit);
pendingCommit.putAll(inflightTransactions);
LOG.info("pending local transactions: " + pendingCommit.keySet());
}
public JournalCommand<?> load(Location location) throws IOException {
@ -437,10 +442,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
for (TransactionId txid : broker.getPreparedTransactions(null)) {
if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
try {
if (recoveredPendingCommit.contains(txid)) {
if (pendingCommit.keySet().contains(txid)) {
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
recoveredPendingCommit.remove(txid);
} else {
LOG.info("delivering rollback outcome to store for tid: " + txid);
broker.forgetTransaction(null, txid);

View File

@ -207,7 +207,7 @@ public class MKahaDBTxRecoveryTest {
multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10);
broker = createBroker(multiKahaDBPersistenceAdapter);
}