From ceb97f6baa471006cf176f298e763224f8d1b58f Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 27 Apr 2018 13:48:51 +0100 Subject: [PATCH] AMQ-6707 - mKahadb, track recovered tx per store for completion, resolve test regression --- .../kahadb/MultiKahaDBTransactionStore.java | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index ff70076372..f13fc53943 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -19,7 +19,9 @@ package org.apache.activemq.store.kahadb; import java.io.File; import java.io.IOException; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -189,15 +191,23 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } public class Tx { - private final Set stores = new HashSet(); + private final HashMap stores = new HashMap(); private int prepareLocationId = 0; + public void trackStore(TransactionStore store, XATransactionId xid) { + stores.put(store, xid); + } + public void trackStore(TransactionStore store) { - stores.add(store); + stores.put(store, null); + } + + public HashMap getStoresMap() { + return stores; } public Set getStores() { - return stores; + return stores.keySet(); } public void trackPrepareLocation(Location location) { @@ -240,8 +250,13 @@ public class MultiKahaDBTransactionStore implements TransactionStore { Tx tx = getTx(txid); if (wasPrepared) { - for (TransactionStore store : tx.getStores()) { - store.commit(txid, true, null, null); + for (Map.Entry storeTx : tx.getStoresMap().entrySet()) { + TransactionId recovered = storeTx.getValue(); + if (recovered != null) { + storeTx.getKey().commit(recovered, true, null, null); + } else { + storeTx.getKey().commit(txid, true, null, null); + } } } else { // can only do 1pc on a single store @@ -289,8 +304,13 @@ public class MultiKahaDBTransactionStore implements TransactionStore { public void rollback(TransactionId txid) throws IOException { Tx tx = removeTx(txid); if (tx != null) { - for (TransactionStore store : tx.getStores()) { - store.rollback(txid); + for (Map.Entry storeTx : tx.getStoresMap().entrySet()) { + TransactionId recovered = storeTx.getValue(); + if (recovered != null) { + storeTx.getKey().rollback(recovered); + } else { + storeTx.getKey().rollback(txid); + } } } } @@ -387,7 +407,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore { @Override public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) { try { - getTx(xid).trackStore(adapter.createTransactionStore()); + getTx(xid).trackStore(adapter.createTransactionStore(), xid); } catch (IOException e) { LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e); }