AMQ-6707 - mKahadb, track recovered tx per store for completion, resolve test regression

This commit is contained in:
gtully 2018-04-27 13:48:51 +01:00
parent ea70e827c0
commit ceb97f6baa
1 changed files with 28 additions and 8 deletions

View File

@ -19,7 +19,9 @@ package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@ -189,15 +191,23 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
public class Tx {
private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
private int prepareLocationId = 0;
public void trackStore(TransactionStore store, XATransactionId xid) {
stores.put(store, xid);
}
public void trackStore(TransactionStore store) {
stores.add(store);
stores.put(store, null);
}
public HashMap<TransactionStore, TransactionId> getStoresMap() {
return stores;
}
public Set<TransactionStore> getStores() {
return stores;
return stores.keySet();
}
public void trackPrepareLocation(Location location) {
@ -240,8 +250,13 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
Tx tx = getTx(txid);
if (wasPrepared) {
for (TransactionStore store : tx.getStores()) {
store.commit(txid, true, null, null);
for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
TransactionId recovered = storeTx.getValue();
if (recovered != null) {
storeTx.getKey().commit(recovered, true, null, null);
} else {
storeTx.getKey().commit(txid, true, null, null);
}
}
} else {
// can only do 1pc on a single store
@ -289,8 +304,13 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
public void rollback(TransactionId txid) throws IOException {
Tx tx = removeTx(txid);
if (tx != null) {
for (TransactionStore store : tx.getStores()) {
store.rollback(txid);
for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
TransactionId recovered = storeTx.getValue();
if (recovered != null) {
storeTx.getKey().rollback(recovered);
} else {
storeTx.getKey().rollback(txid);
}
}
}
}
@ -387,7 +407,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
@Override
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
try {
getTx(xid).trackStore(adapter.createTransactionStore());
getTx(xid).trackStore(adapter.createTransactionStore(), xid);
} catch (IOException e) {
LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
}