AMQ-7015 Added a purgeRecoveredXATransactions property on the KahaDB adaptor to purge prepared XA messages on recovery

This commit is contained in:
hkesler 2018-07-19 11:53:04 -06:00
parent 7313d72c6b
commit ce7498c971
4 changed files with 96 additions and 3 deletions

View File

@ -547,6 +547,14 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
public boolean isPurgeRecoveredXATransactions() {
return letter.isPurgeRecoveredXATransactions();
}
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
}
@Override
public void setBrokerService(BrokerService brokerService) {
super.setBrokerService(brokerService);

View File

@ -272,6 +272,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
private boolean purgeRecoveredXATransactions = false;
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
private boolean archiveCorruptedIndex = false;
@ -748,6 +749,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for (TransactionId txId : preparedTransactions.keySet()) {
LOG.warn("Recovered prepared XA TX: [{}]", txId);
}
if (purgeRecoveredXATransactions){
if (!preparedTransactions.isEmpty()){
LOG.warn("Purging " + preparedTransactions.size() + " recovered prepared XA TXs" );
preparedTransactions.clear();
}
}
}
} finally {
@ -3340,6 +3348,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
}
public boolean isPurgeRecoveredXATransactions() {
return purgeRecoveredXATransactions;
}
public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
}
public boolean isChecksumJournalFiles() {
return checksumJournalFiles;
}

View File

@ -58,10 +58,13 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
* @throws URISyntaxException
*/
protected void restartBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
broker = createRestartedBroker();
stopBroker();
broker.start();
}
protected void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
broker = createRestartedBroker();
}
}

View File

@ -267,6 +267,72 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
assertEmptyDLQ();
}
public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
ActiveMQDestination destination = createDestination();
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Prepare 4 message sends.
for (int i = 0; i < 4; i++) {
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setTransactionId(txid);
connection.send(message);
// Prepare
connection.send(createPrepareTransaction(connectionInfo, txid));
}
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
connection.request(closeConnectionInfo(connectionInfo));
// restart the broker.
stopBroker();
if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
adapter.setPurgeRecoveredXATransactions(true);
LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
}
broker.start();
// Setup the consumer and try receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Since prepared but not committed.. they should not get delivered.
assertNull(receiveMessage(connection));
assertNoMessagesLeft(connection);
Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
assertNotNull(response);
DataArrayResponse dar = (DataArrayResponse)response;
//These should be purged so expect 0
assertEquals(0, dar.getData().length);
}
private void assertEmptyDLQ() throws Exception {
try {
DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));