AMQ-7015 - Changed attribute to purgeRecoveredXATransactionStrategy and

allow NEVER, COMMIT, and ROLLBACK
This commit is contained in:
Jeff Genender 2018-07-25 12:52:49 -06:00
parent d25de5d8c3
commit 2a2e01df6a
3 changed files with 104 additions and 19 deletions

View File

@ -547,12 +547,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
public boolean isPurgeRecoveredXATransactions() {
return letter.isPurgeRecoveredXATransactions();
public String getPurgeRecoveredXATransactionStrategy() {
return letter.getPurgeRecoveredXATransactionStrategy();
}
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
letter.setPurgeRecoveredXATransactionStrategy(purgeRecoveredXATransactionStrategy);
}
@Override

View File

@ -240,6 +240,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
public enum PurgeRecoveredXATransactionStrategy {
NEVER,
COMMIT,
ROLLBACK;
}
protected PageFile pageFile;
protected Journal journal;
protected Metadata metadata = new Metadata();
@ -272,7 +278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
private boolean purgeRecoveredXATransactions = false;
protected PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.NEVER;
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
private boolean archiveCorruptedIndex = false;
@ -746,14 +752,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
synchronized (preparedTransactions) {
for (TransactionId txId : preparedTransactions.keySet()) {
LOG.warn("Recovered prepared XA TX: [{}]", txId);
}
if (purgeRecoveredXATransactions){
if (!preparedTransactions.isEmpty()){
LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" );
preparedTransactions.clear();
Set<TransactionId> txIds = new LinkedHashSet<TransactionId>(preparedTransactions.keySet());
for (TransactionId txId : txIds) {
switch (purgeRecoveredXATransactionStrategy){
case NEVER:
LOG.warn("Recovered prepared XA TX: [{}]", txId);
break;
case COMMIT:
store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
break;
case ROLLBACK:
store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
break;
}
}
}
@ -3348,12 +3360,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
public boolean isPurgeRecoveredXATransactions() {
return purgeRecoveredXATransactions;
public PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum() {
return purgeRecoveredXATransactionStrategy;
}
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
public String getPurgeRecoveredXATransactionStrategy() {
return purgeRecoveredXATransactionStrategy.name();
}
public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
this.purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.valueOf(
purgeRecoveredXATransactionStrategy.trim().toUpperCase());
}
public boolean isChecksumJournalFiles() {

View File

@ -267,7 +267,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ();
}
public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
@ -306,7 +306,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactions(true);
adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
@ -320,9 +320,77 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since rolledback but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since committed ... they should get delivered.
for (int i = 0; i < 4; i++) {
assertNotNull(receiveMessage(connection));
}
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);