Fixes AMQ-4215: Simplify PList interface and provide a LevelDB store implementation.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1418686 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-12-08 15:21:14 +00:00
parent a2cb4a88f7
commit 32e63dce28
20 changed files with 1079 additions and 66 deletions

View File

@ -1530,6 +1530,16 @@ public class BrokerService implements Service {
if (!isPersistent()) {
return null;
}
try {
PersistenceAdapter pa = getPersistenceAdapter();
if( pa!=null && pa instanceof PListStore) {
return (PListStore) pa;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
boolean result = true;
boolean empty = true;
try {

View File

@ -37,12 +37,14 @@ public class IndirectMessageReference implements QueueMessageReference {
private boolean acked;
/** Direct reference to the message */
private final Message message;
private final MessageId messageId;
/**
* @param message
*/
public IndirectMessageReference(final Message message) {
this.message = message;
this.messageId = message.getMessageId().copy();
message.getMessageId();
message.getGroupID();
message.getGroupSequence();
@ -111,7 +113,7 @@ public class IndirectMessageReference implements QueueMessageReference {
}
public MessageId getMessageId() {
return message.getMessageId();
return messageId;
}
public Message.MessageDestination getRegionDestination() {

View File

@ -100,6 +100,9 @@ public class TopicSubscription extends AbstractSubscription {
if (isDuplicate(node)) {
return;
}
// Lets use an indirect reference so that we can associate a unique
// locator /w the message.
node = new IndirectMessageReference(node.getMessage());
enqueueCounter.incrementAndGet();
if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages which
@ -540,7 +543,7 @@ public class TopicSubscription extends AbstractSubscription {
}
private void dispatch(final MessageReference node) throws IOException {
Message message = (Message)node;
Message message = node.getMessage();
if (node != null) {
node.incrementReferenceCount();
}

View File

@ -278,7 +278,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
systemUsage.getTempUsage().waitForSpace();
node.decrementReferenceCount();
ByteSequence bs = getByteSequence(node.getMessage());
getDiskList().addFirst(node.getMessageId().toString(), bs);
Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
node.getMessageId().setPlistLocator(locator);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
@ -335,7 +336,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
if (!isDiskListEmpty()) {
try {
getDiskList().remove(node.getMessageId().toString());
getDiskList().remove(node.getMessageId().getPlistLocator());
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -506,7 +507,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public MessageReference next() {
try {
PListEntry entry = iterator.next();
return getMessage(entry.getByteSequence());
Message message = getMessage(entry.getByteSequence());
message.getMessageId().setPlistLocator(entry.getLocator());
return message;
} catch (IOException e) {
LOG.error("I/O error", e);
throw new RuntimeException(e);

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.store;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import java.io.IOException;
import java.util.Iterator;
@ -25,30 +27,17 @@ import java.util.Iterator;
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface PList {
void setName(String name);
String getName();
void destroy() throws IOException;
void addLast(String id, ByteSequence bs) throws IOException;
Object addFirst(String id, ByteSequence bs) throws IOException;
Object addLast(String id, ByteSequence bs) throws IOException;
void addFirst(String id, ByteSequence bs) throws IOException;
boolean remove(String id) throws IOException;
boolean remove(long position) throws IOException;
PListEntry get(long position) throws IOException;
PListEntry getFirst() throws IOException;
PListEntry getLast() throws IOException;
boolean remove(Object position) throws IOException;
boolean isEmpty();
PListIterator iterator() throws IOException;
long size();
public interface PListIterator extends Iterator<PListEntry> {

View File

@ -22,10 +22,12 @@ public class PListEntry {
private final ByteSequence byteSequence;
private final String entry;
private final Object locator;
public PListEntry(String entry, ByteSequence bs) {
public PListEntry(String entry, ByteSequence bs, Object locator) {
this.entry = entry;
this.byteSequence = bs;
this.locator = locator;
}
public ByteSequence getByteSequence() {
@ -36,7 +38,11 @@ public class PListEntry {
return this.entry;
}
public Object getLocator() {
return locator;
}
public PListEntry copy() {
return new PListEntry(this.entry, this.byteSequence);
return new PListEntry(this.entry, this.byteSequence, locator);
}
}

View File

@ -0,0 +1,666 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class PListTestSupport {
static final Logger LOG = LoggerFactory.getLogger(PListTestSupport.class);
private PListStore store;
private PList plist;
final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed" + new byte[1024]);
final Vector<Throwable> exceptions = new Vector<Throwable>();
ExecutorService executor;
@Test
public void testAddLast() throws Exception {
final int COUNT = 1000;
LinkedList<ByteSequence> list = new LinkedList<ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
list.addLast(bs);
plist.addLast(test, bs);
}
assertEquals(plist.size(), COUNT);
PList.PListIterator actual = plist.iterator();
Iterator<ByteSequence> expected = list.iterator();
while (expected.hasNext()) {
ByteSequence bs = expected.next();
assertTrue(actual.hasNext());
PListEntry entry = actual.next();
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
entry.getByteSequence().getLength());
assertEquals(origStr, plistString);
}
assertFalse(actual.hasNext());
}
@Test
public void testAddFirst() throws Exception {
final int COUNT = 1000;
LinkedList<ByteSequence> list = new LinkedList<ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
list.addFirst(bs);
plist.addFirst(test, bs);
}
assertEquals(plist.size(), COUNT);
PList.PListIterator actual = plist.iterator();
Iterator<ByteSequence> expected = list.iterator();
while (expected.hasNext()) {
ByteSequence bs = expected.next();
assertTrue(actual.hasNext());
PListEntry entry = actual.next();
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
entry.getByteSequence().getLength());
assertEquals(origStr, plistString);
}
assertFalse(actual.hasNext());
}
@Test
public void testRemove() throws IOException {
doTestRemove(2000);
}
private PListEntry getFirst(PList plist) throws IOException {
PList.PListIterator iterator = plist.iterator();
try {
if( iterator.hasNext() ) {
return iterator.next();
} else {
return null;
}
}finally {
iterator.release();
}
}
protected void doTestRemove(final int COUNT) throws IOException {
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
map.put(test, bs);
plist.addLast(test, bs);
}
assertEquals(plist.size(), COUNT);
PListEntry entry = getFirst(plist);
while (entry != null) {
plist.remove(entry.getLocator());
entry = getFirst(plist);
}
assertEquals(0,plist.size());
}
@Test
public void testDestroy() throws Exception {
doTestRemove(1);
plist.destroy();
assertEquals(0,plist.size());
}
@Test
public void testDestroyNonEmpty() throws Exception {
final int COUNT = 1000;
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
map.put(test, bs);
plist.addLast(test, bs);
}
plist.destroy();
assertEquals(0,plist.size());
}
@Test
public void testRemoveSecond() throws Exception {
Object first = plist.addLast("First", new ByteSequence("A".getBytes()));
Object second = plist.addLast("Second", new ByteSequence("B".getBytes()));
assertTrue(plist.remove(second));
assertTrue(plist.remove(first));
assertFalse(plist.remove(first));
}
@Test
public void testRemoveSingleEntry() throws Exception {
plist.addLast("First", new ByteSequence("A".getBytes()));
Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext()) {
iterator.next();
iterator.remove();
}
}
@Test
public void testRemoveSecondPosition() throws Exception {
Object first = plist.addLast("First", new ByteSequence("A".getBytes()));
Object second = plist.addLast("Second", new ByteSequence("B".getBytes()));
assertTrue(plist.remove(second));
assertTrue(plist.remove(first));
assertFalse(plist.remove(first));
}
@Test
public void testConcurrentAddRemove() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = createConcurrentAddRemovePListStore();
store.setDirectory(directory);
store.start();
final ByteSequence payload = new ByteSequence(new byte[1024*2]);
final Vector<Throwable> exceptions = new Vector<Throwable>();
final int iterations = 1000;
final int numLists = 10;
final PList[] lists = new PList[numLists];
String threadName = Thread.currentThread().getName();
for (int i=0; i<numLists; i++) {
Thread.currentThread().setName("C:"+String.valueOf(i));
lists[i] = store.getPList(String.valueOf(i));
}
Thread.currentThread().setName(threadName);
executor = Executors.newFixedThreadPool(100);
class A implements Runnable {
@Override
public void run() {
final String threadName = Thread.currentThread().getName();
try {
for (int i=0; i<iterations; i++) {
PList candidate = lists[i%numLists];
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
Object last = candidate.addLast(String.valueOf(i), payload);
getFirst(candidate);
assertTrue(candidate.remove(last));
}
}
} catch (Exception error) {
LOG.error("Unexpcted ex", error);
error.printStackTrace();
exceptions.add(error);
} finally {
Thread.currentThread().setName(threadName);
}
}
};
class B implements Runnable {
@Override
public void run() {
final String threadName = Thread.currentThread().getName();
try {
for (int i=0; i<iterations; i++) {
PList candidate = lists[i%numLists];
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
Object last = candidate.addLast(String.valueOf(i), payload);
getFirst(candidate);
assertTrue(candidate.remove(last));
}
}
} catch (Exception error) {
error.printStackTrace();
exceptions.add(error);
} finally {
Thread.currentThread().setName(threadName);
}
}
};
executor.execute(new A());
executor.execute(new A());
executor.execute(new A());
executor.execute(new B());
executor.execute(new B());
executor.execute(new B());
executor.shutdown();
boolean finishedInTime = executor.awaitTermination(30, TimeUnit.SECONDS);
assertTrue("no exceptions", exceptions.isEmpty());
assertTrue("finished ok", finishedInTime);
}
protected abstract PListStore createConcurrentAddRemovePListStore();
@Test
public void testConcurrentAddLast() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = createPListStore();
store.setDirectory(directory);
store.start();
final int numThreads = 20;
final int iterations = 1000;
executor = Executors.newFixedThreadPool(100);
for (int i=0; i<numThreads; i++) {
new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
}
for (int i=0; i<numThreads; i++) {
executor.execute(new Job(i, PListTestSupport.TaskType.ITERATE, iterations));
}
for (int i=0; i<100; i++) {
executor.execute(new Job(i+20, PListTestSupport.TaskType.ADD, 100));
}
executor.shutdown();
boolean finishedInTime = executor.awaitTermination(60*5, TimeUnit.SECONDS);
assertTrue("finished ok", finishedInTime);
}
@Test
public void testOverFlow() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = createPListStore();
store.setDirectory(directory);
store.start();
for (int i=0;i<2000; i++) {
new Job(i, PListTestSupport.TaskType.ADD, 5).run();
}
// LOG.info("After Load index file: " + store.pageFile.getFile().length());
// LOG.info("After remove index file: " + store.pageFile.getFile().length());
}
@Test
public void testConcurrentAddRemoveWithPreload() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = createConcurrentAddRemoveWithPreloadPListStore();
store.setDirectory(directory);
store.start();
final int iterations = 500;
final int numLists = 10;
// prime the store
// create/delete
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.CREATE, iterations).run();
}
LOG.info("delete");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.DELETE, iterations).run();
}
LOG.info("fill");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
}
LOG.info("remove");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.REMOVE, iterations).run();
}
LOG.info("check empty");
for (int i=0; i<numLists;i++) {
assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
}
LOG.info("delete again");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.DELETE, iterations).run();
}
LOG.info("fill again");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
}
LOG.info("parallel add and remove");
executor = Executors.newFixedThreadPool(numLists*2);
for (int i=0; i<numLists*2; i++) {
executor.execute(new Job(i, i>=numLists ? PListTestSupport.TaskType.ADD : PListTestSupport.TaskType.REMOVE, iterations));
}
executor.shutdown();
LOG.info("wait for parallel work to complete");
boolean finishedInTime = executor.awaitTermination(60*5, TimeUnit.SECONDS);
assertTrue("no exceptions", exceptions.isEmpty());
assertTrue("finished ok", finishedInTime);
}
protected abstract PListStore createConcurrentAddRemoveWithPreloadPListStore();
// for non determinant issues, increasing this may help diagnose
final int numRepeats = 1;
@Test
public void testRepeatStressWithCache() throws Exception {
for (int i=0; i<numRepeats;i++) {
do_testConcurrentAddIterateRemove(true);
}
}
@Test
public void testRepeatStressWithOutCache() throws Exception {
for (int i=0; i<numRepeats;i++) {
do_testConcurrentAddIterateRemove(false);
}
}
public void do_testConcurrentAddIterateRemove(boolean enablePageCache) throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = createConcurrentAddIterateRemovePListStore(enablePageCache);
store.setDirectory(directory);
store.start();
final int iterations = 500;
final int numLists = 10;
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.CREATE, iterations).run();
}
LOG.info("fill");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.ADD, iterations).run();
}
LOG.info("parallel add and remove");
executor = Executors.newFixedThreadPool(400);
final int numProducer = 5;
final int numConsumer = 10;
for (int i=0; i<numLists; i++) {
for (int j=0; j<numProducer; j++) {
executor.execute(new Job(i, PListTestSupport.TaskType.ADD, iterations*2));
}
for (int k=0;k<numConsumer; k++) {
executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations/4));
}
}
for (int i=numLists; i<numLists*10; i++) {
executor.execute(new Job(i, PListTestSupport.TaskType.ADD, iterations));
}
executor.shutdown();
LOG.info("wait for parallel work to complete");
boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
assertTrue("test did not timeout ", shutdown);
}
protected abstract PListStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache);
@Ignore("Takes too long.. might have broken it.")
@Test
public void testConcurrentAddIterate() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = createConcurrentAddIteratePListStore();
store.setDirectory(directory);
store.start();
final int iterations = 250;
final int numLists = 10;
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTestSupport.TaskType.CREATE, iterations).run();
}
LOG.info("parallel add and iterate");
// We want a lot of adds occurring so that new free pages get created along
// with overlapping seeks from the iterators so that we are likely to seek into
// some bad area in the page file.
executor = Executors.newFixedThreadPool(400);
final int numProducer = 300;
final int numConsumer = 100;
for (int i=0; i<numLists; i++) {
for (int j=0; j<numProducer; j++) {
executor.execute(new Job(i, PListTestSupport.TaskType.ADD, iterations));
}
for (int k=0;k<numConsumer; k++) {
executor.execute(new Job(i, TaskType.ITERATE, iterations*2));
}
}
executor.shutdown();
LOG.info("wait for parallel work to complete");
boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
assertTrue("test did not timeout ", shutdown);
// LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
}
abstract protected PListStore createConcurrentAddIteratePListStore();
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
ConcurrentHashMap<String, Object> entries = new ConcurrentHashMap<String, Object>();
class Job implements Runnable {
int id;
TaskType task;
int iterations;
public Job(int id, TaskType t, int iterations) {
this.id = id;
this.task = t;
this.iterations = iterations;
}
@Override
public void run() {
final String threadName = Thread.currentThread().getName();
try {
PList plist = null;
switch (task) {
case CREATE:
Thread.currentThread().setName("C:"+id);
plist = store.getPList(String.valueOf(id));
LOG.info("Job-" + id + ", CREATE");
break;
case DELETE:
Thread.currentThread().setName("D:"+id);
store.removePList(String.valueOf(id));
break;
case ADD:
Thread.currentThread().setName("A:"+id);
plist = store.getPList(String.valueOf(id));
for (int j = 0; j < iterations; j++) {
synchronized (plistLocks(plist)) {
if (exceptions.isEmpty()) {
String key = "PL>" + id + idSeed + "-" + j;
entries.put(key, plist.addLast(key, payload));
} else {
break;
}
}
}
if (exceptions.isEmpty()) {
LOG.info("Job-" + id + ", Add, done: " + iterations);
}
break;
case REMOVE:
Thread.currentThread().setName("R:"+id);
plist = store.getPList(String.valueOf(id));
synchronized (plistLocks(plist)) {
for (int j = iterations -1; j >= 0; j--) {
String key = "PL>" + id + idSeed + "-" + j;
Object position = entries.remove(key);
if( position!=null ) {
plist.remove(position);
}
if (j > 0 && j % (iterations / 2) == 0) {
LOG.info("Job-" + id + " Done remove: " + j);
}
}
}
break;
case ITERATE:
Thread.currentThread().setName("I:"+id);
plist = store.getPList(String.valueOf(id));
int iterateCount = 0;
synchronized (plistLocks(plist)) {
if (exceptions.isEmpty()) {
Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext() && exceptions.isEmpty()) {
iterator.next();
iterateCount++;
}
//LOG.info("Job-" + id + " Done iterate: it=" + iterator + ", count:" + iterateCount + ", size:" + plist.size());
if (plist.size() != iterateCount) {
System.err.println("Count Wrong: " + iterator);
}
assertEquals("iterate got all " + id + " iterator:" + iterator , plist.size(), iterateCount);
}
}
break;
case ITERATE_REMOVE:
Thread.currentThread().setName("IRM:"+id);
plist = store.getPList(String.valueOf(id));
int removeCount = 0;
synchronized (plistLocks(plist)) {
Iterator<PListEntry> removeIterator = plist.iterator();
while (removeIterator.hasNext()) {
removeIterator.next();
removeIterator.remove();
if (removeCount++ > iterations) {
break;
}
}
}
LOG.info("Job-" + id + " Done remove: " + removeCount);
break;
default:
}
} catch (Exception e) {
LOG.warn("Job["+id+"] caught exception: " + e.getMessage());
e.printStackTrace();
exceptions.add(e);
if (executor != null) {
executor.shutdownNow();
}
} finally {
Thread.currentThread().setName(threadName);
}
}
}
Map<PList, Object> locks = new HashMap<PList, Object>();
private Object plistLocks(PList plist) {
Object lock = null;
synchronized (locks) {
if (locks.containsKey(plist)) {
lock = locks.get(plist);
} else {
lock = new Object();
locks.put(plist, lock);
}
}
return lock;
}
@Before
public void setUp() throws Exception {
File directory = new File("target/test/PlistDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
startStore(directory);
}
protected void startStore(File directory) throws Exception {
store = createPListStore();
store.setDirectory(directory);
store.start();
plist = store.getPList("main");
}
abstract protected PListStore createPListStore();
@After
public void tearDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
store.stop();
exceptions.clear();
}
}

