diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index b426a72311..2c6348e894 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -134,7 +134,7 @@ public class PageFile { // Keeps track of free pages. private final AtomicLong nextFreePageId = new AtomicLong(); private SequenceSet freeList = new SequenceSet(); - + private SequenceSet recoveredFreeList = null; private final AtomicLong nextTxid = new AtomicLong(); // Persistent settings stored in the page file. @@ -423,11 +423,70 @@ public class PageFile { storeMetaData(); getFreeFile().delete(); startWriter(); + if (needsFreePageRecovery) { + asyncFreePageRecovery(); + } } else { throw new IllegalStateException("Cannot load the page file when it is already loaded."); } } + private void asyncFreePageRecovery() { + Thread thread = new Thread("KahaDB Index Free Page Recovery") { + @Override + public void run() { + try { + recoverFreePages(); + } catch (Throwable e) { + if (loaded.get()) { + LOG.warn("Error recovering index free page list", e); + } + } + } + }; + thread.setPriority(Thread.NORM_PRIORITY); + thread.setDaemon(true); + thread.start(); + } + + private void recoverFreePages() throws Exception { + LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown.."); + SequenceSet newFreePages = new SequenceSet(); + // need new pageFile instance to get unshared readFile + PageFile recoveryPageFile = new PageFile(directory, name); + recoveryPageFile.loadForRecovery(nextFreePageId.get()); + try { + for (Iterator i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) { + Page page = i.next(); + if (page.getType() == Page.PAGE_FREE_TYPE) { + newFreePages.add(page.getPageId()); + } + } + } finally { + recoveryPageFile.readFile.close(); + } + + LOG.info(toString() + ". Recovered pageFile free list of size: " + newFreePages.rangeSize()); + if (!newFreePages.isEmpty()) { + + // allow flush (with index lock held) to merge + recoveredFreeList = newFreePages; + } + // all set for clean shutdown + needsFreePageRecovery = false; + } + + private void loadForRecovery(long nextFreePageIdSnap) throws Exception { + loaded.set(true); + enablePageCaching = false; + File file = getMainPageFile(); + readFile = new RecoverableRandomAccessFile(file, "r"); + loadMetaData(); + pageSize = metaData.getPageSize(); + enableRecoveryFile = false; + nextFreePageId.set(nextFreePageIdSnap); + } + /** * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles. @@ -445,22 +504,6 @@ public class PageFile { throw new InterruptedIOException(); } - if (needsFreePageRecovery) { - LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown.."); - freeList = new SequenceSet(); - loaded.set(true); - try { - for (Iterator i = new Transaction(this).iterator(true); i.hasNext(); ) { - Page page = i.next(); - if (page.getType() == Page.PAGE_FREE_TYPE) { - freeList.add(page.getPageId()); - } - } - } finally { - loaded.set(false); - } - } - if (freeList.isEmpty()) { metaData.setFreePages(0); } else { @@ -469,7 +512,12 @@ public class PageFile { } metaData.setLastTxId(nextTxid.get() - 1); - metaData.setCleanShutdown(true); + if (needsFreePageRecovery) { + // async recovery incomplete, will have to try again + metaData.setCleanShutdown(false); + } else { + metaData.setCleanShutdown(true); + } storeMetaData(); if (readFile != null) { @@ -513,6 +561,16 @@ public class PageFile { throw new IOException("Page file already stopped: checkpointing is not allowed"); } + SequenceSet toMerge = recoveredFreeList; + if (toMerge != null) { + recoveredFreeList = null; + Sequence seq = toMerge.getHead(); + while (seq != null) { + freeList.add(seq); + seq = seq.getNext(); + } + } + // Setup a latch that gets notified when all buffered writes hits the disk. CountDownLatch checkpointLatch; synchronized (writes) { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java index 1bdbe6f379..72e8d7b372 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java @@ -27,10 +27,15 @@ import java.util.HashSet; import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import junit.framework.TestCase; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("rawtypes") public class PageFileTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(PageFileTest.class); + public void testCRUD() throws IOException { PageFile pf = new PageFile(new File("target/test-data"), getName()); @@ -219,12 +224,20 @@ public class PageFileTest extends TestCase { PageFile pf2 = new PageFile(new File("target/test-data"), getName()); pf2.setEnableRecoveryFile(false); pf2.load(); - pf2.unload(); - pf2.load(); - long freePages = pf2.getFreePageCount(); - pf.unload(); + try { + assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { - //Make sure that all 10 pages are still tracked - assertEquals(10, freePages); + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return freePages == 10l; + } + }, 12000000)); + } finally { + pf.unload(); + pf2.unload(); + } } }