diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index fe79a2d6b1..7456dfa80d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -425,19 +425,19 @@ public class PageFile { getFreeFile().delete(); startWriter(); if (needsFreePageRecovery) { - asyncFreePageRecovery(); + asyncFreePageRecovery(nextFreePageId.get()); } } else { throw new IllegalStateException("Cannot load the page file when it is already loaded."); } } - private void asyncFreePageRecovery() { + private void asyncFreePageRecovery(final long lastRecoveryPage) { Thread thread = new Thread("KahaDB Index Free Page Recovery") { @Override public void run() { try { - recoverFreePages(); + recoverFreePages(lastRecoveryPage); } catch (Throwable e) { if (loaded.get()) { LOG.warn("Error recovering index free page list", e); @@ -450,7 +450,7 @@ public class PageFile { thread.start(); } - private void recoverFreePages() throws Exception { + private void recoverFreePages(final long lastRecoveryPage) throws Exception { LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown.."); SequenceSet newFreePages = new SequenceSet(); // need new pageFile instance to get unshared readFile @@ -459,6 +459,11 @@ public class PageFile { try { for (Iterator i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) { Page page = i.next(); + + if (page.getPageId() >= lastRecoveryPage) { + break; + } + if (page.getType() == Page.PAGE_FREE_TYPE) { newFreePages.add(page.getPageId()); } @@ -817,6 +822,9 @@ public class PageFile { return toOffset(nextFreePageId.get()); } + public boolean isFreePage(long pageId) { + return freeList.contains(pageId); + } /** * @return the number of pages allocated in the PageFile */ diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java index 3a5cefd56c..db1ecf3743 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.store.kahadb.disk.page; +import junit.framework.TestCase; +import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -23,13 +29,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.HashSet; - -import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; - -import junit.framework.TestCase; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.LinkedList; +import java.util.List; @SuppressWarnings("rawtypes") public class PageFileTest extends TestCase { @@ -261,4 +262,97 @@ public class PageFileTest extends TestCase { assertEquals(pf.getFreePageCount(), 10); } + + public void testBackgroundRecoveryIsThreadSafe() throws Exception { + + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(false); + pf.load(); + + Transaction tx = pf.tx(); + tx.allocate(100000); + tx.commit(); + LOG.info("Number of free pages:" + pf.getFreePageCount()); + pf.flush(); + + //Simulate an unclean shutdown + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.setEnableRecoveryFile(false); + pf2.load(); + + Transaction tx2 = pf2.tx(); + tx2.allocate(100000); + tx2.commit(); + LOG.info("Number of free pages:" + pf2.getFreePageCount()); + + List transactions = new LinkedList<>(); + + Thread.sleep(500); + LOG.info("Creating Transactions"); + for (int i = 0; i < 20; i++) { + Transaction txConcurrent = pf2.tx(); + Page page = txConcurrent.allocate(); + String t = "page:" + i; + page.set(t); + txConcurrent.store(page, StringMarshaller.INSTANCE, false); + txConcurrent.commit(); + transactions.add(txConcurrent); + Thread.sleep(50); + } + + assertTrue("We have 199980 free pages", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return freePages == 199980; + } + }, 12000000)); + + for (Transaction txConcurrent2: transactions) { + for (Page page : txConcurrent2) { + assertFalse(pf2.isFreePage(page.pageId)); + } + } + + } + + public void testBackgroundWillNotMarkEaslyPagesAsFree() throws Exception { + + PageFile pf = new PageFile(new File("target/test-data"), getName()); + pf.delete(); + pf.setEnableRecoveryFile(false); + pf.load(); + + Transaction tx = pf.tx(); + tx.allocate(100000); + tx.commit(); + LOG.info("Number of free pages:" + pf.getFreePageCount()); + pf.flush(); + + //Simulate an unclean shutdown + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.setEnableRecoveryFile(false); + pf2.load(); + + Transaction tx2 = pf2.tx(); + tx2.allocate(200); + tx2.commit(); + LOG.info("Number of free pages:" + pf2.getFreePageCount()); + + Transaction tx3 = pf2.tx(); + tx3.allocate(100); + + assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return freePages == 100100; + } + }, 12000000)); + } }