View File

@ -35,6 +35,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
private transient AtomicReference<Object> dataLocator = new AtomicReference<Object>();
private transient Object entryLocator;
private transient Object plistLocator;
public MessageId() {
this.producerId = new ProducerId();
@ -153,6 +154,8 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
copy.key = key;
copy.brokerSequenceId = brokerSequenceId;
copy.dataLocator = new AtomicReference<Object>(dataLocator != null ? dataLocator.get() : null);
copy.entryLocator = entryLocator;
copy.plistLocator = plistLocator;
return copy;
}
@ -192,4 +195,12 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
public void setEntryLocator(Object entryLocator) {
this.entryLocator = entryLocator;
}
public Object getPlistLocator() {
return plistLocator;
}
public void setPlistLocator(Object plistLocator) {
this.plistLocator = plistLocator;
}
}

View File

@ -23,10 +23,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.*;
import org.apache.activemq.usage.SystemUsage;
import org.apache.derby.iapi.jdbc.BrokeredStatement;
import org.junit.After;
@ -52,6 +49,7 @@ public class PriorityNetworkDispatchPolicyTest {
info.setConsumerId(id);
info.setNetworkSubscription(true);
info.setNetworkConsumerPath(new ConsumerId[]{id});
node.setMessageId(new MessageId("test:1:1:1:1"));
}
@After

