AMQ-7082 - recover index free pages in parallel with start, merge in flush, clean shutdown if complete. follow up on AMQ-6590

This commit is contained in:
gtully 2018-10-19 16:00:23 +01:00
parent dc36c19c81
commit 79c74998dc
2 changed files with 95 additions and 24 deletions

View File

@ -134,7 +134,7 @@ public class PageFile {
// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
private SequenceSet recoveredFreeList = null;
private final AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file.
@ -423,11 +423,70 @@ public class PageFile {
storeMetaData();
getFreeFile().delete();
startWriter();
if (needsFreePageRecovery) {
asyncFreePageRecovery();
}
} else {
throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}
}
private void asyncFreePageRecovery() {
Thread thread = new Thread("KahaDB Index Free Page Recovery") {
@Override
public void run() {
try {
recoverFreePages();
} catch (Throwable e) {
if (loaded.get()) {
LOG.warn("Error recovering index free page list", e);
}
}
}
};
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(true);
thread.start();
}
private void recoverFreePages() throws Exception {
LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
SequenceSet newFreePages = new SequenceSet();
// need new pageFile instance to get unshared readFile
PageFile recoveryPageFile = new PageFile(directory, name);
recoveryPageFile.loadForRecovery(nextFreePageId.get());
try {
for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
Page page = i.next();
if (page.getType() == Page.PAGE_FREE_TYPE) {
newFreePages.add(page.getPageId());
}
}
} finally {
recoveryPageFile.readFile.close();
}
LOG.info(toString() + ". Recovered pageFile free list of size: " + newFreePages.rangeSize());
if (!newFreePages.isEmpty()) {
// allow flush (with index lock held) to merge
recoveredFreeList = newFreePages;
}
// all set for clean shutdown
needsFreePageRecovery = false;
}
private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
loaded.set(true);
enablePageCaching = false;
File file = getMainPageFile();
readFile = new RecoverableRandomAccessFile(file, "r");
loadMetaData();
pageSize = metaData.getPageSize();
enableRecoveryFile = false;
nextFreePageId.set(nextFreePageIdSnap);
}
/**
* Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
@ -445,22 +504,6 @@ public class PageFile {
throw new InterruptedIOException();
}
if (needsFreePageRecovery) {
LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
freeList = new SequenceSet();
loaded.set(true);
try {
for (Iterator<Page> i = new Transaction(this).iterator(true); i.hasNext(); ) {
Page page = i.next();
if (page.getType() == Page.PAGE_FREE_TYPE) {
freeList.add(page.getPageId());
}
}
} finally {
loaded.set(false);
}
}
if (freeList.isEmpty()) {
metaData.setFreePages(0);
} else {
@ -469,7 +512,12 @@ public class PageFile {
}
metaData.setLastTxId(nextTxid.get() - 1);
metaData.setCleanShutdown(true);
if (needsFreePageRecovery) {
// async recovery incomplete, will have to try again
metaData.setCleanShutdown(false);
} else {
metaData.setCleanShutdown(true);
}
storeMetaData();
if (readFile != null) {
@ -513,6 +561,16 @@ public class PageFile {
throw new IOException("Page file already stopped: checkpointing is not allowed");
}
SequenceSet toMerge = recoveredFreeList;
if (toMerge != null) {
recoveredFreeList = null;
Sequence seq = toMerge.getHead();
while (seq != null) {
freeList.add(seq);
seq = seq.getNext();
}
}
// Setup a latch that gets notified when all buffered writes hits the disk.
CountDownLatch checkpointLatch;
synchronized (writes) {

View File

@ -27,10 +27,15 @@ import java.util.HashSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import junit.framework.TestCase;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("rawtypes")
public class PageFileTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(PageFileTest.class);
public void testCRUD() throws IOException {
PageFile pf = new PageFile(new File("target/test-data"), getName());
@ -219,12 +224,20 @@ public class PageFileTest extends TestCase {
PageFile pf2 = new PageFile(new File("target/test-data"), getName());
pf2.setEnableRecoveryFile(false);
pf2.load();
pf2.unload();
pf2.load();
long freePages = pf2.getFreePageCount();
pf.unload();
try {
assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
//Make sure that all 10 pages are still tracked
assertEquals(10, freePages);
pf2.flush();
long freePages = pf2.getFreePageCount();
LOG.info("free page count: " + freePages);
return freePages == 10l;
}
}, 12000000));
} finally {
pf.unload();
pf2.unload();
}
}
}