Fix KahaDB index free page recovery on unclean shutdown so that existing
free pages will be tracked and not lost.

(cherry picked from commit 38d85be476)
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-02-02 07:05:28 -05:00
parent 58ab9b6c93
commit 6a87e13eb3
2 changed files with 41 additions and 7 deletions

View File

@ -401,6 +401,8 @@ public class PageFile {
recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
}
boolean needsFreePageRecovery = false;
if (metaData.isCleanShutdown()) {
nextTxid.set(metaData.getLastTxId() + 1);
if (metaData.getFreePages() > 0) {
@ -409,8 +411,16 @@ public class PageFile {
} else {
LOG.debug(toString() + ", Recovering page file...");
nextTxid.set(redoRecoveryUpdates());
needsFreePageRecovery = true;
}
// Scan all to find the free pages.
if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
writeFile.setLength(PAGE_FILE_HEADER_SIZE);
}
nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
if (needsFreePageRecovery) {
// Scan all to find the free pages after nextFreePageId is set
freeList = new SequenceSet();
for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
Page page = i.next();
@ -423,13 +433,7 @@ public class PageFile {
metaData.setCleanShutdown(false);
storeMetaData();
getFreeFile().delete();
if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
writeFile.setLength(PAGE_FILE_HEADER_SIZE);
}
nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
startWriter();
} else {
throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}

View File

@ -22,9 +22,11 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.HashSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import junit.framework.TestCase;
@ -199,4 +201,32 @@ public class PageFileTest extends TestCase {
}
assertEquals(expected, actual);
}
//Test for AMQ-6590
public void testFreePageRecoveryUncleanShutdown() throws Exception {
PageFile pf = new PageFile(new File("target/test-data"), getName());
pf.delete();
pf.setEnableRecoveryFile(false);
pf.load();
//Allocate 10 free pages
Transaction tx = pf.tx();
tx.allocate(10);
tx.commit();
pf.flush();
//Load a second instance on the same directory fo the page file which
//simulates an unclean shutdown from the previous run
PageFile pf2 = new PageFile(new File("target/test-data"), getName());
pf2.setEnableRecoveryFile(false);
pf2.load();
long freePages = pf2.getFreePageCount();
pf.unload();
pf2.unload();
//Make sure that all 10 pages are still tracked
assertEquals(10, freePages);
}
}