mirror of https://github.com/apache/activemq.git
Some updates and changes to support some work on https://issues.apache.org/jira/browse/AMQ-3467
Enhance the ListIndex to improve performance of the remove and put operations, put is now a real put and will update the element with the given key if it exists in the list, otherwise it will add it to the end. Also adds the ability for a single key/value pair to span more than one page when needed, multiple elements will still reside on one page whenever possible. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1170849 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06c00c1db8
commit
334ade2f5d
|
@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.kahadb.index.ListNode.ListIterator;
|
||||
import org.apache.kahadb.page.Page;
|
||||
import org.apache.kahadb.page.PageFile;
|
||||
import org.apache.kahadb.page.Transaction;
|
||||
|
@ -103,6 +104,11 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
|||
|
||||
synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
|
||||
assertLoaded();
|
||||
|
||||
if (size.get() == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
|
||||
Map.Entry<Key,Value> candidate = iterator.next();
|
||||
if (key.equals(candidate.getKey())) {
|
||||
|
@ -112,11 +118,17 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
|||
return false;
|
||||
}
|
||||
|
||||
private ListNode<Key, Value> lastGetNodeCache = null;
|
||||
private Map.Entry<Key, Value> lastGetEntryCache = null;
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
synchronized public Value get(Transaction tx, Key key) throws IOException {
|
||||
assertLoaded();
|
||||
for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
|
||||
Map.Entry<Key,Value> candidate = iterator.next();
|
||||
if (key.equals(candidate.getKey())) {
|
||||
this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
|
||||
this.lastGetEntryCache = candidate;
|
||||
return candidate.getValue();
|
||||
}
|
||||
}
|
||||
|
@ -124,10 +136,52 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
|||
}
|
||||
|
||||
/**
|
||||
* appends to the list
|
||||
* @return null
|
||||
* Update the value of the item with the given key in the list if ot exists, otherwise
|
||||
* it appends the value to the end of the list.
|
||||
*
|
||||
* @return the old value contained in the list if one exists or null.
|
||||
*/
|
||||
@SuppressWarnings({ "rawtypes" })
|
||||
synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
|
||||
|
||||
Value oldValue = null;
|
||||
|
||||
if (lastGetNodeCache != null) {
|
||||
|
||||
if(lastGetEntryCache.getKey().equals(key)) {
|
||||
oldValue = lastGetEntryCache.setValue(value);
|
||||
lastGetEntryCache.setValue(value);
|
||||
lastGetNodeCache.storeUpdate(tx);
|
||||
return oldValue;
|
||||
}
|
||||
|
||||
// This searches from the last location of a call to get for the element to replace
|
||||
// all the way to the end of the ListIndex.
|
||||
Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Key, Value> entry = iterator.next();
|
||||
if (entry.getKey().equals(key)) {
|
||||
oldValue = entry.setValue(value);
|
||||
((ListIterator) iterator).getCurrent().storeUpdate(tx);
|
||||
return oldValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Not found because the cache wasn't set or its not at the end of the list so we
|
||||
// start from the beginning and go to the cached location or the end, then we do
|
||||
// an add if its not found.
|
||||
Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
|
||||
while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
|
||||
Map.Entry<Key, Value> entry = iterator.next();
|
||||
if (entry.getKey().equals(key)) {
|
||||
oldValue = entry.setValue(value);
|
||||
((ListIterator) iterator).getCurrent().storeUpdate(tx);
|
||||
return oldValue;
|
||||
}
|
||||
}
|
||||
|
||||
// Not found so add it last.
|
||||
return add(tx, key, value);
|
||||
}
|
||||
|
||||
|
@ -145,15 +199,40 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
|||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
synchronized public Value remove(Transaction tx, Key key) throws IOException {
|
||||
assertLoaded();
|
||||
for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
|
||||
Map.Entry<Key,Value> candidate = iterator.next();
|
||||
if (key.equals(candidate.getKey())) {
|
||||
|
||||
if (size.get() == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (lastGetNodeCache != null) {
|
||||
|
||||
// This searches from the last location of a call to get for the element to remove
|
||||
// all the way to the end of the ListIndex.
|
||||
Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Key, Value> entry = iterator.next();
|
||||
if (entry.getKey().equals(key)) {
|
||||
iterator.remove();
|
||||
return candidate.getValue();
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Not found because the cache wasn't set or its not at the end of the list so we
|
||||
// start from the beginning and go to the cached location or the end to find the
|
||||
// element to remove.
|
||||
Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
|
||||
while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
|
||||
Map.Entry<Key, Value> entry = iterator.next();
|
||||
if (entry.getKey().equals(key)) {
|
||||
iterator.remove();
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -227,6 +306,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
|||
|
||||
public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
|
||||
tx.store(node.getPage(), marshaller, overflow);
|
||||
flushCache();
|
||||
}
|
||||
|
||||
public PageFile getPageFile() {
|
||||
|
@ -270,4 +350,9 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
|||
public long size() {
|
||||
return size.get();
|
||||
}
|
||||
|
||||
private void flushCache() {
|
||||
this.lastGetEntryCache = null;
|
||||
this.lastGetNodeCache = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public final class ListNode<Key,Value> {
|
|||
static final class KeyValueEntry<Key, Value> extends LinkedNode<KeyValueEntry<Key, Value>> implements Entry<Key, Value>
|
||||
{
|
||||
private final Key key;
|
||||
private final Value value;
|
||||
private Value value;
|
||||
|
||||
public KeyValueEntry(Key key, Value value) {
|
||||
this.key = key;
|
||||
|
@ -73,7 +73,9 @@ public final class ListNode<Key,Value> {
|
|||
}
|
||||
|
||||
public Value setValue(Value value) {
|
||||
throw new UnsupportedOperationException();
|
||||
Value oldValue = this.value;
|
||||
this.value = value;
|
||||
return oldValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -121,7 +123,7 @@ public final class ListNode<Key,Value> {
|
|||
}
|
||||
}
|
||||
|
||||
private final class ListIterator implements Iterator<Entry<Key, Value>> {
|
||||
final class ListIterator implements Iterator<Entry<Key, Value>> {
|
||||
|
||||
private final Transaction tx;
|
||||
private final ListIndex<Key,Value> targetList;
|
||||
|
@ -220,6 +222,10 @@ public final class ListNode<Key,Value> {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
ListNode<Key, Value> getCurrent() {
|
||||
return this.currentNode;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -285,9 +291,28 @@ public final class ListNode<Key,Value> {
|
|||
return null;
|
||||
}
|
||||
|
||||
public void storeUpdate(Transaction tx) throws IOException {
|
||||
try {
|
||||
if (this.entries.size() == 1) {
|
||||
getContainingList().storeNode(tx, this, true);
|
||||
} else {
|
||||
getContainingList().storeNode(tx, this, false);
|
||||
}
|
||||
} catch ( Transaction.PageOverflowIOException e ) {
|
||||
split(tx, ADD_FIRST);
|
||||
}
|
||||
}
|
||||
|
||||
private void store(Transaction tx, boolean addFirst) throws IOException {
|
||||
try {
|
||||
// When we split to a node of one element we can span multiple
|
||||
// pages for that entry, otherwise we keep the entries on one
|
||||
// page to avoid fragmented reads and segment the list traversal.
|
||||
if (this.entries.size() == 1) {
|
||||
getContainingList().storeNode(tx, this, true);
|
||||
} else {
|
||||
getContainingList().storeNode(tx, this, false);
|
||||
}
|
||||
} catch ( Transaction.PageOverflowIOException e ) {
|
||||
// If we get an overflow
|
||||
split(tx, addFirst);
|
||||
|
@ -295,8 +320,12 @@ public final class ListNode<Key,Value> {
|
|||
}
|
||||
|
||||
private void store(Transaction tx) throws IOException {
|
||||
if (this.entries.size() == 1) {
|
||||
getContainingList().storeNode(tx, this, true);
|
||||
} else {
|
||||
getContainingList().storeNode(tx, this, false);
|
||||
}
|
||||
}
|
||||
|
||||
private void split(Transaction tx, boolean isAddFirst) throws IOException {
|
||||
ListNode<Key, Value> extension = getContainingList().createNode(tx);
|
||||
|
@ -311,7 +340,7 @@ public final class ListNode<Key,Value> {
|
|||
getContainingList().setTailPageId(extension.getPageId());
|
||||
}
|
||||
extension.store(tx, isAddFirst);
|
||||
store(tx);
|
||||
store(tx, true);
|
||||
}
|
||||
|
||||
// called after a split
|
||||
|
|
|
@ -16,12 +16,24 @@
|
|||
*/
|
||||
package org.apache.kahadb.index;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.text.NumberFormat;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.kahadb.page.PageFile;
|
||||
import org.apache.kahadb.util.LongMarshaller;
|
||||
import org.apache.kahadb.util.StringMarshaller;
|
||||
import org.apache.kahadb.util.VariableMarshaller;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -84,6 +96,38 @@ public class ListIndexTest extends IndexTestSupport {
|
|||
tx.commit();
|
||||
}
|
||||
|
||||
public void testPut() throws Exception {
|
||||
createPageFileAndIndex(100);
|
||||
|
||||
ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
|
||||
this.index.load(tx);
|
||||
tx.commit();
|
||||
|
||||
int count = 30;
|
||||
tx = pf.tx();
|
||||
doInsert(count);
|
||||
tx.commit();
|
||||
assertEquals("correct size", count, listIndex.size());
|
||||
|
||||
tx = pf.tx();
|
||||
Long value = listIndex.get(tx, key(10));
|
||||
assertNotNull(value);
|
||||
listIndex.put(tx, key(10), Long.valueOf(1024));
|
||||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
value = listIndex.get(tx, key(10));
|
||||
assertEquals(1024L, value.longValue());
|
||||
assertTrue(listIndex.size() == 30);
|
||||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
value = listIndex.put(tx, key(31), Long.valueOf(2048));
|
||||
assertNull(value);
|
||||
assertTrue(listIndex.size() == 31);
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
public void testAddFirst() throws Exception {
|
||||
createPageFileAndIndex(100);
|
||||
|
||||
|
@ -273,7 +317,7 @@ public class ListIndexTest extends IndexTestSupport {
|
|||
final int COUNT = 50000;
|
||||
long start = System.currentTimeMillis();
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
listIndex.put(tx, key(i), (long) i);
|
||||
listIndex.add(tx, key(i), (long) i);
|
||||
tx.commit();
|
||||
}
|
||||
LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
|
||||
|
@ -295,9 +339,85 @@ public class ListIndexTest extends IndexTestSupport {
|
|||
LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount());
|
||||
}
|
||||
|
||||
private int getMessageSize(int min, int max) {
|
||||
return min + (int)(Math.random() * ((max - min) + 1));
|
||||
}
|
||||
|
||||
public void testLargeValueOverflow() throws Exception {
|
||||
pf = new PageFile(directory, getClass().getName());
|
||||
pf.setPageSize(4*1024);
|
||||
pf.setEnablePageCaching(false);
|
||||
pf.setWriteBatchSize(1);
|
||||
pf.load();
|
||||
tx = pf.tx();
|
||||
long id = tx.allocate().getPageId();
|
||||
|
||||
ListIndex<Long, String> test = new ListIndex<Long, String>(pf, id);
|
||||
test.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
test.setValueMarshaller(StringMarshaller.INSTANCE);
|
||||
test.load(tx);
|
||||
tx.commit();
|
||||
|
||||
final long NUM_ADDITIONS = 32L;
|
||||
|
||||
LinkedList<Long> expected = new LinkedList<Long>();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i = 0; i < NUM_ADDITIONS; ++i) {
|
||||
final int stringSize = getMessageSize(1, 4096);
|
||||
String val = new String(new byte[stringSize]);
|
||||
expected.add(Long.valueOf(stringSize));
|
||||
test.add(tx, i, val);
|
||||
}
|
||||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i = 0; i < NUM_ADDITIONS; i++) {
|
||||
String s = test.get(tx, i);
|
||||
assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
|
||||
}
|
||||
tx.commit();
|
||||
|
||||
expected.clear();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i = 0; i < NUM_ADDITIONS; ++i) {
|
||||
final int stringSize = getMessageSize(1, 4096);
|
||||
String val = new String(new byte[stringSize]);
|
||||
expected.add(Long.valueOf(stringSize));
|
||||
test.addFirst(tx, i+NUM_ADDITIONS, val);
|
||||
}
|
||||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i = 0; i < NUM_ADDITIONS; i++) {
|
||||
String s = test.get(tx, i+NUM_ADDITIONS);
|
||||
assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
|
||||
}
|
||||
tx.commit();
|
||||
|
||||
expected.clear();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i = 0; i < NUM_ADDITIONS; ++i) {
|
||||
final int stringSize = getMessageSize(1, 4096);
|
||||
String val = new String(new byte[stringSize]);
|
||||
expected.add(Long.valueOf(stringSize));
|
||||
test.put(tx, i, val);
|
||||
}
|
||||
tx.commit();
|
||||
|
||||
tx = pf.tx();
|
||||
for (long i = 0; i < NUM_ADDITIONS; i++) {
|
||||
String s = test.get(tx, i);
|
||||
assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
|
||||
}
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
void doInsertReverse(int count) throws Exception {
|
||||
for (int i = count - 1; i >= 0; i--) {
|
||||
((ListIndex) index).addFirst(tx, key(i), (long) i);
|
||||
((ListIndex<String, Long>) index).addFirst(tx, key(i), (long) i);
|
||||
tx.commit();
|
||||
}
|
||||
}
|
||||
|
@ -306,4 +426,35 @@ public class ListIndexTest extends IndexTestSupport {
|
|||
protected String key(int i) {
|
||||
return "key:" + nf.format(i);
|
||||
}
|
||||
|
||||
static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
|
||||
final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
|
||||
|
||||
public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
ObjectOutputStream oout = new ObjectOutputStream(baos);
|
||||
oout.writeObject(object);
|
||||
oout.flush();
|
||||
oout.close();
|
||||
byte[] data = baos.toByteArray();
|
||||
dataOut.writeInt(data.length);
|
||||
dataOut.write(data);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public HashSet<String> readPayload(DataInput dataIn) throws IOException {
|
||||
int dataLen = dataIn.readInt();
|
||||
byte[] data = new byte[dataLen];
|
||||
dataIn.readFully(data);
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream(data);
|
||||
ObjectInputStream oin = new ObjectInputStream(bais);
|
||||
try {
|
||||
return (HashSet<String>) oin.readObject();
|
||||
} catch (ClassNotFoundException cfe) {
|
||||
IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
|
||||
ioe.initCause(cfe);
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue