AMQ-7082 Ensure that pages freed while the free list is being recovered are not mistakenly recovered as free

variation of patch from Alan Protasio <alanprot@gmail.com> closes #317
This commit is contained in:
gtully 2018-11-07 11:29:14 +00:00
parent 81062fde88
commit 85859fd8dc
4 changed files with 178 additions and 14 deletions

View File

@ -135,7 +135,10 @@ public class PageFile {
// Keeps track of free pages. // Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong(); private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet(); private SequenceSet freeList = new SequenceSet();
private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>(); private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();
private final AtomicLong nextTxid = new AtomicLong(); private final AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file. // Persistent settings stored in the page file.
@ -146,8 +149,6 @@ public class PageFile {
private boolean useLFRUEviction = false; private boolean useLFRUEviction = false;
private float LFUEvictionFactor = 0.2f; private float LFUEvictionFactor = 0.2f;
private boolean needsFreePageRecovery = false;
/** /**
* Use to keep track of updated pages which have not yet been committed. * Use to keep track of updated pages which have not yet been committed.
*/ */
@ -412,7 +413,7 @@ public class PageFile {
} else { } else {
LOG.debug(toString() + ", Recovering page file..."); LOG.debug(toString() + ", Recovering page file...");
nextTxid.set(redoRecoveryUpdates()); nextTxid.set(redoRecoveryUpdates());
needsFreePageRecovery = true; trackingFreeDuringRecovery.set(new SequenceSet());
} }
if (writeFile.length() < PAGE_FILE_HEADER_SIZE) { if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
@ -424,7 +425,7 @@ public class PageFile {
storeMetaData(); storeMetaData();
getFreeFile().delete(); getFreeFile().delete();
startWriter(); startWriter();
if (needsFreePageRecovery) { if (trackingFreeDuringRecovery.get() != null) {
asyncFreePageRecovery(nextFreePageId.get()); asyncFreePageRecovery(nextFreePageId.get());
} }
} else { } else {
@ -478,8 +479,6 @@ public class PageFile {
// allow flush (with index lock held) to merge eventually // allow flush (with index lock held) to merge eventually
recoveredFreeList.lazySet(newFreePages); recoveredFreeList.lazySet(newFreePages);
} }
// all set for clean shutdown
needsFreePageRecovery = false;
} }
private void loadForRecovery(long nextFreePageIdSnap) throws Exception { private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
@ -518,7 +517,7 @@ public class PageFile {
} }
metaData.setLastTxId(nextTxid.get() - 1); metaData.setLastTxId(nextTxid.get() - 1);
if (needsFreePageRecovery) { if (trackingFreeDuringRecovery.get() != null) {
// async recovery incomplete, will have to try again // async recovery incomplete, will have to try again
metaData.setCleanShutdown(false); metaData.setCleanShutdown(false);
} else { } else {
@ -567,14 +566,16 @@ public class PageFile {
throw new IOException("Page file already stopped: checkpointing is not allowed"); throw new IOException("Page file already stopped: checkpointing is not allowed");
} }
SequenceSet toMerge = recoveredFreeList.get(); SequenceSet recovered = recoveredFreeList.get();
if (toMerge != null) { if (recovered != null) {
recoveredFreeList.lazySet(null); recoveredFreeList.lazySet(null);
Sequence seq = toMerge.getHead(); SequenceSet inUse = trackingFreeDuringRecovery.get();
while (seq != null) { recovered.remove(inUse);
freeList.add(seq); freeList.merge(recovered);
seq = seq.getNext();
} // all set for clean shutdown
trackingFreeDuringRecovery.set(null);
inUse.clear();
} }
// Setup a latch that gets notified when all buffered writes hits the disk. // Setup a latch that gets notified when all buffered writes hits the disk.
@ -961,6 +962,11 @@ public class PageFile {
public void freePage(long pageId) { public void freePage(long pageId) {
freeList.add(pageId); freeList.add(pageId);
removeFromCache(pageId); removeFromCache(pageId);
SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
if (trackFreeDuringRecovery != null) {
trackFreeDuringRecovery.add(pageId);
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -94,6 +94,30 @@ public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Lo
} }
} }
/**
 * Adds the content of another set to this one by walking its nodes from the
 * head and handing each {@link Sequence} to {@code add(Sequence)}.
 *
 * @param sequenceSet the set whose sequences are added to this set
 */
public void merge(SequenceSet sequenceSet) {
    for (Sequence current = sequenceSet.getHead(); current != null; current = current.getNext()) {
        add(current);
    }
}
/**
 * Removes the content of another set from this one by walking its nodes from
 * the head and handing each {@link Sequence} to {@code remove(Sequence)}.
 *
 * @param sequenceSet the set whose sequences are removed from this set
 */
public void remove(SequenceSet sequenceSet) {
    for (Sequence current = sequenceSet.getHead(); current != null; current = current.getNext()) {
        remove(current);
    }
}
/**
 * Removes every value from {@code value.first} up to and including
 * {@code value.last}, one value at a time.
 *
 * @param value the range of values to remove
 */
public void remove(Sequence value) {
    long current = value.first;
    // The bound is re-read on every pass, exactly as a for-loop condition would.
    // NOTE(review): value.last + 1 wraps when last == Long.MAX_VALUE, in which
    // case nothing is removed - confirm such a range cannot occur.
    while (current < value.last + 1) {
        remove(current);
        current++;
    }
}
/** /**
* *
* @param value * @param value

View File

@ -18,7 +18,11 @@ package org.apache.activemq.store.kahadb.disk.page;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -31,6 +35,7 @@ import java.io.OutputStream;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public class PageFileTest extends TestCase { public class PageFileTest extends TestCase {
@ -355,4 +360,111 @@ public class PageFileTest extends TestCase {
} }
}, 12000000)); }, 12000000));
} }
/**
 * Reloads a page file that was never unloaded, frees half of its pages and
 * allocates new ones, then waits for the "Recovered pageFile free list" log
 * message. Afterwards none of the pages allocated on the reloaded file may be
 * reported as free, and the free/total page counts must match the bookkeeping
 * done by the test.
 */
public void testBackgroundWillMarkUsedPagesAsFreeInTheBeginning() throws Exception {
    final int numberOfPages = 100000;
    final AtomicBoolean recoveryEnd = new AtomicBoolean();

    // Flags completion once an INFO event containing the recovery message is
    // seen on the PageFile logger.
    Appender appender = new DefaultTestAppender() {
        @Override
        public void doAppend(LoggingEvent event) {
            if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Recovered pageFile free list")) {
                recoveryEnd.set(true);
            }
        }
    };

    org.apache.log4j.Logger log4jLogger =
            org.apache.log4j.Logger.getLogger(PageFile.class);
    // Remember the configured level so the logger can be restored afterwards.
    final Level previousLevel = log4jLogger.getLevel();
    log4jLogger.addAppender(appender);
    log4jLogger.setLevel(Level.DEBUG);

    try {
        PageFile pf = new PageFile(new File("target/test-data"), getName());
        pf.delete();
        pf.setEnableRecoveryFile(false);
        pf.load();

        List<Long> pagesToFree = new LinkedList<>();

        LOG.info("Creating Transactions");
        for (int i = 0; i < numberOfPages; i++) {
            Transaction tx = pf.tx();
            Page page = tx.allocate();
            String t = "page:" + i;
            page.set(t);
            tx.store(page, StringMarshaller.INSTANCE, false);
            tx.commit();

            // The second half of the pages is freed later on the reloaded file.
            if (i >= numberOfPages / 2) {
                pagesToFree.add(page.getPageId());
            }
        }

        pf.flush();

        LOG.info("Number of free pages:" + pf.getFreePageCount());

        // Simulate an unclean shutdown: pf is deliberately not unloaded.
        final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
        pf2.setEnableRecoveryFile(false);
        pf2.load();

        LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount());

        Transaction tx = pf2.tx();
        for (Long pageId : pagesToFree) {
            tx.free(tx.load(pageId, StringMarshaller.INSTANCE));
            tx.commit();
        }

        LOG.info("RecoveredPageFile: Number of free pages Before Reusing:" + pf2.getFreePageCount());

        List<Transaction> transactions = new LinkedList<>();

        int totalFreePages = numberOfPages / 2;
        int totalPages = numberOfPages;

        for (int i = 0; i < 20; i++) {
            tx = pf2.tx();
            Page page = tx.allocate();
            String t = "page:" + i;
            page.set(t);
            tx.store(page, StringMarshaller.INSTANCE, false);
            tx.commit();
            transactions.add(tx);

            // A page id below numberOfPages means a freed page was reused;
            // otherwise the file grew by one page.
            if (page.getPageId() < numberOfPages) {
                totalFreePages--;
            } else {
                totalPages++;
            }
        }

        LOG.info("RecoveredPageFile: Number of free pages After Reusing:" + pf2.getFreePageCount());

        assertTrue("Recovery Finished", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                pf2.flush();
                long freePages = pf2.getFreePageCount();
                LOG.info("free page count: " + freePages);
                return recoveryEnd.get();
            }
        }, 100000));

        LOG.info("RecoveredPageFile: Number of free pages:" + pf2.getFreePageCount());

        for (Transaction txConcurrent : transactions) {
            for (Page page : txConcurrent) {
                assertFalse(pf2.isFreePage(page.pageId));
            }
        }

        // Make sure we don't have leaking pages (expected value first, actual second).
        assertEquals(totalFreePages, pf2.getFreePageCount());
        assertEquals(totalPages, pf2.getPageCount());

        assertEquals("pages freed during recovery should be reused", numberOfPages, totalPages);
    } finally {
        // Undo the logger changes so they do not leak into other tests.
        log4jLogger.removeAppender(appender);
        log4jLogger.setLevel(previousLevel);
    }
}
} }

View File

@ -139,6 +139,28 @@ public class SequenceSetTest {
set.remove(10); set.remove(10);
assertEquals(3, set.size()); assertEquals(3, set.size());
assertEquals(97, set.rangeSize()); assertEquals(97, set.rangeSize());
SequenceSet toRemove = new SequenceSet();
toRemove.add(new Sequence(0, 100));
set.remove(toRemove);
assertEquals(0, set.size());
assertEquals(0, set.rangeSize());
}
@Test
public void testMerge() {
SequenceSet set = new SequenceSet();
set.add(new Sequence(0, 100));
SequenceSet set2 = new SequenceSet();
set.add(new Sequence(50, 150));
set.merge(set2);
assertEquals(151, set.rangeSize());
assertEquals(1, set.size());
} }
@Test @Test