https://issues.apache.org/jira/browse/AMQ-3434: Contention in PLIist creation results in NPE on load - FilePendingMessageCursor. Resolve contention on creation, tidy up ListIndex iterator remove and plist release, additional test that stresses contention such that it can reproduce the stomp load test scenario

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1153420 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-08-03 10:18:18 +00:00
parent 24cd2b3f29
commit bf59b7d70f
10 changed files with 379 additions and 166 deletions

View File

@ -133,6 +133,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
@Override @Override
public synchronized void release() { public synchronized void release() {
iterating = false; iterating = false;
if (iter instanceof DiskIterator) {
((DiskIterator)iter).release();
};
if (flushRequired) { if (flushRequired) {
flushRequired = false; flushRequired = false;
if (!hasSpace()) { if (!hasSpace()) {
@ -417,7 +420,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
protected synchronized void flushToDisk() { protected synchronized void flushToDisk() {
if (!memoryList.isEmpty()) { if (!memoryList.isEmpty() && store != null) {
long start = 0; long start = 0;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
@ -483,7 +486,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
final class DiskIterator implements Iterator<MessageReference> { final class DiskIterator implements Iterator<MessageReference> {
private final Iterator<PListEntry> iterator; private final PList.PListIterator iterator;
DiskIterator() { DiskIterator() {
try { try {
iterator = getDiskList().iterator(); iterator = getDiskList().iterator();
@ -510,5 +513,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
iterator.remove(); iterator.remove();
} }
public void release() {
iterator.release();
}
} }
} }

View File

@ -26,7 +26,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.kahadb.index.ListIndex; import org.apache.kahadb.index.ListIndex;
import org.apache.kahadb.index.ListNode;
import org.apache.kahadb.journal.Location; import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction; import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence; import org.apache.kahadb.util.ByteSequence;
@ -58,11 +57,11 @@ public class PList extends ListIndex<String, Location> {
} }
void read(DataInput in) throws IOException { void read(DataInput in) throws IOException {
this.headPageId = in.readLong(); setHeadPageId(in.readLong());
} }
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
out.writeLong(this.headPageId); out.writeLong(getHeadPageId());
} }
public synchronized void destroy() throws IOException { public synchronized void destroy() throws IOException {
@ -185,17 +184,19 @@ public class PList extends ListIndex<String, Location> {
return size() == 0; return size() == 0;
} }
synchronized public Iterator<PListEntry> iterator() throws IOException { public PListIterator iterator() throws IOException {
return new PListIterator(); return new PListIterator();
} }
private final class PListIterator implements Iterator<PListEntry> { public final class PListIterator implements Iterator<PListEntry> {
final Iterator<Map.Entry<String, Location>> iterator; final Iterator<Map.Entry<String, Location>> iterator;
final Transaction tx; final Transaction tx;
PListIterator() throws IOException { PListIterator() throws IOException {
tx = store.pageFile.tx(); tx = store.pageFile.tx();
this.iterator = iterator(tx); synchronized (indexLock) {
this.iterator = iterator(tx);
}
} }
@Override @Override
@ -234,6 +235,16 @@ public class PList extends ListIndex<String, Location> {
throw e; throw e;
} }
} }
public void release() {
try {
tx.rollback();
} catch (IOException unexpected) {
IllegalStateException e = new IllegalStateException(unexpected);
e.initCause(unexpected);
throw e;
}
}
} }
public void claimFileLocations(final Set<Integer> candidates) throws IOException { public void claimFileLocations(final Set<Integer> candidates) throws IOException {
@ -254,6 +265,6 @@ public class PList extends ListIndex<String, Location> {
@Override @Override
public String toString() { public String toString() {
return "" + name + ",[headPageId=" + headPageId + ",tailPageId=" + tailPageId + ", size=" + size() + "]"; return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
} }
} }

View File

@ -76,6 +76,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE; private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE; private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
private boolean indexEnablePageCaching = true;
public Object getIndexLock() { public Object getIndexLock() {
return indexLock; return indexLock;
@ -110,6 +111,14 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
this.indexWriteBatchSize = indexWriteBatchSize; this.indexWriteBatchSize = indexWriteBatchSize;
} }
public boolean getIndexEnablePageCaching() {
return indexEnablePageCaching;
}
public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
this.indexEnablePageCaching = indexEnablePageCaching;
}
protected class MetaData { protected class MetaData {
protected MetaData(PListStore store) { protected MetaData(PListStore store) {
this.store = store; this.store = store;
@ -223,10 +232,10 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
result = pl; result = pl;
this.persistentLists.put(name, pl); this.persistentLists.put(name, pl);
} }
final PList load = result; final PList toLoad = result;
getPageFile().tx().execute(new Transaction.Closure<IOException>() { getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
load.load(tx); toLoad.load(tx);
} }
}); });
@ -269,6 +278,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
this.journal.start(); this.journal.start();
this.pageFile = new PageFile(directory, "tmpDB"); this.pageFile = new PageFile(directory, "tmpDB");
this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
this.pageFile.setPageSize(getIndexPageSize()); this.pageFile.setPageSize(getIndexPageSize());
this.pageFile.setWriteBatchSize(getIndexWriteBatchSize()); this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
this.pageFile.setPageCacheSize(getIndexCacheSize()); this.pageFile.setPageCacheSize(getIndexCacheSize());
@ -340,12 +350,21 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
public void run() { public void run() {
try { try {
final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
final Set<Integer> candidates = journal.getFileMap().keySet(); final Set<Integer> candidates = journal.getFileMap().keySet();
LOG.trace("Full gc candidate set:" + candidates); LOG.trace("Full gc candidate set:" + candidates);
if (candidates.size() > 1) { if (candidates.size() > 1) {
// prune current write
for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
if (iterator.next() >= lastJournalFileId) {
iterator.remove();
}
}
List<PList> plists = null; List<PList> plists = null;
synchronized (this) { synchronized (indexLock) {
plists = new ArrayList(persistentLists.values()); synchronized (this) {
plists = new ArrayList(persistentLists.values());
}
} }
for (PList list : plists) { for (PList list : plists) {
list.claimFileLocations(candidates); list.claimFileLocations(candidates);

View File

@ -18,11 +18,11 @@ package org.apache.activemq.store.kahadb.plist;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -44,8 +44,9 @@ public class PListTest {
private PListStore store; private PListStore store;
private PList plist; private PList plist;
final ByteSequence payload = new ByteSequence(new byte[400]); final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed"); final String idSeed = new String("Seed" + new byte[1024]);
final Vector<Throwable> exceptions = new Vector<Throwable>(); final Vector<Throwable> exceptions = new Vector<Throwable>();
ExecutorService executor;
@Test @Test
@ -147,6 +148,17 @@ public class PListTest {
} }
@Test
public void testRemoveSingleEntry() throws Exception {
plist.addLast("First", new ByteSequence("A".getBytes()));
Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext()) {
PListEntry v = iterator.next();
iterator.remove();
}
}
@Test @Test
public void testRemoveSecondPosition() throws Exception { public void testRemoveSecondPosition() throws Exception {
plist.addLast("First", new ByteSequence("A".getBytes())); plist.addLast("First", new ByteSequence("A".getBytes()));
@ -154,7 +166,7 @@ public class PListTest {
assertTrue(plist.remove(1)); assertTrue(plist.remove(1));
assertTrue(plist.remove(0)); assertTrue(plist.remove(0));
assertFalse(plist.remove(3)); assertFalse(plist.remove(0));
} }
@ -165,36 +177,47 @@ public class PListTest {
IOHelper.mkdirs(directory); IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory); IOHelper.deleteChildren(directory);
store = new PListStore(); store = new PListStore();
store.setCleanupInterval(400);
store.setDirectory(directory); store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5); store.setJournalMaxFileLength(1024*5);
store.start(); store.start();
final ByteSequence payload = new ByteSequence(new byte[1024*4]); final ByteSequence payload = new ByteSequence(new byte[1024*2]);
final Vector<Throwable> exceptions = new Vector<Throwable>(); final Vector<Throwable> exceptions = new Vector<Throwable>();
final int iterations = 1000; final int iterations = 5000;
final int numLists = 10; final int numLists = 10;
final PList[] lists = new PList[numLists]; final PList[] lists = new PList[numLists];
String threadName = Thread.currentThread().getName();
for (int i=0; i<numLists; i++) { for (int i=0; i<numLists; i++) {
lists[i] = store.getPList("List" + i); Thread.currentThread().setName("C:"+String.valueOf(i));
lists[i] = store.getPList(String.valueOf(i));
} }
Thread.currentThread().setName(threadName);
ExecutorService executor = Executors.newFixedThreadPool(100); executor = Executors.newFixedThreadPool(100);
class A implements Runnable { class A implements Runnable {
@Override @Override
public void run() { public void run() {
final String threadName = Thread.currentThread().getName();
try { try {
for (int i=0; i<iterations; i++) { for (int i=0; i<iterations; i++) {
PList candidate = lists[i%numLists]; PList candidate = lists[i%numLists];
candidate.addLast(String.valueOf(i), payload); Thread.currentThread().setName("ALRF:"+candidate.getName());
PListEntry entry = candidate.getFirst(); synchronized (plistLocks(candidate)) {
assertTrue(candidate.remove(String.valueOf(i))); candidate.addLast(String.valueOf(i), payload);
PListEntry entry = candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
} }
} catch (Exception error) { } catch (Exception error) {
LOG.error("Unexpcted ex", error);
error.printStackTrace(); error.printStackTrace();
exceptions.add(error); exceptions.add(error);
} finally {
Thread.currentThread().setName(threadName);
} }
} }
}; };
@ -202,16 +225,22 @@ public class PListTest {
class B implements Runnable { class B implements Runnable {
@Override @Override
public void run() { public void run() {
final String threadName = Thread.currentThread().getName();
try { try {
for (int i=0; i<iterations; i++) { for (int i=0; i<iterations; i++) {
PList candidate = lists[i%numLists]; PList candidate = lists[i%numLists];
candidate.addLast(String.valueOf(i), payload); Thread.currentThread().setName("ALRF:"+candidate.getName());
PListEntry entry = candidate.getFirst(); synchronized (plistLocks(candidate)) {
assertTrue(candidate.remove(String.valueOf(i))); candidate.addLast(String.valueOf(i), payload);
PListEntry entry = candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
} }
} catch (Exception error) { } catch (Exception error) {
error.printStackTrace(); error.printStackTrace();
exceptions.add(error); exceptions.add(error);
} finally {
Thread.currentThread().setName(threadName);
} }
} }
}; };
@ -244,7 +273,7 @@ public class PListTest {
final int numThreads = 20; final int numThreads = 20;
final int iterations = 2000; final int iterations = 2000;
ExecutorService executor = Executors.newFixedThreadPool(100); executor = Executors.newFixedThreadPool(100);
for (int i=0; i<numThreads; i++) { for (int i=0; i<numThreads; i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run(); new Job(i, PListTest.TaskType.ADD, iterations).run();
} }
@ -333,7 +362,7 @@ public class PListTest {
} }
LOG.info("parallel add and remove"); LOG.info("parallel add and remove");
ExecutorService executor = Executors.newFixedThreadPool(numLists*2); executor = Executors.newFixedThreadPool(numLists*2);
for (int i=0; i<numLists*2; i++) { for (int i=0; i<numLists*2; i++) {
executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations)); executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
} }
@ -344,7 +373,72 @@ public class PListTest {
assertTrue("no exceptions", exceptions.isEmpty()); assertTrue("no exceptions", exceptions.isEmpty());
} }
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE} // for non determinant issues, increasing this may help diagnose
final int numRepeats = 1;
@Test
public void testRepeatStressWithCache() throws Exception {
for (int i=0; i<numRepeats;i++) {
do_testConcurrentAddIterateRemove(true);
}
}
@Test
public void testRepeatStressWithOutCache() throws Exception {
for (int i=0; i<numRepeats;i++) {
do_testConcurrentAddIterateRemove(false);
}
}
public void do_testConcurrentAddIterateRemove(boolean enablePageCache) throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setIndexEnablePageCaching(enablePageCache);
store.setIndexPageSize(2*1024);
store.setDirectory(directory);
store.start();
final int iterations = 5000;
final int numLists = 50;
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.CREATE, iterations).run();
}
LOG.info("fill");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
LOG.info("parallel add and remove");
executor = Executors.newFixedThreadPool(400);
final int numProducer = 5;
final int numConsumer = 50;
for (int i=0; i<numLists; i++) {
for (int j=0; j<numProducer; j++) {
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations*2));
}
for (int k=0;k<numConsumer; k++) {
executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations/4));
}
}
for (int i=numLists; i<numLists*10; i++) {
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
}
executor.shutdown();
LOG.info("wait for parallel work to complete");
boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
assertTrue("test did not timeout ", shutdown);
assertTrue("no exceptions", exceptions.isEmpty());
}
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
class Job implements Runnable { class Job implements Runnable {
@ -360,54 +454,104 @@ public class PListTest {
@Override @Override
public void run() { public void run() {
final String threadName = Thread.currentThread().getName();
try { try {
PList plist = null; PList plist = null;
switch (task) { switch (task) {
case CREATE: case CREATE:
plist = store.getPList("List-" + id); Thread.currentThread().setName("C:"+id);
plist = store.getPList(String.valueOf(id));
LOG.info("Job-" + id + ", CREATE");
break; break;
case DELETE: case DELETE:
store.removePList("List-" + id); Thread.currentThread().setName("D:"+id);
store.removePList(String.valueOf(id));
break; break;
case ADD: case ADD:
plist = store.getPList("List-" + id); Thread.currentThread().setName("A:"+id);
plist = store.getPList(String.valueOf(id));
for (int j = 0; j < iterations; j++) { for (int j = 0; j < iterations; j++) {
plist.addLast(idSeed + "id" + j, payload); synchronized (plistLocks(plist)) {
if (j > 0 && j % (iterations / 2) == 0) { plist.addLast ("PL>" + id + idSeed + "-" + j, payload);
LOG.info("Job-" + id + ", Done: " + j);
} }
} }
LOG.info("Job-" + id + ", Add, done: " + iterations);
break; break;
case REMOVE: case REMOVE:
plist = store.getPList("List-" + id); Thread.currentThread().setName("R:"+id);
plist = store.getPList(String.valueOf(id));
synchronized (plistLocks(plist)) {
for (int j = iterations -1; j >= 0; j--) { for (int j = iterations -1; j >= 0; j--) {
plist.remove(idSeed + "id" + j); plist.remove("PL>" + id + idSeed + "-" + j);
if (j > 0 && j % (iterations / 2) == 0) { if (j > 0 && j % (iterations / 2) == 0) {
LOG.info("Job-" + id + " Done remove: " + j); LOG.info("Job-" + id + " Done remove: " + j);
}
} }
} }
break; break;
case ITERATE: case ITERATE:
plist = store.getPList("List-" + id); Thread.currentThread().setName("I:"+id);
plist = store.getPList(String.valueOf(id));
Iterator<PListEntry> iterator = plist.iterator(); synchronized (plistLocks(plist)) {
PListEntry element = null; Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext()) { PListEntry element = null;
element = iterator.next(); while (iterator.hasNext()) {
element = iterator.next();
}
} }
break; break;
case ITERATE_REMOVE:
Thread.currentThread().setName("IRM:"+id);
plist = store.getPList(String.valueOf(id));
int removeCount = 0;
synchronized (plistLocks(plist)) {
Iterator<PListEntry> removeIterator = plist.iterator();
PListEntry v = null;
while (removeIterator.hasNext()) {
v = removeIterator.next();
removeIterator.remove();
if (removeCount++ > iterations) {
break;
}
}
}
LOG.info("Job-" + id + " Done remove: " + removeCount);
break;
default: default:
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
exceptions.add(e); exceptions.add(e);
executor.shutdownNow();
} finally {
Thread.currentThread().setName(threadName);
} }
} }
} }
Map<PList, Object> locks = new HashMap<PList, Object>();
private Object plistLocks(PList plist) {
Object lock = null;
synchronized (locks) {
if (locks.containsKey(plist)) {
lock = locks.get(plist);
} else {
lock = new Object();
locks.put(plist, lock);
}
}
return lock;
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
File directory = new File("target/test/PlistDB"); File directory = new File("target/test/PlistDB");
@ -421,7 +565,7 @@ public class PListTest {
store = new PListStore(); store = new PListStore();
store.setDirectory(directory); store.setDirectory(directory);
store.start(); store.start();
plist = store.getPList("test"); plist = store.getPList("main");
} }
@After @After

View File

@ -32,7 +32,7 @@ import org.apache.kahadb.util.Marshaller;
public class ListIndex<Key,Value> implements Index<Key,Value> { public class ListIndex<Key,Value> implements Index<Key,Value> {
private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class); private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class);
public final static long NOT_SET = -1;
protected PageFile pageFile; protected PageFile pageFile;
protected long headPageId; protected long headPageId;
protected long tailPageId; protected long tailPageId;
@ -40,7 +40,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
protected AtomicBoolean loaded = new AtomicBoolean(); protected AtomicBoolean loaded = new AtomicBoolean();
private final ListNode.Marshaller<Key, Value> marshaller = new ListNode.Marshaller<Key, Value>(this); private ListNode.NodeMarshaller<Key, Value> marshaller;
private Marshaller<Key> keyMarshaller; private Marshaller<Key> keyMarshaller;
private Marshaller<Value> valueMarshaller; private Marshaller<Value> valueMarshaller;
@ -49,7 +49,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
public ListIndex(PageFile pageFile, long headPageId) { public ListIndex(PageFile pageFile, long headPageId) {
this.pageFile = pageFile; this.pageFile = pageFile;
this.headPageId = headPageId; setHeadPageId(headPageId);
} }
synchronized public void load(Transaction tx) throws IOException { synchronized public void load(Transaction tx) throws IOException {
@ -62,19 +62,22 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex"); throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex");
} }
final Page<ListNode<Key,Value>> p = tx.load(headPageId, null); marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller);
final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null);
if( p.getType() == Page.PAGE_FREE_TYPE ) { if( p.getType() == Page.PAGE_FREE_TYPE ) {
// Need to initialize it.. // Need to initialize it..
ListNode<Key, Value> root = createNode(p); ListNode<Key, Value> root = createNode(p);
storeNode(tx, root, true); storeNode(tx, root, true);
tailPageId = headPageId = p.getPageId(); setHeadPageId(p.getPageId());
setTailPageId(getHeadPageId());
} else { } else {
ListNode<Key, Value> node = loadNode(tx, headPageId); ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
setTailPageId(getHeadPageId());
size.addAndGet(node.size(tx)); size.addAndGet(node.size(tx));
while (node.getNext() != -1) { while (node.getNext() != NOT_SET ) {
node = loadNode(tx, node.getNext()); node = loadNode(tx, node.getNext());
size.addAndGet(node.size(tx)); size.addAndGet(node.size(tx));
tailPageId = node.getPageId(); setTailPageId(node.getPageId());
} }
} }
} }
@ -86,11 +89,11 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
} }
protected ListNode<Key,Value> getHead(Transaction tx) throws IOException { protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
return loadNode(tx, headPageId); return loadNode(tx, getHeadPageId());
} }
protected ListNode<Key,Value> getTail(Transaction tx) throws IOException { protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
return loadNode(tx, tailPageId); return loadNode(tx, getTailPageId());
} }
synchronized public boolean containsKey(Transaction tx, Key key) throws IOException { synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
@ -201,25 +204,23 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller); Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
ListNode<Key, Value> node = page.get(); ListNode<Key, Value> node = page.get();
node.setPage(page); node.setPage(page);
node.setContainingList(this);
return node; return node;
} }
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException { ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(this); ListNode<Key,Value> node = new ListNode<Key,Value>();
node.setPage(page); node.setPage(page);
page.set(node); page.set(node);
node.setContainingList(this);
return node; return node;
} }
ListNode<Key,Value> createNode(Transaction tx) throws IOException { public ListNode<Key,Value> createNode(Transaction tx) throws IOException {
Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), null); return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null));
ListNode<Key,Value> node = new ListNode<Key,Value>(this);
node.setPage(page);
page.set(node);
return node;
} }
void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException { public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
tx.store(node.getPage(), marshaller, overflow); tx.store(node.getPage(), marshaller, overflow);
} }
@ -257,6 +258,10 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
this.tailPageId = tailPageId; this.tailPageId = tailPageId;
} }
public long getTailPageId() {
return tailPageId;
}
public long size() { public long size() {
return size.get(); return size.get();
} }

