https://issues.apache.org/jira/browse/AMQ-5578 - preallocation could occur after a restart over an existing journal file! - fix and test

This commit is contained in:
gtully 2015-04-20 16:01:50 +01:00
parent 01f56d0ca2
commit 4a821186a4
4 changed files with 29 additions and 4 deletions

View File

@ -610,6 +610,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
redoCounter++;
} catch (IOException failedRecovery) {
if (isIgnoreMissingJournalfiles()) {
LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
// track this dud location
journal.corruptRecoveryLocation(recoveryPosition);
} else {

View File

@ -314,8 +314,11 @@ class DataFileAppender implements FileAppender {
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
// pre allocate on first open
journal.preallocateEntireJournalDataFile(file);
// pre allocate on first open of new file (length==0)
// note dataFile.length cannot be used because it is updated in enqueue
if (file.length() == 0l) {
journal.preallocateEntireJournalDataFile(file);
}
}
Journal.WriteCommand write = wb.writes.getHead();

View File

@ -116,6 +116,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(true);
adapter.setPreallocationStrategy("zeros");
adapter.setPreallocationScope("entire_journal");
}
@After
@ -186,6 +189,20 @@ public class JournalCorruptionEofIndexRecoveryTest {
}
@Test
public void testRecoverIndex() throws Exception {
    startBroker();

    // Publish enough messages to roll over into several journal data files.
    final int messageCount = 4;
    produceMessagesToConsumeMultipleDataFiles(messageCount);

    // Restart with the index deleted so recovery must replay the journal
    // (and must not re-preallocate over the existing data files).
    restartBroker(false, true);

    assertEquals("Drain", messageCount, drainQueue(messageCount));
}
private void corruptBatchCheckSumSplash(int id) throws Exception{
Collection<DataFile> files =
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();

View File

@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb.disk.journal;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
@ -36,6 +38,8 @@ import static org.junit.Assert.assertTrue;
*/
public class PreallocationJournalTest {
private static final Logger LOG = LoggerFactory.getLogger(PreallocationJournalTest.class);
@Test
public void testSparseFilePreallocation() throws Exception {
executeTest("sparse_file");
@ -76,6 +80,7 @@ public class PreallocationJournalTest {
assertTrue("file size as expected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info ("file size:" + journalLog + ", chan.size " + channel.size() + ", jfileSize.length: " + journalLog.length());
return Journal.DEFAULT_MAX_FILE_LENGTH == channel.size();
}
}));
@ -87,8 +92,7 @@ public class PreallocationJournalTest {
buff.position(0);
assertEquals(0x00, buff.get());
System.out.println("File size: " + channel.size());
LOG.info("File size: " + channel.size());
store.stop();
}