AMQ-7015 - Changed attribute to purgeRecoveredXATransactionStrategy and

allow NEVER, COMMIT, and ROLLBACK
This commit is contained in:
Jeff Genender 2018-07-25 12:52:49 -06:00
parent b4513004bc
commit 28819aea4a
3 changed files with 104 additions and 19 deletions

View File

@ -547,12 +547,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
} }
public boolean isPurgeRecoveredXATransactions() { public String getPurgeRecoveredXATransactionStrategy() {
return letter.isPurgeRecoveredXATransactions(); return letter.getPurgeRecoveredXATransactionStrategy();
} }
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) { public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions); letter.setPurgeRecoveredXATransactionStrategy(purgeRecoveredXATransactionStrategy);
} }
@Override @Override

View File

@ -240,6 +240,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
} }
public enum PurgeRecoveredXATransactionStrategy {
NEVER,
COMMIT,
ROLLBACK;
}
protected PageFile pageFile; protected PageFile pageFile;
protected Journal journal; protected Journal journal;
protected Metadata metadata = new Metadata(); protected Metadata metadata = new Metadata();
@ -272,7 +278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean ignoreMissingJournalfiles = false; private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000; private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false; private boolean checkForCorruptJournalFiles = false;
private boolean purgeRecoveredXATransactions = false; protected PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.NEVER;
private boolean checksumJournalFiles = true; private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false; protected boolean forceRecoverIndex = false;
private boolean archiveCorruptedIndex = false; private boolean archiveCorruptedIndex = false;
@ -746,14 +752,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} }
synchronized (preparedTransactions) { synchronized (preparedTransactions) {
for (TransactionId txId : preparedTransactions.keySet()) { Set<TransactionId> txIds = new LinkedHashSet<TransactionId>(preparedTransactions.keySet());
for (TransactionId txId : txIds) {
switch (purgeRecoveredXATransactionStrategy){
case NEVER:
LOG.warn("Recovered prepared XA TX: [{}]", txId); LOG.warn("Recovered prepared XA TX: [{}]", txId);
} break;
case COMMIT:
if (purgeRecoveredXATransactions){ store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
if (!preparedTransactions.isEmpty()){ LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" ); break;
preparedTransactions.clear(); case ROLLBACK:
store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
break;
} }
} }
} }
@ -3315,12 +3327,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
} }
public boolean isPurgeRecoveredXATransactions() { public PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum() {
return purgeRecoveredXATransactions; return purgeRecoveredXATransactionStrategy;
} }
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) { public String getPurgeRecoveredXATransactionStrategy() {
this.purgeRecoveredXATransactions = purgeRecoveredXATransactions; return purgeRecoveredXATransactionStrategy.name();
}
public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
this.purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.valueOf(
purgeRecoveredXATransactionStrategy.trim().toUpperCase());
} }
public boolean isChecksumJournalFiles() { public boolean isChecksumJournalFiles() {

View File

@ -267,7 +267,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ(); assertEmptyDLQ();
} }
public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception { public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
ActiveMQDestination destination = createDestination(); ActiveMQDestination destination = createDestination();
@ -306,7 +306,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
stopBroker(); stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter(); KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactions(true); adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter"); LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
} }
broker.start(); broker.start();
@ -320,9 +320,77 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
consumerInfo = createConsumerInfo(sessionInfo, destination); consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo); connection.send(consumerInfo);
// Since rolledback but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered. // Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection)); assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection); assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since committed ... they should get delivered.
for (int i = 0; i < 4; i++) {
assertNotNull(receiveMessage(connection));
}
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response); assertNotNull(response);