View File

@ -26,6 +26,7 @@ import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction; import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.LinkedNode; import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList; import org.apache.kahadb.util.LinkedNodeList;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.VariableMarshaller; import org.apache.kahadb.util.VariableMarshaller;
/** /**
@ -35,22 +36,24 @@ import org.apache.kahadb.util.VariableMarshaller;
public final class ListNode<Key,Value> { public final class ListNode<Key,Value> {
private final static boolean ADD_FIRST = true; private final static boolean ADD_FIRST = true;
private final static boolean ADD_LAST = false; private final static boolean ADD_LAST = false;
private final static long NOT_SET = -1;
// The index that this node is part of. // The index that this node is part of.
private final ListIndex<Key,Value> index; private ListIndex<Key,Value> containingList;
// The page associated with this node // The page associated with this node
private Page<ListNode<Key,Value>> page; private Page<ListNode<Key,Value>> page;
protected LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>(); private LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>() {
@Override
public String toString() {
return "PageId:" + page.getPageId() + ", index:" + containingList + super.toString();
}
};
// The next page after this one. // The next page after this one.
private long next = NOT_SET; private long next = ListIndex.NOT_SET;
public int size(Transaction tx) {
return entries.size();
}
static final class KeyValueEntry<Key, Value> extends LinkedNode<KeyValueEntry<Key, Value>> implements Entry<Key, Value> static final class KeyValueEntry<Key, Value> extends LinkedNode<KeyValueEntry<Key, Value>> implements Entry<Key, Value>
{ {
@ -83,11 +86,13 @@ public final class ListNode<Key,Value> {
private final class ListNodeIterator implements Iterator<ListNode<Key,Value>> { private final class ListNodeIterator implements Iterator<ListNode<Key,Value>> {
private final Transaction tx; private final Transaction tx;
private final ListIndex<Key,Value> index;
ListNode<Key,Value> nextEntry; ListNode<Key,Value> nextEntry;
private ListNodeIterator(Transaction tx, ListNode<Key,Value> current) throws IOException { private ListNodeIterator(Transaction tx, ListNode<Key,Value> current) {
this.tx = tx; this.tx = tx;
nextEntry = current; nextEntry = current;
index = current.getContainingList();
} }
public boolean hasNext() { public boolean hasNext() {
@ -96,8 +101,8 @@ public final class ListNode<Key,Value> {
public ListNode<Key,Value> next() { public ListNode<Key,Value> next() {
ListNode<Key,Value> current = nextEntry; ListNode<Key,Value> current = nextEntry;
if( nextEntry !=null ) { if( current !=null ) {
if (nextEntry.next != NOT_SET) { if (current.next != ListIndex.NOT_SET) {
try { try {
nextEntry = index.loadNode(tx, current.next); nextEntry = index.loadNode(tx, current.next);
} catch (IOException unexpected) { } catch (IOException unexpected) {
@ -120,64 +125,96 @@ public final class ListNode<Key,Value> {
private final class ListIterator implements Iterator<Entry<Key, Value>> { private final class ListIterator implements Iterator<Entry<Key, Value>> {
private final Transaction tx; private final Transaction tx;
ListNode<Key,Value> current, prev; private final ListIndex<Key,Value> targetList;
ListNode<Key,Value> currentNode, previousNode;
KeyValueEntry<Key, Value> nextEntry; KeyValueEntry<Key, Value> nextEntry;
KeyValueEntry<Key, Value> toRemove; KeyValueEntry<Key, Value> entryToRemove;
private ListIterator(Transaction tx, ListNode<Key,Value> current, long nextIndex) throws IOException { private ListIterator(Transaction tx, ListNode<Key,Value> current, long start) {
this.tx = tx; this.tx = tx;
this.current = current; this.currentNode = current;
this.targetList = current.getContainingList();
nextEntry = current.entries.getHead(); nextEntry = current.entries.getHead();
if (nextIndex > 0 && nextEntry != null) { if (start > 0) {
for (long i=0; i<nextIndex; i++) { moveToRequestedStart(start);
nextEntry = nextEntry.getNext();
if (nextEntry == null) {
if (!nextFromNextListNode())
throw new NoSuchElementException("Index out of range: " + nextIndex);
}
}
}
} }
}
private boolean nextFromNextListNode() { private void moveToRequestedStart(final long start) {
boolean haveNext = false; long count = 0;
if (current.getNext() != NOT_SET) { while (hasNext() && count < start) {
next();
count++;
}
if (!hasNext()) {
throw new NoSuchElementException("Index " + start + " out of current range: " + count);
}
}
private KeyValueEntry<Key, Value> getFromNextNode() {
KeyValueEntry<Key, Value> result = null;
if (currentNode.getNext() != ListIndex.NOT_SET) {
try { try {
prev = current; previousNode = currentNode;
current = index.loadNode(tx, current.getNext()); currentNode = targetList.loadNode(tx, currentNode.getNext());
} catch (IOException unexpected) { } catch (IOException unexpected) {
NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage()); NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
e.initCause(unexpected); e.initCause(unexpected);
throw e; throw e;
} }
nextEntry = current.entries.getHead(); result = currentNode.entries.getHead();
haveNext = nextEntry != null;
} }
return haveNext; return result;
} }
public boolean hasNext() { public boolean hasNext() {
return nextEntry !=null || nextFromNextListNode(); if (nextEntry == null) {
nextEntry = getFromNextNode();
}
return nextEntry != null;
} }
public Entry<Key, Value> next() { public Entry<Key, Value> next() {
if( nextEntry !=null ) { if( nextEntry !=null ) {
toRemove = nextEntry; entryToRemove = nextEntry;
nextEntry=toRemove.getNext(); nextEntry = entryToRemove.getNext();
return toRemove; return entryToRemove;
} else { } else {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
} }
public void remove() { public void remove() {
if (toRemove == null) { if (entryToRemove == null) {
throw new IllegalStateException("can only remove once, call next again"); throw new IllegalStateException("can only remove once, call hasNext();next() again");
} }
try { try {
doRemove(tx, current, prev, toRemove); entryToRemove.unlink();
index.onRemove(); entryToRemove = null;
toRemove = null; ListNode<Key,Value> toRemoveNode = null;
if (currentNode.entries.isEmpty()) {
// may need to free this node
if (currentNode.isHead() && currentNode.isTail()) {
// store empty list
} else if (currentNode.isHead()) {
// new head
toRemoveNode = currentNode;
nextEntry = getFromNextNode();
targetList.setHeadPageId(currentNode.getPageId());
} else if (currentNode.isTail()) {
toRemoveNode = currentNode;
previousNode.setNext(ListIndex.NOT_SET);
previousNode.store(tx);
targetList.setTailPageId(previousNode.getPageId());
}
}
targetList.onRemove();
if (toRemoveNode != null) {
tx.free(toRemoveNode.getPage());
} else {
currentNode.store(tx);
}
} catch (IOException unexpected) { } catch (IOException unexpected) {
IllegalStateException e = new IllegalStateException(unexpected.getLocalizedMessage()); IllegalStateException e = new IllegalStateException(unexpected.getLocalizedMessage());
e.initCause(unexpected); e.initCause(unexpected);
@ -192,11 +229,13 @@ public final class ListNode<Key,Value> {
* @param <Key> * @param <Key>
* @param <Value> * @param <Value>
*/ */
static public class Marshaller<Key,Value> extends VariableMarshaller<ListNode<Key,Value>> { static public final class NodeMarshaller<Key,Value> extends VariableMarshaller<ListNode<Key,Value>> {
private final ListIndex<Key,Value> index; private final Marshaller<Key> keyMarshaller;
private final Marshaller<Value> valueMarshaller;
public Marshaller(ListIndex<Key,Value> index) { public NodeMarshaller(Marshaller<Key> keyMarshaller, Marshaller<Value> valueMarshaller) {
this.index = index; this.keyMarshaller = keyMarshaller;
this.valueMarshaller = valueMarshaller;
} }
public void writePayload(ListNode<Key,Value> node, DataOutput os) throws IOException { public void writePayload(ListNode<Key,Value> node, DataOutput os) throws IOException {
@ -209,58 +248,31 @@ public final class ListNode<Key,Value> {
os.writeShort(count); os.writeShort(count);
KeyValueEntry<Key, Value> entry = node.entries.getHead(); KeyValueEntry<Key, Value> entry = node.entries.getHead();
while (entry != null) { while (entry != null) {
index.getKeyMarshaller().writePayload((Key) entry.getKey(), os); keyMarshaller.writePayload((Key) entry.getKey(), os);
index.getValueMarshaller().writePayload((Value) entry.getValue(), os); valueMarshaller.writePayload((Value) entry.getValue(), os);
entry = entry.getNext(); entry = entry.getNext();
} }
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public ListNode<Key,Value> readPayload(DataInput is) throws IOException { public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(index); ListNode<Key,Value> node = new ListNode<Key,Value>();
node.next = is.readLong(); node.next = is.readLong();
final short size = is.readShort(); final short size = is.readShort();
for (short i = 0; i < size; i++) { for (short i = 0; i < size; i++) {
node.entries.addLast( node.entries.addLast(
new KeyValueEntry(index.getKeyMarshaller().readPayload(is), new KeyValueEntry(keyMarshaller.readPayload(is),
index.getValueMarshaller().readPayload(is))); valueMarshaller.readPayload(is)));
} }
return node; return node;
} }
} }
public ListNode(ListIndex<Key, Value> index) {
this.index = index;
}
private void doRemove(final Transaction tx, final ListNode current, final ListNode prev, KeyValueEntry<Key, Value> entry) throws IOException {
entry.unlink();
if (current.entries.isEmpty()) {
if (current.getPageId() == index.getHeadPageId()) {
if (current.getNext() != NOT_SET) {
// new head
index.setHeadPageId(current.getNext());
tx.free(current.getPageId());
} else {
// store current in empty state
store(tx);
}
} else {
// need to unlink the node
prev.setNext(current.next);
index.storeNode(tx, prev, false);
tx.free(current.getPageId());
}
} else {
store(tx);
}
}
public Value put(Transaction tx, Key key, Value value) throws IOException { public Value put(Transaction tx, Key key, Value value) throws IOException {
if (key == null) { if (key == null) {
throw new IllegalArgumentException("Key cannot be null"); throw new IllegalArgumentException("Key cannot be null");
} }
entries.addLast(new KeyValueEntry(key, value)); entries.addLast(new KeyValueEntry<Key, Value>(key, value));
store(tx, ADD_LAST); store(tx, ADD_LAST);
return null; return null;
} }
@ -269,14 +281,14 @@ public final class ListNode<Key,Value> {
if (key == null) { if (key == null) {
throw new IllegalArgumentException("Key cannot be null"); throw new IllegalArgumentException("Key cannot be null");
} }
entries.addFirst(new KeyValueEntry(key, value)); entries.addFirst(new KeyValueEntry<Key, Value>(key, value));
store(tx, ADD_FIRST); store(tx, ADD_FIRST);
return null; return null;
} }
private void store(Transaction tx, boolean addFirst) throws IOException { private void store(Transaction tx, boolean addFirst) throws IOException {
try { try {
index.storeNode(tx, this, false); getContainingList().storeNode(tx, this, false);
} catch ( Transaction.PageOverflowIOException e ) { } catch ( Transaction.PageOverflowIOException e ) {
// If we get an overflow // If we get an overflow
split(tx, addFirst); split(tx, addFirst);
@ -284,23 +296,23 @@ public final class ListNode<Key,Value> {
} }
private void store(Transaction tx) throws IOException { private void store(Transaction tx) throws IOException {
index.storeNode(tx, this, false); getContainingList().storeNode(tx, this, false);
} }
private void split(Transaction tx, boolean isAddFirst) throws IOException { private void split(Transaction tx, boolean isAddFirst) throws IOException {
ListNode<Key, Value> extension = index.createNode(tx); ListNode<Key, Value> extension = getContainingList().createNode(tx);
if (isAddFirst) { if (isAddFirst) {
// head keeps the first entry, insert extension with the rest // head keeps the first entry, insert extension with the rest
extension.setNext(this.getNext()); extension.setNext(this.getNext());
this.setNext(extension.getPageId()); this.setNext(extension.getPageId());
extension.setEntries(entries.getHead().splitAfter()); extension.setEntries(entries.getHead().splitAfter());
} else { } else {
index.setTailPageId(extension.getPageId());
this.setNext(extension.getPageId()); this.setNext(extension.getPageId());
extension.setEntries(entries.getTail().getPrevious().splitAfter()); extension.setEntries(entries.getTail().getPrevious().splitAfter());
getContainingList().setTailPageId(extension.getPageId());
} }
index.storeNode(tx, this, false);
extension.store(tx, isAddFirst); extension.store(tx, isAddFirst);
store(tx);
} }
// called after a split // called after a split
@ -308,7 +320,7 @@ public final class ListNode<Key,Value> {
this.entries = list; this.entries = list;
} }
public Value get(Transaction tx, Key key) throws IOException { public Value get(Transaction tx, Key key) {
if (key == null) { if (key == null) {
throw new IllegalArgumentException("Key cannot be null"); throw new IllegalArgumentException("Key cannot be null");
} }
@ -324,15 +336,15 @@ public final class ListNode<Key,Value> {
return result; return result;
} }
public boolean isEmpty(final Transaction tx) throws IOException { public boolean isEmpty(final Transaction tx) {
return entries.isEmpty(); return entries.isEmpty();
} }
public Entry<Key,Value> getFirst(Transaction tx) throws IOException { public Entry<Key,Value> getFirst(Transaction tx) {
return entries.getHead(); return entries.getHead();
} }
public Entry<Key,Value> getLast(Transaction tx) throws IOException { public Entry<Key,Value> getLast(Transaction tx) {
return entries.getTail(); return entries.getTail();
} }
@ -353,7 +365,7 @@ public final class ListNode<Key,Value> {
tx.free(this.getPageId()); tx.free(this.getPageId());
} }
public boolean contains(Transaction tx, Key key) throws IOException { public boolean contains(Transaction tx, Key key) {
if (key == null) { if (key == null) {
throw new IllegalArgumentException("Key cannot be null"); throw new IllegalArgumentException("Key cannot be null");
} }
@ -393,9 +405,29 @@ public final class ListNode<Key,Value> {
this.next = next; this.next = next;
} }
public void setContainingList(ListIndex<Key, Value> list) {
this.containingList = list;
}
public ListIndex<Key,Value> getContainingList() {
return containingList;
}
public boolean isHead() {
return getPageId() == containingList.getHeadPageId();
}
public boolean isTail() {
return getPageId() == containingList.getTailPageId();
}
public int size(Transaction tx) {
return entries.size();
}
@Override @Override
public String toString() { public String toString() {
return "[ListNode(" + page.getPageId() + "->" + next + ") " + entries.toString() + "]"; return "[ListNode(" + (page != null ? page.getPageId() + "->" + next : "null") + ")[" + entries.size() + "]]";
} }
} }

View File

@ -171,7 +171,7 @@ public class PageFile {
@Override @Override
public String toString() { public String toString() {
return "[PageWrite:"+page.getPageId()+"]"; return "[PageWrite:"+page.getPageId()+ "-" + page.getType() + "]";
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -827,9 +827,7 @@ public class PageFile {
public void freePage(long pageId) { public void freePage(long pageId) {
freeList.add(pageId); freeList.add(pageId);
if( enablePageCaching ) { removeFromCache(pageId);
pageCache.remove(pageId);
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -932,9 +930,9 @@ public class PageFile {
} }
} }
void removeFromCache(Page page) { void removeFromCache(long pageId) {
if (enablePageCaching) { if (enablePageCaching) {
pageCache.remove(page.getPageId()); pageCache.remove(pageId);
} }
} }

View File

@ -129,8 +129,6 @@ public class Transaction implements Iterable<Page> {
* if the PageFile is not loaded * if the PageFile is not loaded
*/ */
public <T> Page<T> allocate(int count) throws IOException { public <T> Page<T> allocate(int count) throws IOException {
// TODO: we need to track allocated pages so that they can be returned if the
// transaction gets rolled back.
Page<T> rc = pageFile.allocate(count); Page<T> rc = pageFile.allocate(count);
allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1)); allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
return rc; return rc;

View File

@ -197,7 +197,7 @@ public class LinkedNode<T extends LinkedNode<T>> {
public void linkToHead(LinkedNodeList<T> target) { public void linkToHead(LinkedNodeList<T> target) {
if (list != null) { if (list != null) {
throw new IllegalArgumentException("This node is already linked to a node"); throw new IllegalArgumentException("This node is already linked to a list");
} }
if (target.head == null) { if (target.head == null) {

View File

@ -204,7 +204,7 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
return sequence; return sequence;
} }
if (sequence.range() > count ) { if (sequence.range() > count ) {
Sequence rc = new Sequence(sequence.first, sequence.first+count); Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
sequence.first+=count; sequence.first+=count;
return rc; return rc;
} }