diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 32b71708db..41c9aba37a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -610,6 +610,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe redoCounter++; } catch (IOException failedRecovery) { if (isIgnoreMissingJournalfiles()) { + LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); // track this dud location journal.corruptRecoveryLocation(recoveryPosition); } else { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index fbb276a1fe..0ce647a0c0 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -314,8 +314,11 @@ class DataFileAppender implements FileAppender { } dataFile = wb.dataFile; file = dataFile.openRandomAccessFile(); - // pre allocate on first open - journal.preallocateEntireJournalDataFile(file); + // pre allocate on first open of new file (length==0) + // note dataFile.length cannot be used because it is updated in enqueue + if (file.length() == 0l) { + journal.preallocateEntireJournalDataFile(file); + } } Journal.WriteCommand write = wb.writes.getHead(); diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index 948b543021..bb56e7d1ab 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -116,6 +116,9 @@ public class JournalCorruptionEofIndexRecoveryTest { adapter.setCheckForCorruptJournalFiles(true); adapter.setIgnoreMissingJournalfiles(true); + adapter.setPreallocationStrategy("zeros"); + adapter.setPreallocationScope("entire_journal"); + } @After @@ -186,6 +189,20 @@ public class JournalCorruptionEofIndexRecoveryTest { } + @Test + public void testRecoverIndex() throws Exception { + startBroker(); + + final int numToSend = 4; + produceMessagesToConsumeMultipleDataFiles(numToSend); + + // force journal replay by whacking the index + restartBroker(false, true); + + assertEquals("Drain", numToSend, drainQueue(numToSend)); + + } + private void corruptBatchCheckSumSplash(int id) throws Exception{ Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java index 9c21a5645e..d4095f3817 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb.disk.journal; import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.util.Wait; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; @@ -36,6 +38,8 @@ import static org.junit.Assert.assertTrue; */ public class PreallocationJournalTest { + private static final Logger LOG = LoggerFactory.getLogger(PreallocationJournalTest.class); + @Test public void testSparseFilePreallocation() throws Exception { executeTest("sparse_file"); @@ -76,6 +80,7 @@ public class PreallocationJournalTest { assertTrue("file size as expected", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { + LOG.info ("file size:" + journalLog + ", chan.size " + channel.size() + ", jfileSize.length: " + journalLog.length()); return Journal.DEFAULT_MAX_FILE_LENGTH == channel.size(); } })); @@ -87,8 +92,7 @@ public class PreallocationJournalTest { buff.position(0); assertEquals(0x00, buff.get()); - System.out.println("File size: " + channel.size()); - + LOG.info("File size: " + channel.size()); store.stop(); }