[AMQ-7488] mkahadb - detect txStore corruption and suspend recovery, auto recover if no outcomes pending

This commit is contained in:
gtully 2020-05-20 12:43:37 +01:00
parent 7b51233a69
commit cedac472a1
5 changed files with 440 additions and 39 deletions

View File

@ -670,6 +670,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return infos.toString();
}
public String getPreparedTransaction(TransactionId transactionId) {
String result = "";
synchronized (preparedTransactions) {
List<Operation> operations = preparedTransactions.get(transactionId);
if (operations != null) {
TranInfo info = new TranInfo();
info.id = transactionId;
for (Operation operation : preparedTransactions.get(transactionId)) {
info.track(operation);
}
result = info.toString();
}
}
return result;
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.

View File

@ -562,6 +562,14 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
return transactionStore.getJournalCleanupInterval();
}
public void setCheckForCorruption(boolean checkForCorruption) {
transactionStore.setCheckForCorruption(checkForCorruption);
}
public boolean isCheckForCorruption() {
return transactionStore.isCheckForCorruption();
}
public List<PersistenceAdapter> getAdapters() {
return Collections.unmodifiableList(adapters);
}

View File

@ -50,6 +50,7 @@ import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.usage.StoreUsage;
@ -70,6 +71,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean recovered = new AtomicBoolean(false);
private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
private boolean checkForCorruption = true;
private AtomicBoolean corruptJournalDetected = new AtomicBoolean(false);
public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
@ -200,6 +203,14 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return journalCleanupInterval;
}
public void setCheckForCorruption(boolean checkForCorruption) {
this.checkForCorruption = checkForCorruption;
}
public boolean isCheckForCorruption() {
return checkForCorruption;
}
public class Tx {
private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
private int prepareLocationId = 0;
@ -341,16 +352,22 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
journal.setMaxFileLength(journalMaxFileLength);
journal.setWriteBatchSize(journalWriteBatchSize);
journal.setCleanupInterval(journalCleanupInterval);
journal.setCheckForCorruptionOnStartup(checkForCorruption);
journal.setChecksum(checkForCorruption);
IOHelper.mkdirs(journal.getDirectory());
journal.start();
recoverPendingLocalTransactions();
recovered.set(true);
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
loaded();
}
}
private void loaded() throws IOException {
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
}
private void txStoreCleanup() {
if (!recovered.get()) {
if (!recovered.get() || corruptJournalDetected.get()) {
return;
}
Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
@ -380,13 +397,30 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
private void recoverPendingLocalTransactions() throws IOException {
Location location = journal.getNextLocation(null);
while (location != null) {
process(location, load(location));
location = journal.getNextLocation(location);
if (checkForCorruption) {
for (DataFile dataFile: journal.getFileMap().values()) {
if (!dataFile.getCorruptedBlocks().isEmpty()) {
LOG.error("Corrupt Transaction journal records found in db-{}.log at {}", dataFile.getDataFileId(), dataFile.getCorruptedBlocks());
corruptJournalDetected.set(true);
}
}
}
if (!corruptJournalDetected.get()) {
Location location = null;
try {
location = journal.getNextLocation(null);
while (location != null) {
process(location, load(location));
location = journal.getNextLocation(location);
}
} catch (Exception oops) {
LOG.error("Corrupt journal record; unexpected exception on transaction journal replay of location:" + location, oops);
corruptJournalDetected.set(true);
}
pendingCommit.putAll(inflightTransactions);
LOG.info("pending local transactions: " + pendingCommit.keySet());
}
pendingCommit.putAll(inflightTransactions);
LOG.info("pending local transactions: " + pendingCommit.keySet());
}
public JournalCommand<?> load(Location location) throws IOException {
@ -436,28 +470,61 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
});
}
boolean recoveryWorkPending = false;
try {
Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
// force completion of local xa
for (TransactionId txid : broker.getPreparedTransactions(null)) {
if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
try {
if (pendingCommit.keySet().contains(txid)) {
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
} else {
LOG.info("delivering rollback outcome to store for tid: " + txid);
broker.forgetTransaction(null, txid);
recoveryWorkPending = true;
if (corruptJournalDetected.get()) {
// not having a record is meaningless once our tx store is corrupt; we need a heuristic decision
LOG.warn("Pending multi store local transaction {} requires manual heuristic outcome via JMX", txid);
logSomeContext(txid);
} else {
try {
if (pendingCommit.keySet().contains(txid)) {
// we recorded the commit outcome, finish the job
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
} else {
// we have not record an outcome, and would have reported a commit failure, so we must rollback
LOG.info("delivering rollback outcome to store for tid: " + txid);
broker.forgetTransaction(null, txid);
}
persistCompletion(txid);
} catch (Exception ex) {
LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
}
persistCompletion(txid);
} catch (Exception ex) {
LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
}
}
}
} catch (Exception e) {
LOG.error("failed to resolve pending local transactions", e);
}
// can we ignore corruption and resume
if (corruptJournalDetected.get() && !recoveryWorkPending) {
// move to new write file, gc will cleanup
journal.rotateWriteFile();
loaded();
corruptJournalDetected.set(false);
LOG.info("No heuristics outcome pending after corrupt tx store detection, auto resolving");
}
}
private void logSomeContext(TransactionId txid) throws IOException {
Tx tx = getTx(txid);
if (tx != null) {
for (TransactionStore store: tx.getStores()) {
for (PersistenceAdapter persistenceAdapter : multiKahaDBPersistenceAdapter.adapters) {
if (persistenceAdapter.createTransactionStore() == store) {
if (persistenceAdapter instanceof KahaDBPersistenceAdapter) {
LOG.warn("Heuristic data in: " + persistenceAdapter + ", " + ((KahaDBPersistenceAdapter)persistenceAdapter).getStore().getPreparedTransaction(txid));
}
}
}
}
}
}
void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)

