diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 5c384354c0..e30f3bf744 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -670,6 +670,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return infos.toString(); } + public String getPreparedTransaction(TransactionId transactionId) { + String result = ""; + synchronized (preparedTransactions) { + List operations = preparedTransactions.get(transactionId); + if (operations != null) { + TranInfo info = new TranInfo(); + info.id = transactionId; + for (Operation operation : preparedTransactions.get(transactionId)) { + info.track(operation); + } + result = info.toString(); + } + } + return result; + } + /** * Move all the messages that were in the journal into long term storage. We * just replay and do a checkpoint. diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index c83e86b6d8..0ddae6e84e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -562,6 +562,14 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem return transactionStore.getJournalCleanupInterval(); } + public void setCheckForCorruption(boolean checkForCorruption) { + transactionStore.setCheckForCorruption(checkForCorruption); + } + + public boolean isCheckForCorruption() { + return transactionStore.isCheckForCorruption(); + } + public List getAdapters() { return Collections.unmodifiableList(adapters); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java index 55931a6ebb..4a29f8ea66 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java @@ -50,6 +50,7 @@ import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaEntryType; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaTraceCommand; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.usage.StoreUsage; @@ -70,6 +71,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore { private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean recovered = new AtomicBoolean(false); private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL; + private boolean checkForCorruption = true; + private AtomicBoolean corruptJournalDetected = new AtomicBoolean(false); public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; @@ -200,6 +203,14 @@ public class MultiKahaDBTransactionStore implements TransactionStore { return journalCleanupInterval; } + public void setCheckForCorruption(boolean checkForCorruption) { + this.checkForCorruption = checkForCorruption; + } + + public boolean isCheckForCorruption() { + return checkForCorruption; + } + public class Tx { private final HashMap stores = new HashMap(); private int prepareLocationId = 0; @@ -341,16 +352,22 @@ public class MultiKahaDBTransactionStore implements TransactionStore { journal.setMaxFileLength(journalMaxFileLength); journal.setWriteBatchSize(journalWriteBatchSize); journal.setCleanupInterval(journalCleanupInterval); + journal.setCheckForCorruptionOnStartup(checkForCorruption); + journal.setChecksum(checkForCorruption); IOHelper.mkdirs(journal.getDirectory()); journal.start(); recoverPendingLocalTransactions(); recovered.set(true); - store(new KahaTraceCommand().setMessage("LOADED " + new Date())); + loaded(); } } + private void loaded() throws IOException { + store(new KahaTraceCommand().setMessage("LOADED " + new Date())); + } + private void txStoreCleanup() { - if (!recovered.get()) { + if (!recovered.get() || corruptJournalDetected.get()) { return; } Set knownDataFileIds = new TreeSet(journal.getFileMap().keySet()); @@ -380,13 +397,30 @@ public class MultiKahaDBTransactionStore implements TransactionStore { } private void recoverPendingLocalTransactions() throws IOException { - Location location = journal.getNextLocation(null); - while (location != null) { - process(location, load(location)); - location = journal.getNextLocation(location); + + if (checkForCorruption) { + for (DataFile dataFile: journal.getFileMap().values()) { + if (!dataFile.getCorruptedBlocks().isEmpty()) { + LOG.error("Corrupt Transaction journal records found in db-{}.log at {}", dataFile.getDataFileId(), dataFile.getCorruptedBlocks()); + corruptJournalDetected.set(true); + } + } + } + if (!corruptJournalDetected.get()) { + Location location = null; + try { + location = journal.getNextLocation(null); + while (location != null) { + process(location, load(location)); + location = journal.getNextLocation(location); + } + } catch (Exception oops) { + LOG.error("Corrupt journal record; unexpected exception on transaction journal replay of location:" + location, oops); + corruptJournalDetected.set(true); + } + pendingCommit.putAll(inflightTransactions); + LOG.info("pending local transactions: " + pendingCommit.keySet()); } - pendingCommit.putAll(inflightTransactions); - LOG.info("pending local transactions: " + pendingCommit.keySet()); } public JournalCommand load(Location location) throws IOException { @@ -436,28 +470,61 @@ public class MultiKahaDBTransactionStore implements TransactionStore { }); } + boolean recoveryWorkPending = false; try { Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker(); // force completion of local xa for (TransactionId txid : broker.getPreparedTransactions(null)) { if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { - try { - if (pendingCommit.keySet().contains(txid)) { - LOG.info("delivering pending commit outcome for tid: " + txid); - broker.commitTransaction(null, txid, false); - } else { - LOG.info("delivering rollback outcome to store for tid: " + txid); - broker.forgetTransaction(null, txid); + recoveryWorkPending = true; + if (corruptJournalDetected.get()) { + // not having a record is meaningless once our tx store is corrupt; we need a heuristic decision + LOG.warn("Pending multi store local transaction {} requires manual heuristic outcome via JMX", txid); + logSomeContext(txid); + } else { + try { + if (pendingCommit.keySet().contains(txid)) { + // we recorded the commit outcome, finish the job + LOG.info("delivering pending commit outcome for tid: " + txid); + broker.commitTransaction(null, txid, false); + } else { + // we have not record an outcome, and would have reported a commit failure, so we must rollback + LOG.info("delivering rollback outcome to store for tid: " + txid); + broker.forgetTransaction(null, txid); + } + persistCompletion(txid); + } catch (Exception ex) { + LOG.error("failed to deliver pending outcome for tid: " + txid, ex); } - persistCompletion(txid); - } catch (Exception ex) { - LOG.error("failed to deliver pending outcome for tid: " + txid, ex); } } } } catch (Exception e) { LOG.error("failed to resolve pending local transactions", e); } + // can we ignore corruption and resume + if (corruptJournalDetected.get() && !recoveryWorkPending) { + // move to new write file, gc will cleanup + journal.rotateWriteFile(); + loaded(); + corruptJournalDetected.set(false); + LOG.info("No heuristics outcome pending after corrupt tx store detection, auto resolving"); + } + } + + private void logSomeContext(TransactionId txid) throws IOException { + Tx tx = getTx(txid); + if (tx != null) { + for (TransactionStore store: tx.getStores()) { + for (PersistenceAdapter persistenceAdapter : multiKahaDBPersistenceAdapter.adapters) { + if (persistenceAdapter.createTransactionStore() == store) { + if (persistenceAdapter instanceof KahaDBPersistenceAdapter) { + LOG.warn("Heuristic data in: " + persistenceAdapter + ", " + ((KahaDBPersistenceAdapter)persistenceAdapter).getStore().getPreparedTransaction(txid)); + } + } + } + } + } } void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index 9a6e25662d..459d1fd314 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -510,30 +510,32 @@ public class Journal { while (true) { int size = checkBatchRecord(bs, randomAccessFile); - if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) { - if (size == 0) { - // eof batch record - break; - } + if (size > 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) { location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size); - } else { - - // Perhaps it's just some corruption... scan through the - // file to find the next valid batch record. We - // may have subsequent valid batch records. + } else if (size == 0 && location.getOffset() + EOF_RECORD.length + size <= totalFileLength) { + // eof batch record + break; + } else { + // track corruption and skip if possible + Sequence sequence = new Sequence(location.getOffset()); if (findNextBatchRecord(bs, randomAccessFile) >= 0) { int nextOffset = Math.toIntExact(randomAccessFile.getFilePointer() - bs.remaining()); - Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); - LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence); + sequence.setLast(nextOffset - 1); dataFile.corruptedBlocks.add(sequence); + LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence); location.setOffset(nextOffset); } else { + // corruption to eof, don't loose track of this corruption, don't truncate + sequence.setLast(Math.toIntExact(randomAccessFile.getFilePointer())); + dataFile.corruptedBlocks.add(sequence); + LOG.warn("Corrupt journal records found in '{}' from offset: {} to EOF", dataFile.getFile(), sequence); break; } } } } catch (IOException e) { + LOG.trace("exception on recovery check of: " + dataFile + ", at " + location, e); } finally { accessorPool.closeDataFileAccessor(reader); } @@ -543,14 +545,6 @@ public class Journal { if (existingLen > dataFile.getLength()) { totalLength.addAndGet(dataFile.getLength() - existingLen); } - - if (!dataFile.corruptedBlocks.isEmpty()) { - // Is the end of the data file corrupted? - if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) { - dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); - } - } - return location; } @@ -654,7 +648,7 @@ public class Journal { return totalLength.get(); } - private void rotateWriteFile() throws IOException { + public void rotateWriteFile() throws IOException { synchronized (dataFileIdLock) { DataFile dataFile = nextDataFile; if (dataFile == null) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java index da96431141..9e9bcfd44e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java @@ -21,6 +21,8 @@ import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.TransactionId; @@ -29,7 +31,14 @@ import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.DefaultTestAppender; import org.apache.activemq.util.Wait; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; @@ -38,10 +47,14 @@ import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.management.ObjectName; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -183,6 +196,309 @@ public class MKahaDBTxRecoveryTest { } + @Test + public void testManualRecoveryOnCorruptTxStore() throws Exception { + + prepareBrokerWithMultiStore(true); + ((MultiKahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setCheckForCorruption(true); + broker.start(); + broker.waitUntilStarted(); + + // Ensure we have an Admin View. + assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return (broker.getAdminView()) != null; + } + })); + + final AtomicBoolean injectFailure = new AtomicBoolean(true); + + final AtomicInteger reps = new AtomicInteger(); + final AtomicReference delegate = new AtomicReference(); + + TransactionIdTransformer faultInjector = new TransactionIdTransformer() { + @Override + public TransactionId transform(TransactionId txid) { + if (injectFailure.get() && reps.incrementAndGet() > 5) { + throw new RuntimeException("Bla2"); + } + return delegate.get().transform(txid); + } + }; + // set up kahadb to fail after N ops + for (KahaDBPersistenceAdapter pa : kahadbs) { + if (delegate.get() == null) { + delegate.set(pa.getStore().getTransactionIdTransformer()); + } + pa.setTransactionIdTransformer(faultInjector); + } + + ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); + f.setAlwaysSyncSend(true); + Connection c = f.createConnection(); + c.start(); + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2)); + producer.send(s.createTextMessage("HI")); + try { + s.commit(); + fail("Expect commit failure on error injection!"); + } catch (Exception expected) { + expected.printStackTrace(); + } + + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); + + final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)); + final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + + assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount(); + } + })); + + // check completion on recovery + injectFailure.set(false); + + // fire in many more local transactions to use N txStore journal files + for (int i=0; i<100; i++) { + producer.send(s.createTextMessage("HI")); + s.commit(); + } + + ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); + BrokerViewMBean brokerViewMBean = (BrokerViewMBean) broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true); + String pathToDataDir = brokerViewMBean.getDataDirectory(); + + broker.stop(); + + // corrupt the journal such that it fails to load + corruptTxStoreJournal(pathToDataDir); + + // verify failure to load txStore via logging + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class); + + AtomicBoolean foundSomeCorruption = new AtomicBoolean(); + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) { + LOG.info("received expected log message: " + event.getMessage()); + foundSomeCorruption.set(true); + } + } + }; + log4jLogger.addAppender(appender); + try { + + prepareBrokerWithMultiStore(false); + ((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(true); + + broker.start(); + broker.waitUntilStarted(); + + { + final Destination dest1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)); + final Destination dest2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + + // verify partial commit still present + assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return dest1.getMessageStore().getMessageCount() != dest2.getMessageStore().getMessageCount(); + } + })); + } + + assertTrue("broker/store found corruption", foundSomeCorruption.get()); + broker.stop(); + + // and without checksum + LOG.info("Check for journal read failure... no checksum"); + foundSomeCorruption.set(false); + prepareBrokerWithMultiStore(false); + ((MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruption(false); + + broker.start(); + broker.waitUntilStarted(); + + { + final Destination dest1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)); + final Destination dest2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + + // verify partial commit still present + assertTrue("Partial commit - one dest still has message", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return dest1.getMessageStore().getMessageCount() != dest2.getMessageStore().getMessageCount(); + } + })); + } + + assertTrue("broker/store found corruption without checksum", foundSomeCorruption.get()); + + // force commit outcome via Tx MBeans + ObjectName matchAllPendingTx = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=*"); + Set pendingTx = broker.getManagementContext().queryNames(matchAllPendingTx, null); + assertFalse(pendingTx.isEmpty()); + + for (ObjectName pendingXAtxOn: pendingTx) { + RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(pendingXAtxOn, + RecoveredXATransactionViewMBean.class, true); + assertEquals("matches ", proxy.getFormatId(), 61616); + + // force commit outcome, we verify the commit in this test, knowing that one branch has committed already + proxy.heuristicCommit(); + } + + pendingTx = broker.getManagementContext().queryNames(matchAllPendingTx, null); + assertTrue(pendingTx.isEmpty()); + + // verify commit completed + Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)); + assertEquals(101, destination.getMessageStore().getMessageCount()); + + destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + assertEquals(101, destination.getMessageStore().getMessageCount()); + + } finally { + log4jLogger.removeAppender(appender); + } + } + + @Test + public void testCorruptionDetectedOnTruncateAndIgnored() throws Exception { + + prepareBrokerWithMultiStore(true); + broker.start(); + broker.waitUntilStarted(); + + ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); + f.setAlwaysSyncSend(true); + Connection c = f.createConnection(); + c.start(); + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2)); + for (int i=0; i<20; i++) { + producer.send(s.createTextMessage("HI")); + s.commit(); + } + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); + + ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); + BrokerViewMBean brokerViewMBean = (BrokerViewMBean) broker.getManagementContext().newProxyInstance(objectName, BrokerViewMBean.class, true); + String pathToDataDir = brokerViewMBean.getDataDirectory(); + + broker.stop(); + + // corrupt the journal such that it fails to load + corruptTxStoreJournalAndTruncate(pathToDataDir); + + // verify failure to load txStore via logging + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(MultiKahaDBTransactionStore.class); + + AtomicBoolean foundSomeCorruption = new AtomicBoolean(); + AtomicBoolean ignoringCorruption = new AtomicBoolean(); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().equals(Level.ERROR) && event.getMessage().toString().startsWith("Corrupt ")) { + LOG.info("received expected log message: " + event.getMessage()); + foundSomeCorruption.set(true); + } else if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("auto resolving")) { + ignoringCorruption.set(true); + } + } + }; + log4jLogger.addAppender(appender); + try { + prepareBrokerWithMultiStore(false); + + broker.start(); + broker.waitUntilStarted(); + + assertTrue("broker/store found corruption", foundSomeCorruption.get()); + assertTrue("broker/store ignored corruption", ignoringCorruption.get()); + + broker.stop(); + + foundSomeCorruption.set(false); + ignoringCorruption.set(false); + + prepareBrokerWithMultiStore(false); + broker.start(); + broker.waitUntilStarted(); + + assertFalse("broker/store no corruption", foundSomeCorruption.get()); + assertFalse("broker/store no ignored corruption", ignoringCorruption.get()); + + Connection connection = f.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer messageProducer = session.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2)); + for (int i=0; i<20; i++) { + messageProducer.send(session.createTextMessage("HI")); + session.commit(); + } + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); + + broker.stop(); + } finally { + log4jLogger.removeAppender(appender); + } + } + + private void corruptTxStoreJournal(String pathToDataDir) throws Exception { + corruptTxStore(pathToDataDir, false); + } + + private void corruptTxStoreJournalAndTruncate(String pathToDataDir) throws Exception { + corruptTxStore(pathToDataDir, true); + } + + private void corruptTxStore(String pathToDataDir, boolean truncate) throws Exception { + LOG.info("Path to broker datadir: " + pathToDataDir); + + RandomAccessFile randomAccessFile = new RandomAccessFile(String.format("%s/mKahaDB/txStore/db-1.log", pathToDataDir), "rw"); + final ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER); + byte data[] = new byte[1024 * 20]; + ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data, 0, data.length)); + + int offset = bs.indexOf(header, 1); + offset = bs.indexOf(header, offset+1); + offset = bs.indexOf(header, offset+1); + + // 3rd batch + LOG.info("3rd batch record in file: 1:" + offset); + + offset += Journal.BATCH_CONTROL_RECORD_SIZE; + offset += 4; // location size + offset += 1; // location type + + byte fill = (byte) 0xAF; + LOG.info("Whacking batch record in file:" + 1 + ", at offset: " + offset + " with fill:" + fill); + // whack that record + byte[] bla = new byte[2]; + Arrays.fill(bla, fill); + randomAccessFile.seek(offset); + randomAccessFile.write(bla, 0, bla.length); + + if (truncate) { + // set length to truncate + randomAccessFile.setLength(randomAccessFile.getFilePointer()); + } + randomAccessFile.getFD().sync(); + } + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); kaha.setJournalMaxFileLength(maxFileLength);