Some updates and changes to support some work on https://issues.apache.org/jira/browse/AMQ-3467

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1164936 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-09-03 21:39:12 +00:00
parent 937d5dd0c4
commit 82e3be347c
4 changed files with 272 additions and 23 deletions

View File

@ -52,6 +52,11 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
setHeadPageId(headPageId);
}
@SuppressWarnings("rawtypes")
public ListIndex(PageFile pageFile, Page page) {
this(pageFile, page.getPageId());
}
synchronized public void load(Transaction tx) throws IOException {
if (loaded.compareAndSet(false, true)) {
LOG.debug("loading");
@ -82,12 +87,12 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
}
}
}
synchronized public void unload(Transaction tx) {
if (loaded.compareAndSet(true, false)) {
}
}
}
protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
return loadNode(tx, getHeadPageId());
}
@ -181,7 +186,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
return getHead(tx).iterator(tx);
}
synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
return getHead(tx).iterator(tx, initialPosition);
}
@ -223,7 +228,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
tx.store(node.getPage(), marshaller, overflow);
}
public PageFile getPageFile() {
return pageFile;
}

View File

@ -54,7 +54,6 @@ public final class ListNode<Key,Value> {
// The next page after this one.
private long next = ListIndex.NOT_SET;
static final class KeyValueEntry<Key, Value> extends LinkedNode<KeyValueEntry<Key, Value>> implements Entry<Key, Value>
{
private final Key key;
@ -254,7 +253,7 @@ public final class ListNode<Key,Value> {
}
}
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>();
node.next = is.readLong();
@ -290,8 +289,8 @@ public final class ListNode<Key,Value> {
try {
getContainingList().storeNode(tx, this, false);
} catch ( Transaction.PageOverflowIOException e ) {
// If we get an overflow
split(tx, addFirst);
// If we get an overflow
split(tx, addFirst);
}
}
@ -384,7 +383,7 @@ public final class ListNode<Key,Value> {
///////////////////////////////////////////////////////////////////
// Implementation methods
///////////////////////////////////////////////////////////////////
public long getPageId() {
return page.getPageId();
}

View File

@ -20,6 +20,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -27,15 +28,15 @@ import java.util.NoSuchElementException;
* Keeps track of a added long values. Collapses ranges of numbers using a
* Sequence representation. Use to keep track of received message ids to find
* out if a message is duplicate or if there are any missing messages.
*
*
* @author chirino
*/
public class SequenceSet extends LinkedNodeList<Sequence> {
public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {
public static final Marshaller INSTANCE = new Marshaller();
public SequenceSet readPayload(DataInput in) throws IOException {
SequenceSet value = new SequenceSet();
int count = in.readInt();
@ -85,17 +86,16 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
return true;
}
}
public void add(Sequence value) {
// TODO we can probably optimize this a bit
for(long i=value.first; i<value.last+1; i++) {
add(i);
}
}
/**
*
*
* @param value
* the value to add to the list
* @return false if the value was a duplicate.
@ -160,7 +160,44 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
addLast(new Sequence(value));
return true;
}
/**
* Removes the given value from the Sequence set, splitting a
* contained sequence if necessary.
*
* @param value
* The value that should be removed from the SequenceSet.
*
* @return true if the value was removed from the set, false if there
* was no sequence in the set that contained the given value.
*/
public boolean remove(long value) {
Sequence sequence = getHead();
while (sequence != null ) {
if(sequence.contains(value)) {
if (sequence.range() == 1) {
sequence.unlink();
return true;
} else if (sequence.getFirst() == value) {
sequence.setFirst(value+1);
return true;
} else if (sequence.getLast() == value) {
sequence.setLast(value-1);
return true;
} else {
sequence.linkBefore(new Sequence(sequence.first, value-1));
sequence.linkAfter(new Sequence(value+1, sequence.last));
sequence.unlink();
return true;
}
}
sequence = sequence.getNext();
}
return false;
}
/**
* Removes and returns the first element from this list.
*
@ -171,12 +208,16 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
if (isEmpty()) {
throw new NoSuchElementException();
}
Sequence rc = removeFirstSequence(1);
return rc.first;
}
/**
* Removes and returns the last sequence from this list.
*
* @return the last sequence from this list or null if the list is empty.
*/
public Sequence removeLastSequence() {
if (isEmpty()) {
return null;
@ -196,7 +237,7 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
if (isEmpty()) {
return null;
}
Sequence sequence = getHead();
while (sequence != null ) {
if (sequence.range() == count ) {
@ -213,7 +254,6 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
return null;
}
/**
* @return all the id Sequences that are missing from this set that are not
* in between the range provided.
@ -266,6 +306,31 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
return rc;
}
/**
* Returns true if the value given is contained within one of the
* sequences held in this set.
*
* @param value
* The value to search for in the set.
*
* @return true if the value is contained in the set.
*/
public boolean contains(long value) {
if (isEmpty()) {
return false;
}
Sequence sequence = getHead();
while (sequence != null) {
if (sequence.contains(value)) {
return true;
}
sequence = sequence.getNext();
}
return false;
}
public boolean contains(int first, int last) {
if (isEmpty()) {
return false;
@ -273,13 +338,20 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
Sequence sequence = getHead();
while (sequence != null) {
if (sequence.first <= first && first <= sequence.last ) {
return last <= sequence.last ;
return last <= sequence.last;
}
sequence = sequence.getNext();
}
return false;
}
/**
* Computes the size of this Sequence by summing the values of all
* the contained sequences.
*
* @return the total number of values contained in this set if it
* were to be iterated over like an array.
*/
public long rangeSize() {
long result = 0;
Sequence sequence = getHead();
@ -290,4 +362,48 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
return result;
}
public Iterator<Long> iterator() {
return new SequenceIterator();
}
private class SequenceIterator implements Iterator<Long> {
private Sequence currentEntry;
private long lastReturned;
public SequenceIterator() {
currentEntry = getHead();
lastReturned = currentEntry.first - 1;
}
public boolean hasNext() {
return currentEntry != null;
}
public Long next() {
if (currentEntry == null) {
throw new NoSuchElementException();
}
if(lastReturned < currentEntry.first) {
lastReturned = currentEntry.first;
if (currentEntry.range() == 1) {
currentEntry = currentEntry.getNext();
}
} else {
lastReturned++;
if (lastReturned == currentEntry.last) {
currentEntry = currentEntry.getNext();
}
}
return lastReturned;
}
public void remove() {
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.util;
import static org.junit.Assert.*;
import java.util.Iterator;
import org.junit.Test;
public class SequenceSetTest {
@Test
public void testAddLong() {
SequenceSet set = new SequenceSet();
set.add(1);
assertEquals(1, set.rangeSize());
set.add(10);
set.add(20);
assertEquals(3, set.rangeSize());
}
@Test
public void testRangeSize() {
SequenceSet set = new SequenceSet();
set.add(1);
assertEquals(1, set.rangeSize());
set.add(10);
set.add(20);
assertEquals(3, set.rangeSize());
set.clear();
assertEquals(0, set.rangeSize());
}
@Test
public void testIsEmpty() {
SequenceSet set = new SequenceSet();
assertTrue(set.isEmpty());
set.add(1);
assertFalse(set.isEmpty());
}
@Test
public void testClear() {
SequenceSet set = new SequenceSet();
set.clear();
assertTrue(set.isEmpty());
set.add(1);
assertFalse(set.isEmpty());
set.clear();
assertTrue(set.isEmpty());
}
@Test
public void testContains() {
SequenceSet set = new SequenceSet();
set.add(new Sequence(0, 10));
set.add(new Sequence(21, 42));
set.add(new Sequence(47, 90));
set.add(new Sequence(142, 512));
assertTrue(set.contains(0));
assertTrue(set.contains(42));
assertTrue(set.contains(49));
assertTrue(set.contains(153));
assertFalse(set.contains(43));
assertFalse(set.contains(99));
assertFalse(set.contains(-1));
assertFalse(set.contains(11));
}
@Test
public void testRemove() {
SequenceSet set = new SequenceSet();
set.add(new Sequence(0, 100));
assertEquals(101, set.rangeSize());
assertEquals(1, set.size());
assertFalse(set.remove(101));
assertTrue(set.remove(50));
assertEquals(2, set.size());
assertEquals(100, set.rangeSize());
assertFalse(set.remove(101));
set.remove(0);
assertEquals(2, set.size());
assertEquals(99, set.rangeSize());
set.remove(100);
assertEquals(2, set.size());
assertEquals(98, set.rangeSize());
set.remove(10);
assertEquals(3, set.size());
assertEquals(97, set.rangeSize());
}
@Test
public void testIterator() {
SequenceSet set = new SequenceSet();
set.add(new Sequence(0, 2));
set.add(new Sequence(4, 5));
set.add(new Sequence(7));
set.add(new Sequence(20, 21));
long expected[] = new long[]{0, 1, 2, 4, 5, 7, 20, 21};
int index = 0;
Iterator<Long> iterator = set.iterator();
while(iterator.hasNext()) {
assertEquals(expected[index++], iterator.next().longValue());
}
}
}