View File

@ -510,30 +510,32 @@ public class Journal {
while (true) {
int size = checkBatchRecord(bs, randomAccessFile);
if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
if (size == 0) {
// eof batch record
break;
}
if (size > 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
} else {
// Perhaps it's just some corruption... scan through the
// file to find the next valid batch record. We
// may have subsequent valid batch records.
} else if (size == 0 && location.getOffset() + EOF_RECORD.length + size <= totalFileLength) {
// eof batch record
break;
} else {
// track corruption and skip if possible
Sequence sequence = new Sequence(location.getOffset());
if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining());
Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
sequence.setLast(nextOffset - 1);
dataFile.corruptedBlocks.add(sequence);
LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
location.setOffset(nextOffset);
} else {
// corruption to eof, don't loose track of this corruption, don't truncate
sequence.setLast(Math.toIntExact(randomAccessFile.getFilePointer()));
dataFile.corruptedBlocks.add(sequence);
LOG.warn("Corrupt journal records found in '{}' from offset: {} to EOF", dataFile.getFile(), sequence);
break;
}
}
}
} catch (IOException e) {
LOG.trace("exception on recovery check of: " + dataFile + ", at " + location, e);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
@ -543,14 +545,6 @@ public class Journal {
if (existingLen > dataFile.getLength()) {
totalLength.addAndGet(dataFile.getLength() - existingLen);
}
if (!dataFile.corruptedBlocks.isEmpty()) {
// Is the end of the data file corrupted?
if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
}
}
return location;
}
@ -654,7 +648,7 @@ public class Journal {
return totalLength.get();
}
private void rotateWriteFile() throws IOException {
public void rotateWriteFile() throws IOException {
synchronized (dataFileIdLock) {
DataFile dataFile = nextDataFile;
if (dataFile == null) {

View File

@ -21,6 +21,8 @@ import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
@ -29,7 +31,14 @@ import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
@ -38,10 +47,14 @@ import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -183,6 +196,309 @@ public class MKahaDBTxRecoveryTest {
}
@Test
public void testManualRecoveryOnCorruptTxStore() throws Exception {
prepareBrokerWithMultiStore(true);
((MultiKahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCheckForCorruption(true);
broker.start();
broker.waitUntilStarted();
// Ensure we have an Admin View.
assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return (broker.getAdminView()) != null;
}
}));
final AtomicBoolean injectFailure = new AtomicBoolean(true);
final AtomicInteger reps = new AtomicInteger();
final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
TransactionIdTransformer faultInjector = new TransactionIdTransformer() {
@Override
public TransactionId transform(TransactionId txid) {
if (injectFailure.get() && reps.incrementAndGet() > 5) {
throw new RuntimeException("Bla2");
}
return delegate.get().transform(txid);
}
};
// set up kahadb to fail after N ops
for (KahaDBPersistenceAdapter pa : kahadbs) {
if (delegate.get() == null) {
delegate.set(pa.getStore().getTransactionIdTransformer());
}
pa.setTransactionIdTransformer(faultInjector);
}
ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
f.setAlwaysSyncSend(true);
Connection c = f.createConnection();
c.start();
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
producer.send(s.createTextMessage("HI"));
try {
s.commit();
fail("Expect commit failure on error injection!");
} catch (Exception expected) {
expected.printStackTrace();
}
assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
}
}));
// check completion on recovery
injectFailure.set(false);
// fire in many more local transactions to use N txStore journal files
for (int i=0; i<100; i++) {
producer.send(s.createTextMessage("HI"));
s.commit();
}
ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
BrokerViewMBean brokerViewMBean = (BrokerViewMBean) broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true);
String pathToDataDir = brokerViewMBean.getDataDirectory();
broker.stop();
// corrupt the journal such that it fails to load
corruptTxStoreJournal(pathToDataDir);
// verify failure to load txStore via logging
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class);
AtomicBoolean foundSomeCorruption = new AtomicBoolean();
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) {
LOG.info("received expected log message: " + event.getMessage());
foundSomeCorruption.set(true);
}
}
};
log4jLogger.addAppender(appender);
try {
prepareBrokerWithMultiStore(false);
((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(true);
broker.start();
broker.waitUntilStarted();
{
final Destination dest1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
final Destination dest2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
// verify partial commit still present
assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return dest1.getMessageStore().getMessageCount() != dest2.getMessageStore().getMessageCount();
}
}));
}
assertTrue("broker/store found corruption", foundSomeCorruption.get());
broker.stop();
// and without checksum
LOG.info("Check for journal read failure... no checksum");
foundSomeCorruption.set(false);
prepareBrokerWithMultiStore(false);
((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(false);
broker.start();
broker.waitUntilStarted();
{
final Destination dest1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
final Destination dest2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
// verify partial commit still present
assertTrue("Partial commit - one dest still has message", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return dest1.getMessageStore().getMessageCount() != dest2.getMessageStore().getMessageCount();
}
}));
}
assertTrue("broker/store found corruption without checksum", foundSomeCorruption.get());
// force commit outcome via Tx MBeans
ObjectName matchAllPendingTx = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=*");
Set<ObjectName> pendingTx = broker.getManagementContext().queryNames(matchAllPendingTx, null);
assertFalse(pendingTx.isEmpty());
for (ObjectName pendingXAtxOn: pendingTx) {
RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(pendingXAtxOn,
RecoveredXATransactionViewMBean.class, true);
assertEquals("matches ", proxy.getFormatId(), 61616);
// force commit outcome, we verify the commit in this test, knowing that one branch has committed already
proxy.heuristicCommit();
}
pendingTx = broker.getManagementContext().queryNames(matchAllPendingTx, null);
assertTrue(pendingTx.isEmpty());
// verify commit completed
Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
assertEquals(101, destination.getMessageStore().getMessageCount());
destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
assertEquals(101, destination.getMessageStore().getMessageCount());
} finally {
log4jLogger.removeAppender(appender);
}
}
@Test
public void testCorruptionDetectedOnTruncateAndIgnored() throws Exception {
prepareBrokerWithMultiStore(true);
broker.start();
broker.waitUntilStarted();
ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
f.setAlwaysSyncSend(true);
Connection c = f.createConnection();
c.start();
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
for (int i=0; i<20; i++) {
producer.send(s.createTextMessage("HI"));
s.commit();
}
assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
BrokerViewMBean brokerViewMBean = (BrokerViewMBean) broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true);
String pathToDataDir = brokerViewMBean.getDataDirectory();
broker.stop();
// corrupt the journal such that it fails to load
corruptTxStoreJournalAndTruncate(pathToDataDir);
// verify failure to load txStore via logging
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class);
AtomicBoolean foundSomeCorruption = new AtomicBoolean();
AtomicBoolean ignoringCorruption = new AtomicBoolean();
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) {
LOG.info("received expected log message: " + event.getMessage());
foundSomeCorruption.set(true);
} else if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("auto resolving")) {
ignoringCorruption.set(true);
}
}
};
log4jLogger.addAppender(appender);
try {
prepareBrokerWithMultiStore(false);
broker.start();
broker.waitUntilStarted();
assertTrue("broker/store found corruption", foundSomeCorruption.get());
assertTrue("broker/store ignored corruption", ignoringCorruption.get());
broker.stop();
foundSomeCorruption.set(false);
ignoringCorruption.set(false);
prepareBrokerWithMultiStore(false);
broker.start();
broker.waitUntilStarted();
assertFalse("broker/store no corruption", foundSomeCorruption.get());
assertFalse("broker/store no ignored corruption", ignoringCorruption.get());
Connection connection = f.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer messageProducer = session.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
for (int i=0; i<20; i++) {
messageProducer.send(session.createTextMessage("HI"));
session.commit();
}
assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
broker.stop();
} finally {
log4jLogger.removeAppender(appender);
}
}
private void corruptTxStoreJournal(String pathToDataDir) throws Exception {
corruptTxStore(pathToDataDir, false);
}
private void corruptTxStoreJournalAndTruncate(String pathToDataDir) throws Exception {
corruptTxStore(pathToDataDir, true);
}
private void corruptTxStore(String pathToDataDir, boolean truncate) throws Exception {
LOG.info("Path to broker datadir: " + pathToDataDir);
RandomAccessFile randomAccessFile = new RandomAccessFile(String.format("%s/mKahaDB/txStore/db-1.log", pathToDataDir), "rw");
final ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER);
byte data[] = new byte[1024 * 20];
ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data, 0, data.length));
int offset = bs.indexOf(header, 1);
offset = bs.indexOf(header, offset+1);
offset = bs.indexOf(header, offset+1);
// 3rd batch
LOG.info("3rd batch record in file: 1:" + offset);
offset += Journal.BATCH_CONTROL_RECORD_SIZE;
offset += 4; // location size
offset += 1; // location type
byte fill = (byte) 0xAF;
LOG.info("Whacking batch record in file:" + 1 + ", at offset: " + offset + " with fill:" + fill);
// whack that record
byte[] bla = new byte[2];
Arrays.fill(bla, fill);
randomAccessFile.seek(offset);
randomAccessFile.write(bla, 0, bla.length);
if (truncate) {
// set length to truncate
randomAccessFile.setLength(randomAccessFile.getFilePointer());
}
randomAccessFile.getFD().sync();
}
protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setJournalMaxFileLength(maxFileLength);