From 85859fd8dc22e3251de294c23d84c12b29d4fe81 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 7 Nov 2018 11:29:14 +0000 Subject: [PATCH] AMQ-7082 We should make sure that pages managed during recovery are not recovered in error variation of patch from Alan Protasio closes #317 --- .../store/kahadb/disk/page/PageFile.java | 34 +++--- .../store/kahadb/disk/util/SequenceSet.java | 24 ++++ .../store/kahadb/disk/page/PageFileTest.java | 112 ++++++++++++++++++ .../kahadb/disk/util/SequenceSetTest.java | 22 ++++ 4 files changed, 178 insertions(+), 14 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index 7456dfa80d..5b898f2db2 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -135,7 +135,10 @@ public class PageFile { // Keeps track of free pages. private final AtomicLong nextFreePageId = new AtomicLong(); private SequenceSet freeList = new SequenceSet(); + private AtomicReference recoveredFreeList = new AtomicReference(); + private AtomicReference trackingFreeDuringRecovery = new AtomicReference(); + private final AtomicLong nextTxid = new AtomicLong(); // Persistent settings stored in the page file. @@ -146,8 +149,6 @@ public class PageFile { private boolean useLFRUEviction = false; private float LFUEvictionFactor = 0.2f; - private boolean needsFreePageRecovery = false; - /** * Use to keep track of updated pages which have not yet been committed. */ @@ -412,7 +413,7 @@ public class PageFile { } else { LOG.debug(toString() + ", Recovering page file..."); nextTxid.set(redoRecoveryUpdates()); - needsFreePageRecovery = true; + trackingFreeDuringRecovery.set(new SequenceSet()); } if (writeFile.length() < PAGE_FILE_HEADER_SIZE) { @@ -424,7 +425,7 @@ public class PageFile { storeMetaData(); getFreeFile().delete(); startWriter(); - if (needsFreePageRecovery) { + if (trackingFreeDuringRecovery.get() != null) { asyncFreePageRecovery(nextFreePageId.get()); } } else { @@ -478,8 +479,6 @@ public class PageFile { // allow flush (with index lock held) to merge eventually recoveredFreeList.lazySet(newFreePages); } - // all set for clean shutdown - needsFreePageRecovery = false; } private void loadForRecovery(long nextFreePageIdSnap) throws Exception { @@ -518,7 +517,7 @@ public class PageFile { } metaData.setLastTxId(nextTxid.get() - 1); - if (needsFreePageRecovery) { + if (trackingFreeDuringRecovery.get() != null) { // async recovery incomplete, will have to try again metaData.setCleanShutdown(false); } else { @@ -567,14 +566,16 @@ public class PageFile { throw new IOException("Page file already stopped: checkpointing is not allowed"); } - SequenceSet toMerge = recoveredFreeList.get(); - if (toMerge != null) { + SequenceSet recovered = recoveredFreeList.get(); + if (recovered != null) { recoveredFreeList.lazySet(null); - Sequence seq = toMerge.getHead(); - while (seq != null) { - freeList.add(seq); - seq = seq.getNext(); - } + SequenceSet inUse = trackingFreeDuringRecovery.get(); + recovered.remove(inUse); + freeList.merge(recovered); + + // all set for clean shutdown + trackingFreeDuringRecovery.set(null); + inUse.clear(); } // Setup a latch that gets notified when all buffered writes hits the disk. @@ -961,6 +962,11 @@ public class PageFile { public void freePage(long pageId) { freeList.add(pageId); removeFromCache(pageId); + + SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get(); + if (trackFreeDuringRecovery != null) { + trackFreeDuringRecovery.add(pageId); + } } @SuppressWarnings("unchecked") diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java index fac831bf70..e589e84d0e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/SequenceSet.java @@ -94,6 +94,30 @@ public class SequenceSet extends LinkedNodeList implements Iterable pagesToFree = new LinkedList<>(); + + LOG.info("Creating Transactions"); + for (int i = 0; i < numberOfPages; i++) { + Transaction tx = pf.tx(); + Page page = tx.allocate(); + String t = "page:" + i; + page.set(t); + tx.store(page, StringMarshaller.INSTANCE, false); + tx.commit(); + + if (i >= numberOfPages / 2) { + pagesToFree.add(page.getPageId()); + } + } + + pf.flush(); + + LOG.info("Number of free pages:" + pf.getFreePageCount()); + + //Simulate an unclean shutdown + final PageFile pf2 = new PageFile(new File("target/test-data"), getName()); + pf2.setEnableRecoveryFile(false); + pf2.load(); + + LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount()); + + Transaction tx = pf2.tx(); + + for (Long pageId : pagesToFree) { + tx.free(tx.load(pageId, StringMarshaller.INSTANCE)); + tx.commit(); + } + + LOG.info("RecoveredPageFile: Number of free pages Before Reusing:" + pf2.getFreePageCount()); + List transactions = new LinkedList<>(); + + int totalFreePages = numberOfPages / 2; + int totalPages = numberOfPages; + + for (int i = 0; i < 20; i++) { + tx = pf2.tx(); + Page page = tx.allocate(); + String t = "page:" + i; + page.set(t); + tx.store(page, StringMarshaller.INSTANCE, false); + tx.commit(); + transactions.add(tx); + + // Free pages was already recovered + if (page.getPageId() < numberOfPages) { + totalFreePages--; + } else { + totalPages++; + } + } + + LOG.info("RecoveredPageFile: Number of free pages After Reusing:" + pf2.getFreePageCount()); + + assertTrue("Recovery Finished", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + pf2.flush(); + long freePages = pf2.getFreePageCount(); + LOG.info("free page count: " + freePages); + return recoveryEnd.get(); + } + }, 100000)); + + LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount()); + + for (Transaction txConcurrent: transactions) { + for (Page page : txConcurrent) { + assertFalse(pf2.isFreePage(page.pageId)); + } + } + + // Make sure we dont have leaking pages. + assertEquals(pf2.getFreePageCount(), totalFreePages); + assertEquals(pf2.getPageCount(), totalPages); + + assertEquals("pages freed during recovery should be reused", numberOfPages, totalPages); + } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java index a25b4e72ab..7df83513f5 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java @@ -139,6 +139,28 @@ public class SequenceSetTest { set.remove(10); assertEquals(3, set.size()); assertEquals(97, set.rangeSize()); + + SequenceSet toRemove = new SequenceSet(); + toRemove.add(new Sequence(0, 100)); + + set.remove(toRemove); + assertEquals(0, set.size()); + assertEquals(0, set.rangeSize()); + + } + + @Test + public void testMerge() { + SequenceSet set = new SequenceSet(); + set.add(new Sequence(0, 100)); + + SequenceSet set2 = new SequenceSet(); + set.add(new Sequence(50, 150)); + + set.merge(set2); + assertEquals(151, set.rangeSize()); + assertEquals(1, set.size()); + } @Test