Merge branch 'AMQ-7082'

This closes #316

Thanks to Alan Protasio for the patch
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-11-06 08:24:38 -05:00
commit 81062fde88
2 changed files with 113 additions and 11 deletions

View File

@ -425,19 +425,19 @@ public class PageFile {
getFreeFile().delete();
startWriter();
if (needsFreePageRecovery) {
asyncFreePageRecovery();
asyncFreePageRecovery(nextFreePageId.get());
}
} else {
throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}
}
private void asyncFreePageRecovery() {
private void asyncFreePageRecovery(final long lastRecoveryPage) {
Thread thread = new Thread("KahaDB Index Free Page Recovery") {
@Override
public void run() {
try {
recoverFreePages();
recoverFreePages(lastRecoveryPage);
} catch (Throwable e) {
if (loaded.get()) {
LOG.warn("Error recovering index free page list", e);
@ -450,7 +450,7 @@ public class PageFile {
thread.start();
}
private void recoverFreePages() throws Exception {
private void recoverFreePages(final long lastRecoveryPage) throws Exception {
LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
SequenceSet newFreePages = new SequenceSet();
// need new pageFile instance to get unshared readFile
@ -459,6 +459,11 @@ public class PageFile {
try {
for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
Page page = i.next();
if (page.getPageId() >= lastRecoveryPage) {
break;
}
if (page.getType() == Page.PAGE_FREE_TYPE) {
newFreePages.add(page.getPageId());
}
@ -817,6 +822,9 @@ public class PageFile {
return toOffset(nextFreePageId.get());
}
public boolean isFreePage(long pageId) {
return freeList.contains(pageId);
}
/**
* @return the number of pages allocated in the PageFile
*/

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.store.kahadb.disk.page;
import junit.framework.TestCase;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@ -23,13 +29,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import junit.framework.TestCase;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
@SuppressWarnings("rawtypes")
public class PageFileTest extends TestCase {
@ -261,4 +262,97 @@ public class PageFileTest extends TestCase {
assertEquals(pf.getFreePageCount(), 10);
}
public void testBackgroundRecoveryIsThreadSafe() throws Exception {
PageFile pf = new PageFile(new File("target/test-data"), getName());
pf.delete();
pf.setEnableRecoveryFile(false);
pf.load();
Transaction tx = pf.tx();
tx.allocate(100000);
tx.commit();
LOG.info("Number of free pages:" + pf.getFreePageCount());
pf.flush();
//Simulate an unclean shutdown
final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
pf2.setEnableRecoveryFile(false);
pf2.load();
Transaction tx2 = pf2.tx();
tx2.allocate(100000);
tx2.commit();
LOG.info("Number of free pages:" + pf2.getFreePageCount());
List<Transaction> transactions = new LinkedList<>();
Thread.sleep(500);
LOG.info("Creating Transactions");
for (int i = 0; i < 20; i++) {
Transaction txConcurrent = pf2.tx();
Page page = txConcurrent.allocate();
String t = "page:" + i;
page.set(t);
txConcurrent.store(page, StringMarshaller.INSTANCE, false);
txConcurrent.commit();
transactions.add(txConcurrent);
Thread.sleep(50);
}
assertTrue("We have 199980 free pages", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
pf2.flush();
long freePages = pf2.getFreePageCount();
LOG.info("free page count: " + freePages);
return freePages == 199980;
}
}, 12000000));
for (Transaction txConcurrent2: transactions) {
for (Page page : txConcurrent2) {
assertFalse(pf2.isFreePage(page.pageId));
}
}
}
public void testBackgroundWillNotMarkEaslyPagesAsFree() throws Exception {
PageFile pf = new PageFile(new File("target/test-data"), getName());
pf.delete();
pf.setEnableRecoveryFile(false);
pf.load();
Transaction tx = pf.tx();
tx.allocate(100000);
tx.commit();
LOG.info("Number of free pages:" + pf.getFreePageCount());
pf.flush();
//Simulate an unclean shutdown
final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
pf2.setEnableRecoveryFile(false);
pf2.load();
Transaction tx2 = pf2.tx();
tx2.allocate(200);
tx2.commit();
LOG.info("Number of free pages:" + pf2.getFreePageCount());
Transaction tx3 = pf2.tx();
tx3.allocate(100);
assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
pf2.flush();
long freePages = pf2.getFreePageCount();
LOG.info("free page count: " + freePages);
return freePages == 100100;
}
}, 12000000));
}
}