diff --git a/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java b/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java index b7c4ba4f5f..a9ea4a5504 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.kahadb.index.ListNode.ListIterator; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; @@ -103,6 +104,11 @@ public class ListIndex implements Index { synchronized public boolean containsKey(Transaction tx, Key key) throws IOException { assertLoaded(); + + if (size.get() == 0) { + return false; + } + for (Iterator> iterator = iterator(tx); iterator.hasNext(); ) { Map.Entry candidate = iterator.next(); if (key.equals(candidate.getKey())) { @@ -112,11 +118,17 @@ public class ListIndex implements Index { return false; } + private ListNode lastGetNodeCache = null; + private Map.Entry lastGetEntryCache = null; + + @SuppressWarnings({ "rawtypes", "unchecked" }) synchronized public Value get(Transaction tx, Key key) throws IOException { assertLoaded(); for (Iterator> iterator = iterator(tx); iterator.hasNext(); ) { Map.Entry candidate = iterator.next(); if (key.equals(candidate.getKey())) { + this.lastGetNodeCache = ((ListIterator) iterator).getCurrent(); + this.lastGetEntryCache = candidate; return candidate.getValue(); } } @@ -124,10 +136,52 @@ public class ListIndex implements Index { } /** - * appends to the list - * @return null + * Update the value of the item with the given key in the list if ot exists, otherwise + * it appends the value to the end of the list. + * + * @return the old value contained in the list if one exists or null. */ + @SuppressWarnings({ "rawtypes" }) synchronized public Value put(Transaction tx, Key key, Value value) throws IOException { + + Value oldValue = null; + + if (lastGetNodeCache != null) { + + if(lastGetEntryCache.getKey().equals(key)) { + oldValue = lastGetEntryCache.setValue(value); + lastGetEntryCache.setValue(value); + lastGetNodeCache.storeUpdate(tx); + return oldValue; + } + + // This searches from the last location of a call to get for the element to replace + // all the way to the end of the ListIndex. + Iterator> iterator = lastGetNodeCache.iterator(tx); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().equals(key)) { + oldValue = entry.setValue(value); + ((ListIterator) iterator).getCurrent().storeUpdate(tx); + return oldValue; + } + } + } + + // Not found because the cache wasn't set or its not at the end of the list so we + // start from the beginning and go to the cached location or the end, then we do + // an add if its not found. + Iterator> iterator = iterator(tx); + while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) { + Map.Entry entry = iterator.next(); + if (entry.getKey().equals(key)) { + oldValue = entry.setValue(value); + ((ListIterator) iterator).getCurrent().storeUpdate(tx); + return oldValue; + } + } + + // Not found so add it last. return add(tx, key, value); } @@ -145,15 +199,40 @@ public class ListIndex implements Index { return null; } + @SuppressWarnings("rawtypes") synchronized public Value remove(Transaction tx, Key key) throws IOException { assertLoaded(); - for (Iterator> iterator = iterator(tx); iterator.hasNext(); ) { - Map.Entry candidate = iterator.next(); - if (key.equals(candidate.getKey())) { - iterator.remove(); - return candidate.getValue(); + + if (size.get() == 0) { + return null; + } + + if (lastGetNodeCache != null) { + + // This searches from the last location of a call to get for the element to remove + // all the way to the end of the ListIndex. + Iterator> iterator = lastGetNodeCache.iterator(tx); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().equals(key)) { + iterator.remove(); + return entry.getValue(); + } } } + + // Not found because the cache wasn't set or its not at the end of the list so we + // start from the beginning and go to the cached location or the end to find the + // element to remove. + Iterator> iterator = iterator(tx); + while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) { + Map.Entry entry = iterator.next(); + if (entry.getKey().equals(key)) { + iterator.remove(); + return entry.getValue(); + } + } + return null; } @@ -227,6 +306,7 @@ public class ListIndex implements Index { public void storeNode(Transaction tx, ListNode node, boolean overflow) throws IOException { tx.store(node.getPage(), marshaller, overflow); + flushCache(); } public PageFile getPageFile() { @@ -270,4 +350,9 @@ public class ListIndex implements Index { public long size() { return size.get(); } + + private void flushCache() { + this.lastGetEntryCache = null; + this.lastGetNodeCache = null; + } } diff --git a/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java b/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java index 9f8e675c00..28ccc47cb1 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java @@ -57,7 +57,7 @@ public final class ListNode { static final class KeyValueEntry extends LinkedNode> implements Entry { private final Key key; - private final Value value; + private Value value; public KeyValueEntry(Key key, Value value) { this.key = key; @@ -73,7 +73,9 @@ public final class ListNode { } public Value setValue(Value value) { - throw new UnsupportedOperationException(); + Value oldValue = this.value; + this.value = value; + return oldValue; } @Override @@ -121,7 +123,7 @@ public final class ListNode { } } - private final class ListIterator implements Iterator> { + final class ListIterator implements Iterator> { private final Transaction tx; private final ListIndex targetList; @@ -220,6 +222,10 @@ public final class ListNode { throw e; } } + + ListNode getCurrent() { + return this.currentNode; + } } /** @@ -285,9 +291,28 @@ public final class ListNode { return null; } + public void storeUpdate(Transaction tx) throws IOException { + try { + if (this.entries.size() == 1) { + getContainingList().storeNode(tx, this, true); + } else { + getContainingList().storeNode(tx, this, false); + } + } catch ( Transaction.PageOverflowIOException e ) { + split(tx, ADD_FIRST); + } + } + private void store(Transaction tx, boolean addFirst) throws IOException { try { - getContainingList().storeNode(tx, this, false); + // When we split to a node of one element we can span multiple + // pages for that entry, otherwise we keep the entries on one + // page to avoid fragmented reads and segment the list traversal. + if (this.entries.size() == 1) { + getContainingList().storeNode(tx, this, true); + } else { + getContainingList().storeNode(tx, this, false); + } } catch ( Transaction.PageOverflowIOException e ) { // If we get an overflow split(tx, addFirst); @@ -295,7 +320,11 @@ public final class ListNode { } private void store(Transaction tx) throws IOException { - getContainingList().storeNode(tx, this, false); + if (this.entries.size() == 1) { + getContainingList().storeNode(tx, this, true); + } else { + getContainingList().storeNode(tx, this, false); + } } private void split(Transaction tx, boolean isAddFirst) throws IOException { @@ -311,7 +340,7 @@ public final class ListNode { getContainingList().setTailPageId(extension.getPageId()); } extension.store(tx, isAddFirst); - store(tx); + store(tx, true); } // called after a split diff --git a/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java b/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java index 9df193f730..437b925208 100644 --- a/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java +++ b/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java @@ -16,12 +16,24 @@ */ package org.apache.kahadb.index; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.text.NumberFormat; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Random; + +import org.apache.kahadb.page.PageFile; import org.apache.kahadb.util.LongMarshaller; import org.apache.kahadb.util.StringMarshaller; +import org.apache.kahadb.util.VariableMarshaller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +96,38 @@ public class ListIndexTest extends IndexTestSupport { tx.commit(); } + public void testPut() throws Exception { + createPageFileAndIndex(100); + + ListIndex listIndex = ((ListIndex) this.index); + this.index.load(tx); + tx.commit(); + + int count = 30; + tx = pf.tx(); + doInsert(count); + tx.commit(); + assertEquals("correct size", count, listIndex.size()); + + tx = pf.tx(); + Long value = listIndex.get(tx, key(10)); + assertNotNull(value); + listIndex.put(tx, key(10), Long.valueOf(1024)); + tx.commit(); + + tx = pf.tx(); + value = listIndex.get(tx, key(10)); + assertEquals(1024L, value.longValue()); + assertTrue(listIndex.size() == 30); + tx.commit(); + + tx = pf.tx(); + value = listIndex.put(tx, key(31), Long.valueOf(2048)); + assertNull(value); + assertTrue(listIndex.size() == 31); + tx.commit(); + } + public void testAddFirst() throws Exception { createPageFileAndIndex(100); @@ -273,7 +317,7 @@ public class ListIndexTest extends IndexTestSupport { final int COUNT = 50000; long start = System.currentTimeMillis(); for (int i = 0; i < COUNT; i++) { - listIndex.put(tx, key(i), (long) i); + listIndex.add(tx, key(i), (long) i); tx.commit(); } LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills"); @@ -295,9 +339,85 @@ public class ListIndexTest extends IndexTestSupport { LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount()); } + private int getMessageSize(int min, int max) { + return min + (int)(Math.random() * ((max - min) + 1)); + } + + public void testLargeValueOverflow() throws Exception { + pf = new PageFile(directory, getClass().getName()); + pf.setPageSize(4*1024); + pf.setEnablePageCaching(false); + pf.setWriteBatchSize(1); + pf.load(); + tx = pf.tx(); + long id = tx.allocate().getPageId(); + + ListIndex test = new ListIndex(pf, id); + test.setKeyMarshaller(LongMarshaller.INSTANCE); + test.setValueMarshaller(StringMarshaller.INSTANCE); + test.load(tx); + tx.commit(); + + final long NUM_ADDITIONS = 32L; + + LinkedList expected = new LinkedList(); + + tx = pf.tx(); + for (long i = 0; i < NUM_ADDITIONS; ++i) { + final int stringSize = getMessageSize(1, 4096); + String val = new String(new byte[stringSize]); + expected.add(Long.valueOf(stringSize)); + test.add(tx, i, val); + } + tx.commit(); + + tx = pf.tx(); + for (long i = 0; i < NUM_ADDITIONS; i++) { + String s = test.get(tx, i); + assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length())); + } + tx.commit(); + + expected.clear(); + + tx = pf.tx(); + for (long i = 0; i < NUM_ADDITIONS; ++i) { + final int stringSize = getMessageSize(1, 4096); + String val = new String(new byte[stringSize]); + expected.add(Long.valueOf(stringSize)); + test.addFirst(tx, i+NUM_ADDITIONS, val); + } + tx.commit(); + + tx = pf.tx(); + for (long i = 0; i < NUM_ADDITIONS; i++) { + String s = test.get(tx, i+NUM_ADDITIONS); + assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length())); + } + tx.commit(); + + expected.clear(); + + tx = pf.tx(); + for (long i = 0; i < NUM_ADDITIONS; ++i) { + final int stringSize = getMessageSize(1, 4096); + String val = new String(new byte[stringSize]); + expected.add(Long.valueOf(stringSize)); + test.put(tx, i, val); + } + tx.commit(); + + tx = pf.tx(); + for (long i = 0; i < NUM_ADDITIONS; i++) { + String s = test.get(tx, i); + assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length())); + } + tx.commit(); + } + void doInsertReverse(int count) throws Exception { for (int i = count - 1; i >= 0; i--) { - ((ListIndex) index).addFirst(tx, key(i), (long) i); + ((ListIndex) index).addFirst(tx, key(i), (long) i); tx.commit(); } } @@ -306,4 +426,35 @@ public class ListIndexTest extends IndexTestSupport { protected String key(int i) { return "key:" + nf.format(i); } + + static class HashSetStringMarshaller extends VariableMarshaller> { + final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); + + public void writePayload(HashSet object, DataOutput dataOut) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oout = new ObjectOutputStream(baos); + oout.writeObject(object); + oout.flush(); + oout.close(); + byte[] data = baos.toByteArray(); + dataOut.writeInt(data.length); + dataOut.write(data); + } + + @SuppressWarnings("unchecked") + public HashSet readPayload(DataInput dataIn) throws IOException { + int dataLen = dataIn.readInt(); + byte[] data = new byte[dataLen]; + dataIn.readFully(data); + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream oin = new ObjectInputStream(bais); + try { + return (HashSet) oin.readObject(); + } catch (ClassNotFoundException cfe) { + IOException ioe = new IOException("Failed to read HashSet: " + cfe); + ioe.initCause(cfe); + throw ioe; + } + } + } }