mirror of https://github.com/apache/activemq.git
AMQ-7082 - recover index free pages in parallel with start, merge in flush, clean shutdown if complete. follow up on AMQ-6590
parent 7518da17ed
commit c6103415b9
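The change, in short: when load() finds a prior unclean shutdown it no longer rebuilds the index free list inline. A daemon thread opens its own read-only view of the page file, scans it for PAGE_FREE_TYPE pages, and publishes the result in recoveredFreeList. flush(), which already runs with the index lock held, merges that set into the live freeList, and unload() writes cleanShutdown=true only if the scan finished, so an interrupted recovery simply reruns on the next start. A minimal sketch of that pattern, with illustrative names only (this is not the ActiveMQ API; the real code is in the PageFile diff below):

import java.util.BitSet;

// Illustrative only: a stand-in "page store" showing the recover-in-background /
// merge-in-flush / clean-shutdown-if-complete pattern this commit applies to PageFile.
public class FreePageRecoverySketch {

    private final BitSet freeList = new BitSet();             // live free-page set, guarded by "this"
    private volatile BitSet recoveredFreeList;                // handoff slot filled by the scanner thread
    private volatile boolean needsFreePageRecovery = true;    // true after an unclean shutdown

    // Called from load(): start the scan without blocking startup.
    void startAsyncRecovery() {
        Thread scanner = new Thread(() -> {
            BitSet found = scanPageFileForFreePages();        // expensive full scan of the page file
            recoveredFreeList = found;                        // publish for flush() to merge
            needsFreePageRecovery = false;                    // scan complete, clean shutdown possible again
        }, "free-page-recovery");
        scanner.setDaemon(true);
        scanner.start();
    }

    // Called from flush(); the lock stands in for the index lock, so the merge is race free.
    synchronized void flush() {
        BitSet toMerge = recoveredFreeList;
        if (toMerge != null) {
            recoveredFreeList = null;
            freeList.or(toMerge);                             // merge recovered pages into the live list
        }
        // ... write out dirty pages ...
    }

    // Called from unload(): record a clean shutdown only if recovery actually finished.
    synchronized void unload() {
        flush();
        boolean cleanShutdown = !needsFreePageRecovery;
        persistMetaData(freeList, cleanShutdown);
    }

    private BitSet scanPageFileForFreePages() {
        return new BitSet();                                  // placeholder for the real page scan
    }

    private void persistMetaData(BitSet free, boolean clean) {
        // placeholder: the real code stores the free list and the clean-shutdown flag on disk
    }
}

The key point is the single handoff field: the scanner never touches the live free list directly, and flush() swaps the reference out exactly once, so the merge always happens under the same lock that protects normal free-list updates.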
@@ -134,7 +134,7 @@ public class PageFile {
     // Keeps track of free pages.
     private final AtomicLong nextFreePageId = new AtomicLong();
     private SequenceSet freeList = new SequenceSet();
-
+    private SequenceSet recoveredFreeList = null;
     private final AtomicLong nextTxid = new AtomicLong();
 
     // Persistent settings stored in the page file.
@@ -423,11 +423,70 @@ public class PageFile {
             storeMetaData();
             getFreeFile().delete();
             startWriter();
+            if (needsFreePageRecovery) {
+                asyncFreePageRecovery();
+            }
         } else {
             throw new IllegalStateException("Cannot load the page file when it is already loaded.");
         }
     }
 
+    private void asyncFreePageRecovery() {
+        Thread thread = new Thread("KahaDB Index Free Page Recovery") {
+            @Override
+            public void run() {
+                try {
+                    recoverFreePages();
+                } catch (Throwable e) {
+                    if (loaded.get()) {
+                        LOG.warn("Error recovering index free page list", e);
+                    }
+                }
+            }
+        };
+        thread.setPriority(Thread.NORM_PRIORITY);
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    private void recoverFreePages() throws Exception {
+        LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
+        SequenceSet newFreePages = new SequenceSet();
+        // need new pageFile instance to get unshared readFile
+        PageFile recoveryPageFile = new PageFile(directory, name);
+        recoveryPageFile.loadForRecovery(nextFreePageId.get());
+        try {
+            for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
+                Page page = i.next();
+                if (page.getType() == Page.PAGE_FREE_TYPE) {
+                    newFreePages.add(page.getPageId());
+                }
+            }
+        } finally {
+            recoveryPageFile.readFile.close();
+        }
+
+        LOG.info(toString() + ". Recovered pageFile free list of size: " + newFreePages.rangeSize());
+        if (!newFreePages.isEmpty()) {
+
+            // allow flush (with index lock held) to merge
+            recoveredFreeList = newFreePages;
+        }
+        // all set for clean shutdown
+        needsFreePageRecovery = false;
+    }
+
+    private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
+        loaded.set(true);
+        enablePageCaching = false;
+        File file = getMainPageFile();
+        readFile = new RecoverableRandomAccessFile(file, "r");
+        loadMetaData();
+        pageSize = metaData.getPageSize();
+        enableRecoveryFile = false;
+        nextFreePageId.set(nextFreePageIdSnap);
+    }
+
 
     /**
      * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
@@ -445,22 +504,6 @@ public class PageFile {
                 throw new InterruptedIOException();
             }
 
-            if (needsFreePageRecovery) {
-                LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
-                freeList = new SequenceSet();
-                loaded.set(true);
-                try {
-                    for (Iterator<Page> i = new Transaction(this).iterator(true); i.hasNext(); ) {
-                        Page page = i.next();
-                        if (page.getType() == Page.PAGE_FREE_TYPE) {
-                            freeList.add(page.getPageId());
-                        }
-                    }
-                } finally {
-                    loaded.set(false);
-                }
-            }
-
             if (freeList.isEmpty()) {
                 metaData.setFreePages(0);
             } else {
@@ -469,7 +512,12 @@ public class PageFile {
             }
 
             metaData.setLastTxId(nextTxid.get() - 1);
-            metaData.setCleanShutdown(true);
+            if (needsFreePageRecovery) {
+                // async recovery incomplete, will have to try again
+                metaData.setCleanShutdown(false);
+            } else {
+                metaData.setCleanShutdown(true);
+            }
             storeMetaData();
 
             if (readFile != null) {
@@ -513,6 +561,16 @@ public class PageFile {
             throw new IOException("Page file already stopped: checkpointing is not allowed");
         }
 
+        SequenceSet toMerge = recoveredFreeList;
+        if (toMerge != null) {
+            recoveredFreeList = null;
+            Sequence seq = toMerge.getHead();
+            while (seq != null) {
+                freeList.add(seq);
+                seq = seq.getNext();
+            }
+        }
+
         // Setup a latch that gets notified when all buffered writes hits the disk.
         CountDownLatch checkpointLatch;
         synchronized (writes) {

@@ -27,10 +27,15 @@ import java.util.HashSet;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 
 import junit.framework.TestCase;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("rawtypes")
 public class PageFileTest extends TestCase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PageFileTest.class);
+
     public void testCRUD() throws IOException {
 
         PageFile pf = new PageFile(new File("target/test-data"), getName());
@@ -219,12 +224,20 @@ public class PageFileTest extends TestCase {
         PageFile pf2 = new PageFile(new File("target/test-data"), getName());
         pf2.setEnableRecoveryFile(false);
         pf2.load();
-        pf2.unload();
-        pf2.load();
-        long freePages = pf2.getFreePageCount();
-        pf.unload();
-
-        //Make sure that all 10 pages are still tracked
-        assertEquals(10, freePages);
+        try {
+            assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+
+                    pf2.flush();
+                    long freePages = pf2.getFreePageCount();
+                    LOG.info("free page count: " + freePages);
+                    return freePages == 10l;
+                }
+            }, 12000000));
+        } finally {
+            pf.unload();
+            pf2.unload();
+        }
     }
 }
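Because the free list is now rebuilt on a background thread, the updated test cannot read getFreePageCount() immediately after load(); it polls with Wait.waitFor and calls flush() on each attempt, since flush() is where the recovered list gets merged. A condensed sketch of that polling idiom outside the test harness (class name, expected count and timeout are illustrative):

import java.io.File;

import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.Wait;

public class FreePageRecoveryWaitExample {
    public static void main(String[] args) throws Exception {
        final PageFile pf = new PageFile(new File("target/test-data"), "example");
        pf.load();                                   // may kick off async free page recovery

        boolean recovered = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                pf.flush();                          // flush() merges the recovered free list
                return pf.getFreePageCount() > 0;    // expected count depends on the data set
            }
        }, 30000);                                   // timeout in milliseconds; value is arbitrary here

        System.out.println("free list recovered: " + recovered);
        pf.unload();
    }
}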