View File

@ -46,6 +46,7 @@ public class FilePendingMessageCursorTestSupport {
private void createBrokerWithTempStoreLimit() throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(false);
SystemUsage usage = brokerService.getSystemUsage();
usage.getTempUsage().setLimit(1025*1024*15);

View File

@ -22,6 +22,7 @@ public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursor
@Test
public void testAddRemoveAddIndexSize() throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(false);
SystemUsage usage = brokerService.getSystemUsage();
usage.getMemoryUsage().setLimit(1024*150);
String body = new String(new byte[1024]);

View File

@ -50,6 +50,19 @@ public class PListTest {
final Vector<Throwable> exceptions = new Vector<Throwable>();
ExecutorService executor;
private PListEntry getFirst(PList plist) throws IOException {
PList.PListIterator iterator = plist.iterator();
try {
if( iterator.hasNext() ) {
return iterator.next();
} else {
return null;
}
}finally {
iterator.release();
}
}
@Test
public void testAddLast() throws Exception {
final int COUNT = 1000;
@ -208,7 +221,7 @@ public class PListTest {
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
candidate.addLast(String.valueOf(i), payload);
candidate.getFirst();
getFirst(candidate);
assertTrue(candidate.remove(String.valueOf(i)));
}
}
@ -232,7 +245,7 @@ public class PListTest {
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
candidate.addLast(String.valueOf(i), payload);
candidate.getFirst();
getFirst(candidate);
assertTrue(candidate.remove(String.valueOf(i)));
}
}

View File

@ -123,6 +123,12 @@
<!-- =============================== -->
<!-- Testing Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -26,6 +26,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
@ -34,6 +36,7 @@ import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,7 +54,6 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
setValueMarshaller(LocationMarshaller.INSTANCE);
}
@Override
public void setName(String name) {
this.name = name;
}
@ -81,8 +83,20 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
}
class Locator {
final String id;
Locator(String id) {
this.id = id;
}
PListImpl plist() {
return PListImpl.this;
}
}
@Override
public void addLast(final String id, final ByteSequence bs) throws IOException {
public Object addLast(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@ -91,10 +105,11 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
});
}
return new Locator(id);
}
@Override
public void addFirst(final String id, final ByteSequence bs) throws IOException {
public Object addFirst(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@ -103,9 +118,17 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
});
}
return new Locator(id);
}
@Override
public boolean remove(final Object l) throws IOException {
Locator locator = (Locator) l;
assert locator!=null;
assert locator.plist()==this;
return remove(locator.id);
}
public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
@ -118,7 +141,6 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
return result.get();
}
@Override
public boolean remove(final long position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
@ -138,7 +160,6 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
return result.get();
}
@Override
public PListEntry get(final long position) throws IOException {
PListEntry result = null;
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@ -152,12 +173,11 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getValue());
result = new PListEntry(ref.get().getKey(), bs);
result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
}
return result;
}
@Override
public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@ -170,12 +190,11 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getValue());
result = new PListEntry(ref.get().getKey(), bs);
result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
}
return result;
}
@Override
public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@ -188,7 +207,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getValue());
result = new PListEntry(ref.get().getKey(), bs);
result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
}
return result;
}
@ -230,7 +249,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList {
e.initCause(unexpected);
throw e;
}
return new PListEntry(entry.getKey(), bs);
return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey()));
}
@Override

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb.plist;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListStore;
@ -31,6 +32,7 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.*;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -0,0 +1,52 @@
package org.apache.activemq.store.kahadb.plist;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PListTestSupport;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class PListImplTest extends PListTestSupport {
@Override
protected PListStoreImpl createPListStore() {
return new PListStoreImpl();
}
protected PListStore createConcurrentAddIteratePListStore() {
PListStoreImpl store = createPListStore();
store.setIndexPageSize(2 * 1024);
store.setJournalMaxFileLength(1024 * 1024);
store.setCleanupInterval(-1);
store.setIndexEnablePageCaching(false);
store.setIndexWriteBatchSize(100);
return store;
}
@Override
protected PListStore createConcurrentAddRemovePListStore() {
PListStoreImpl store = createPListStore();
store.setCleanupInterval(400);
store.setJournalMaxFileLength(1024*5);
store.setLazyInit(false);
return store;
}
@Override
protected PListStore createConcurrentAddRemoveWithPreloadPListStore() {
PListStoreImpl store = createPListStore();
store.setJournalMaxFileLength(1024*5);
store.setCleanupInterval(5000);
store.setIndexWriteBatchSize(500);
return store;
}
@Override
protected PListStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache) {
PListStoreImpl store = createPListStore();
store.setIndexEnablePageCaching(enablePageCache);
store.setIndexPageSize(2*1024);
return store;
}
}

