diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index a8f975ed88..a1319278ba 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -423,7 +423,8 @@ public class PageFile { } if (writeFile.length() < PAGE_FILE_HEADER_SIZE) { - writeFile.setLength(PAGE_FILE_HEADER_SIZE); + throw new IllegalStateException("File " + file + " is corrupt, length of " + + writeFile.length() + " is less than page file header size of " + PAGE_FILE_HEADER_SIZE); } nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize); @@ -1165,12 +1166,6 @@ public class PageFile { recoveryFile.write(w.getDiskBound(tmpFilesForRemoval), 0, pageSize); } - // Can we shrink the recovery buffer?? - if (recoveryPageCount > recoveryFileMaxPageCount) { - int t = Math.max(recoveryFileMinPageCount, batch.size()); - recoveryFile.setLength(recoveryFileSizeForPages(t)); - } - // Record the page writes in the recovery buffer. recoveryFile.seek(0); // Store the next tx id... @@ -1262,8 +1257,6 @@ public class PageFile { if (recoveryFile.length() == 0) { // Write an empty header.. recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]); - // Preallocate the minium size for better performance. - recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount)); return 0; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java index 309272a870..ca392da9db 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java @@ -374,10 +374,6 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io. } } - public void setLength(long length) throws IOException { - throw new IllegalStateException("File size is pre allocated"); - } - public void seek(long pos) throws IOException { try { getRaf().seek(pos); diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java index 7eceef1f7e..7e8278f5c9 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java @@ -367,20 +367,7 @@ public class PageFileTest extends TestCase { public void testRecoveryAfterUncleanShutdownAndZeroFreePages() throws Exception { final int numberOfPages = 1000; final AtomicBoolean recoveryEnd = new AtomicBoolean(); - - final var logger = org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(PageFile.class)); - final var appender = new AbstractAppender("testAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { - @Override - public void append(LogEvent event) { - if (event.toImmutable().getLevel().equals(Level.INFO) && event.toImmutable().getMessage().getFormattedMessage().contains("Recovered pageFile free list")) { - recoveryEnd.set(true); - } - } - }; - appender.start(); - - logger.addAppender(appender); - logger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() {}); + createRecoveryAppender("testRecoveryAfterUncleanShutdownAndZeroFreePagesAppender", recoveryEnd); PageFile pf = new PageFile(new File("target/test-data"), getName()); pf.delete(); @@ -414,22 +401,50 @@ public class PageFileTest extends TestCase { } + public void testRecoveryAfterUncleanShutdownAndMissingRecoveryFile() throws Exception { + final int numberOfPages = 1000; + final AtomicBoolean recoveryEnd = new AtomicBoolean(); + createRecoveryAppender("testRecoveryAfterUncleanShutdownAndMissingRecoveryFileAppender", recoveryEnd); + + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(false); + pf.load(); + + LOG.info("Creating Transactions"); + for (int i = 0; i < numberOfPages; i++) { + Transaction tx = pf.tx(); + Page page = tx.allocate(); + String t = "page:" + i; + page.set(t); + tx.store(page, StringMarshaller.INSTANCE, false); + tx.commit(); + } + + pf.flush(); + + assertEquals(pf.getFreePageCount(), 0); + + //Simulate an unclean shutdown + PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.setEnableRecoveryFile(true); + + // Simulate a missing recovery file + pf2.getRecoveryFile().delete(); + pf2.load(); + + assertTrue("Recovery Finished", Wait.waitFor(recoveryEnd::get, 100000)); + + //Simulate a clean shutdown + pf2.unload(); + assertTrue(pf2.isCleanShutdown()); + + } + public void testBackgroundWillMarkUsedPagesAsFreeInTheBeginning() throws Exception { final int numberOfPages = 100000; final AtomicBoolean recoveryEnd = new AtomicBoolean(); - - final var logger = org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(PageFile.class)); - final var appender = new AbstractAppender("pageAppender", new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { - @Override - public void append(LogEvent event) { - if (event.toImmutable().getLevel().equals(Level.INFO) && event.toImmutable().getMessage().getFormattedMessage().contains("Recovered pageFile free list")) { - recoveryEnd.set(true); - } - } - }; - appender.start(); - logger.addAppender(appender); - logger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() {}); + createRecoveryAppender("testBackgroundWillMarkUsedPagesAsFreeInTheBeginningAppender", recoveryEnd); PageFile pf = new PageFile(new File("target/test-data"), getName()); pf.delete(); @@ -519,4 +534,20 @@ public class PageFileTest extends TestCase { assertEquals("pages freed during recovery should be reused", numberOfPages, totalPages); } + + private void createRecoveryAppender(String name, AtomicBoolean recoveryEnd) { + final var logger = org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(PageFile.class)); + final var appender = new AbstractAppender(name, new AbstractFilter() {}, new MessageLayout(), false, new Property[0]) { + @Override + public void append(LogEvent event) { + if (event.toImmutable().getLevel().equals(Level.INFO) && event.toImmutable().getMessage().getFormattedMessage().contains("Recovered pageFile free list")) { + recoveryEnd.set(true); + } + } + }; + appender.start(); + + logger.addAppender(appender); + logger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() {}); + } }