View File

@ -32,6 +32,15 @@ import util.TimeMetric
import java.util.HashMap
import collection.mutable.{HashSet, ListBuffer}
import org.apache.activemq.util.ByteSequence
import org.apache.activemq.leveldb.QueueEntryRecord
import util.TimeMetric
import org.apache.activemq.leveldb.SubAckRecord
import scala.Some
import org.apache.activemq.leveldb.UowManagerConstants.QueueEntryKey
import org.apache.activemq.leveldb.CountDownFuture
import org.apache.activemq.leveldb.XaAckRecord
import org.apache.activemq.leveldb.MessageRecord
import org.apache.activemq.leveldb.DurableSubscription
case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
var locator:(Long, Int) = _
@ -359,6 +368,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
class DBManager(val parent:LevelDBStore) {
var lastCollectionKey = new AtomicLong(0)
var lastPListKey = new AtomicLong(0)
val client:LevelDBClient = parent.createClient
def writeExecutor = client.writeExecutor
@ -658,7 +668,7 @@ class DBManager(val parent:LevelDBStore) {
}
def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE))
parent.createQueueMessageStore(dest, createCollection(utf8(dest.getQualifiedName), QUEUE_COLLECTION_TYPE))
}
def destroyQueueStore(key:Long) = writeExecutor.sync {
client.removeCollection(key)
@ -697,14 +707,14 @@ class DBManager(val parent:LevelDBStore) {
}
def createTopicStore(dest:ActiveMQTopic) = {
var key = createStore(dest, TOPIC_COLLECTION_TYPE)
var key = createCollection(utf8(dest.getQualifiedName), TOPIC_COLLECTION_TYPE)
parent.createTopicMessageStore(dest, key)
}
def createStore(destination:ActiveMQDestination, collectionType:Int) = {
def createCollection(name:Buffer, collectionType:Int) = {
val collection = new CollectionRecord.Bean()
collection.setType(collectionType)
collection.setMeta(utf8(destination.getQualifiedName))
collection.setMeta(name)
collection.setKey(lastCollectionKey.incrementAndGet())
val buffer = collection.freeze()
buffer.toFramedBuffer // eager encode the record.
@ -714,19 +724,10 @@ class DBManager(val parent:LevelDBStore) {
collection.getKey
}
def createTransactionContainer(name:XATransactionId) = {
val collection = new CollectionRecord.Bean()
collection.setType(TRANSACTION_COLLECTION_TYPE)
var packet = parent.wireFormat.marshal(name)
collection.setMeta(new Buffer(packet.data, packet.offset, packet.length))
collection.setKey(lastCollectionKey.incrementAndGet())
val buffer = collection.freeze()
buffer.toFramedBuffer // eager encode the record.
writeExecutor.sync {
client.addCollection(buffer)
}
collection.getKey
}
def buffer(packet:ByteSequence) = new Buffer(packet.data, packet.offset, packet.length)
def createTransactionContainer(id:XATransactionId) =
createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE)
def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
client.removeCollection(key)
@ -773,6 +774,18 @@ class DBManager(val parent:LevelDBStore) {
lastCollectionKey.set(last)
}
def createPList(name:String):LevelDBStore#LevelDBPList = {
parent.createPList(name, lastPListKey.incrementAndGet())
}
def destroyPList(key:Long) = writeExecutor.sync {
client.removePlist(key)
}
def plistPut(key:Array[Byte], value:Array[Byte]) = client.plistPut(key, value)
def plistGet(key:Array[Byte]) = client.plistGet(key)
def plistDelete(key:Array[Byte]) = client.plistDelete(key)
def plistIterator = client.plistIterator
def getMessage(x: MessageId):Message = {
val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)

View File

@ -27,16 +27,18 @@ import org.iq80.leveldb._
import org.fusesource.hawtdispatch._
import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
import util._
import org.apache.activemq.leveldb.util._
import java.util.concurrent._
import org.fusesource.hawtbuf._
import java.io.{ObjectInputStream, ObjectOutputStream, File}
import scala.Option._
import org.apache.activemq.command.{MessageAck, DataStructure, Message}
import org.apache.activemq.command.{MessageAck, Message}
import org.apache.activemq.util.ByteSequence
import org.apache.activemq.leveldb.RecordLog.LogInfo
import java.text.SimpleDateFormat
import java.util.{Date, Collections}
import org.apache.activemq.leveldb.util.TimeMetric
import org.apache.activemq.leveldb.RecordLog.LogInfo
import scala.Some
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -58,6 +60,8 @@ object LevelDBClient extends Log {
override def shutdownNow = Collections.emptyList[Runnable]
}
val PLIST_WRITE_OPTIONS = new WriteOptions().sync(false)
final val DIRTY_INDEX_KEY = bytes(":dirty")
final val LOG_REF_INDEX_KEY = bytes(":log-refs")
final val COLLECTION_META_KEY = bytes(":collection-meta")
@ -112,6 +116,18 @@ object LevelDBClient extends Log {
(in.readVarLong(), in.readVarInt())
}
def encodeLongLong(a1:Long, a2:Long) = {
val out = new DataByteArrayOutputStream(8)
out.writeLong(a1)
out.writeLong(a2)
out.toBuffer
}
def decodeLongLong(bytes:Array[Byte]):(Long, Long) = {
val in = new DataByteArrayInputStream(bytes)
(in.readLong(), in.readLong())
}
def encodeLong(a1:Long) = {
val out = new DataByteArrayOutputStream(8)
out.writeLong(a1)
@ -404,6 +420,7 @@ class LevelDBClient(store: LevelDBStore) {
var log:RecordLog = _
var index:RichDB = _
var plist:RichDB = _
var indexOptions:Options = _
var lastIndexSnapshotPos:Long = _
@ -414,6 +431,7 @@ class LevelDBClient(store: LevelDBStore) {
val collectionMeta = HashMap[Long, CollectionMeta]()
def plistIndexFile = directory / ("plist"+INDEX_SUFFIX)
def dirtyIndexFile = directory / ("dirty"+INDEX_SUFFIX)
def tempIndexFile = directory / ("temp"+INDEX_SUFFIX)
def snapshotIndexFile(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
@ -526,6 +544,11 @@ class LevelDBClient(store: LevelDBStore) {
retry {
// Setup the plist index.
plistIndexFile.recursiveDelete
plistIndexFile.mkdirs()
plist = new RichDB(factory.open(plistIndexFile, indexOptions));
// Delete the dirty indexes
dirtyIndexFile.recursiveDelete
dirtyIndexFile.mkdirs()
@ -1275,4 +1298,23 @@ class LevelDBClient(store: LevelDBStore) {
}
}
def removePlist(collectionKey: Long) = {
val entryKeyPrefix = encodeLong(collectionKey)
collectionMeta.remove(collectionKey)
retry {
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(false)
plist.cursorPrefixed(entryKeyPrefix, ro) { (key, value)=>
plist.delete(key)
true
}
}
}
def plistPut(key:Array[Byte], value:Array[Byte]) = plist.put(key, value, PLIST_WRITE_OPTIONS)
def plistDelete(key:Array[Byte]) = plist.delete(key, PLIST_WRITE_OPTIONS)
def plistGet(key:Array[Byte]) = plist.get(key)
def plistIterator = plist.db.iterator()
}

View File

@ -23,8 +23,8 @@ import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage
import java.io.File
import java.io.IOException
import java.util.concurrent.{CountDownLatch, ExecutionException, Future}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ExecutionException, Future}
import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import reflect.BeanProperty
import org.apache.activemq.store._
import java.util._
@ -34,6 +34,9 @@ import javax.management.ObjectName
import org.apache.activemq.broker.jmx.AnnotatedMBean
import org.apache.activemq.util._
import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
import org.apache.activemq.store.PList.PListIterator
import java.lang
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer}
object LevelDBStore extends Log {
val DEFAULT_DIRECTORY = new File("LevelDB");
@ -111,7 +114,7 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
import LevelDBStore._
class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore {
final val wireFormat = new OpenWireFormat
final val db = new DBManager(this)
@ -157,6 +160,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
def init() = {}
@ -299,7 +303,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
if( prepared ) {
uow.dequeue(xacontainer_id, message.getMessageId)
}
message.setMessageId(message.getMessageId.copy())
var copy = message.getMessageId.copy()
copy.setEntryLocator(null)
message.setMessageId(copy)
store.doAdd(uow, message, delay)
}
@ -370,7 +376,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
preCommit.run()
transactions.remove(txid) match {
case None=>
println("The transaction does not exist")
postCommit.run()
case Some(tx)=>
val done = new CountDownLatch(1)
@ -433,6 +438,31 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
def getPList(name: String): PList = {
this.synchronized(plists.get(name)).getOrElse(db.createPList(name))
}
def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = {
var rc = new LevelDBPList(name, key)
this.synchronized {
plists.put(name, rc)
}
rc
}
def removePList(name: String): Boolean = {
plists.remove(name) match {
case Some(list)=>
db.destroyPList(list.key)
list.listSize.set(0)
true
case None =>
false
}
}
def createMessageStore(destination: ActiveMQDestination):LevelDBStore#LevelDBMessageStore = {
destination match {
case destination:ActiveMQQueue =>
@ -531,6 +561,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
message.getMessageId.setEntryLocator(null)
if( message.getTransactionId!=null ) {
transaction(message.getTransactionId).add(this, message, delay)
DONE
@ -764,6 +795,101 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
}
class LevelDBPList(val name: String, val key: Long) extends PList {
import LevelDBClient._
val lastSeq = new AtomicLong(Long.MaxValue/2)
val firstSeq = new AtomicLong(lastSeq.get+1)
val listSize = new AtomicLong(0)
def getName: String = name
def destroy() = {
removePList(name)
}
def addFirst(id: String, bs: ByteSequence): AnyRef = {
var pos = lastSeq.decrementAndGet()
add(pos, id, bs)
listSize.incrementAndGet()
new lang.Long(pos)
}
def addLast(id: String, bs: ByteSequence): AnyRef = {
var pos = lastSeq.incrementAndGet()
add(pos, id, bs)
listSize.incrementAndGet()
new lang.Long(pos)
}
def add(pos:Long, id: String, bs: ByteSequence) = {
val encoded_key = encodeLongLong(key, pos)
val encoded_id = new UTF8Buffer(id)
val os = new DataByteArrayOutputStream(2+encoded_id.length+bs.length)
os.writeShort(encoded_id.length)
os.write(encoded_id.data, encoded_id.offset, encoded_id.length)
os.write(bs.getData, bs.getOffset, bs.getLength)
db.plistPut(encoded_key, os.toBuffer.toByteArray)
}
def remove(position: AnyRef): Boolean = {
val pos = position.asInstanceOf[lang.Long].longValue()
val encoded_key = encodeLongLong(key, pos)
db.plistGet(encoded_key) match {
case Some(value) =>
db.plistDelete(encoded_key)
listSize.decrementAndGet()
true
case None =>
false
}
}
def isEmpty = size()==0
def size(): Long = listSize.get()
def iterator() = new PListIterator() {
val prefix = LevelDBClient.encodeLong(key)
var dbi = db.plistIterator
var last_key:Array[Byte] = _
dbi.seek(prefix);
def hasNext: Boolean = dbi!=null && dbi.hasNext && dbi.peekNext.getKey.startsWith(prefix)
def next() = {
if ( dbi==null || !dbi.hasNext ) {
throw new NoSuchElementException();
}
val n = dbi.peekNext();
last_key = n.getKey
val (k, pos) = decodeLongLong(last_key)
if( k!=key ) {
throw new NoSuchElementException();
}
var value = n.getValue
val is = new org.fusesource.hawtbuf.DataByteArrayInputStream(value)
val id = is.readBuffer(is.readShort()).utf8().toString
val data = new ByteSequence(value, is.getPos, value.length-is.getPos)
dbi.next()
new PListEntry(id, data, pos)
}
def release() = {
dbi.close()
dbi = null
}
def remove() = {
if( last_key==null ) {
throw new NoSuchElementException();
}
db.plistDelete(last_key)
listSize.decrementAndGet()
last_key = null
}
}
}
///////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.leveldb;
import org.apache.activemq.store.PListTestSupport;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class LevelDBPlistTest extends PListTestSupport {
@Override
protected LevelDBStore createPListStore() {
return new LevelDBStore();
}
protected LevelDBStore createConcurrentAddIteratePListStore() {
return new LevelDBStore();
}
@Override
protected LevelDBStore createConcurrentAddRemovePListStore() {
return new LevelDBStore();
}
@Override
protected LevelDBStore createConcurrentAddRemoveWithPreloadPListStore() {
return new LevelDBStore();
}
@Override
protected LevelDBStore createConcurrentAddIterateRemovePListStore(boolean enablePageCache) {
return new LevelDBStore();
}
}