https://issues.apache.org/jira/browse/AMQ-3351 - Usage of the temp store index by the PList needs the be improved. new implementation puts the max entries in a page, reading/writing requires substantially less page access and disk access when pending messages build up

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1130607 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-06-02 15:28:30 +00:00
parent f0edbfdd67
commit 2b102598dc
22 changed files with 543 additions and 868 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
@ -228,13 +229,22 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug("Transport failed: " + e, e);
} else if (TRANSPORTLOG.isInfoEnabled()) {
} else if (TRANSPORTLOG.isInfoEnabled() && !expected(e)) {
TRANSPORTLOG.info("Transport failed: " + e);
}
stopAsync();
}
}
private boolean expected(IOException e) {
return e instanceof SocketException && isStomp() && e.getMessage().indexOf("reset") != -1;
}
private boolean isStomp() {
URI uri = connector.getUri();
return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
}
/**
* Calls the serviceException method in an async thread. Since handling a
* service exception closes a socket, we should not tie up broker threads

View File

@ -583,12 +583,12 @@ public abstract class BaseDestination implements Destination {
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (systemUsage.isSendFailIfNoSpace()) {
getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
}
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
}
} else {
@ -601,7 +601,7 @@ public abstract class BaseDestination implements Destination {
long now = System.currentTimeMillis();
if (now >= nextWarn) {
getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
nextWarn = now + blockedProducerWarningInterval;
}
}

View File

@ -631,7 +631,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} else {
if (memoryUsage.isFull()) {
waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
+ message.getProducerId() + ") stopped to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
@ -738,7 +738,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
if (message.isPersistent()) {
if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
@ -747,7 +747,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
} else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
final String logMessage = "Usage Manager Temp Store is Full ("
final String logMessage = "Temp Store is Full ("
+ systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
+"). Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."

View File

@ -293,10 +293,7 @@ public class Topic extends BaseDestination implements Task {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG
.info("Usage Manager memory limit ("
+ memoryUsage.getLimit()
+ ") reached for "
LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
+ getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
@ -304,7 +301,7 @@ public class Topic extends BaseDestination implements Task {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
+ memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
@ -379,7 +376,7 @@ public class Topic extends BaseDestination implements Task {
waitForSpace(
context,
memoryUsage,
"Usage Manager memory limit reached. Stopping producer ("
"Usage Manager Memory Usage limit reached. Stopping producer ("
+ message.getProducerId()
+ ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName()
@ -427,7 +424,7 @@ public class Topic extends BaseDestination implements Task {
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";

View File

@ -135,7 +135,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
iterating = false;
if (flushRequired) {
flushRequired = false;
flushToDisk();
if (!hasSpace()) {
flushToDisk();
}
}
}
@ -151,8 +153,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
private void destroyDiskList() throws Exception {
if (!isDiskListEmpty()) {
if (diskList != null) {
store.removePList(name);
diskList = null;
}
}
@ -335,7 +338,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/
@Override
public synchronized int size() {
return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
}
/**
@ -374,12 +377,14 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
flushRequired = true;
if (!iterating) {
expireOldMessages();
if (!hasSpace()) {
flushToDisk();
flushRequired = false;
if (!flushRequired) {
flushRequired =true;
if (!iterating) {
expireOldMessages();
if (!hasSpace()) {
flushToDisk();
flushRequired = false;
}
}
}
}
@ -412,8 +417,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
protected synchronized void flushToDisk() {
if (!memoryList.isEmpty()) {
long start = 0;
if (LOG.isTraceEnabled()) {
start = System.currentTimeMillis();
LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size() + " " + (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
}
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
node.decrementReferenceCount();
@ -429,6 +438,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
memoryList.clear();
setCacheEnabled(false);
if (LOG.isTraceEnabled()) {
LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
}
}
}
@ -471,35 +483,23 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
final class DiskIterator implements Iterator<MessageReference> {
private PListEntry next = null;
private PListEntry current = null;
PList list;
private final Iterator<PListEntry> iterator;
DiskIterator() {
try {
this.list = getDiskList();
synchronized (this.list) {
this.current = this.list.getFirst();
this.next = this.current;
}
iterator = getDiskList().iterator();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public boolean hasNext() {
return this.next != null;
return iterator.hasNext();
}
public MessageReference next() {
this.current = next;
try {
ByteSequence bs = this.current.getByteSequence();
synchronized (this.list) {
this.current = this.list.refresh(this.current);
this.next = this.list.getNext(this.current);
}
return getMessage(bs);
PListEntry entry = iterator.next();
return getMessage(entry.getByteSequence());
} catch (IOException e) {
LOG.error("I/O error", e);
throw new RuntimeException(e);
@ -507,17 +507,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
public void remove() {
try {
synchronized (this.list) {
this.current = this.list.refresh(this.current);
this.list.remove(this.current);
}
} catch (IOException e) {
LOG.error("I/O error", e);
throw new RuntimeException(e);
}
iterator.remove();
}
}

View File

@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
@ -63,6 +62,7 @@ import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.kahadb.util.LocationMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.index.BTreeIndex;
@ -816,7 +816,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* @throws IOException
*/
public JournalCommand<?> load(Location location) throws IOException {
long start = System.currentTimeMillis();
ByteSequence data = journal.read(location);
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
}
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
@ -1472,34 +1477,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
static class LocationMarshaller implements Marshaller<Location> {
final static LocationMarshaller INSTANCE = new LocationMarshaller();
public Location readPayload(DataInput dataIn) throws IOException {
Location rc = new Location();
rc.setDataFileId(dataIn.readInt());
rc.setOffset(dataIn.readInt());
return rc;
}
public void writePayload(Location object, DataOutput dataOut) throws IOException {
dataOut.writeInt(object.getDataFileId());
dataOut.writeInt(object.getOffset());
}
public int getFixedSize() {
return 8;
}
public Location deepCopy(Location source) {
return new Location(source);
}
public boolean isDeepCopySupported() {
return true;
}
}
static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
@ -1569,7 +1546,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// Figure out the next key using the last entry in the destination.
rc.orderIndex.configureLast(tx);
rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
rc.locationIndex.load(tx);

View File

@ -1,162 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.util.VariableMarshaller;
class EntryLocation {
static final long NOT_SET = -1;
private String id;
private Page<EntryLocation> page;
private long next;
private long prev;
private Location location;
static class EntryLocationMarshaller extends VariableMarshaller<EntryLocation> {
static final EntryLocationMarshaller INSTANCE = new EntryLocationMarshaller();
public EntryLocation readPayload(DataInput dataIn) throws IOException {
EntryLocation result = new EntryLocation();
result.readExternal(dataIn);
return result;
}
public void writePayload(EntryLocation value, DataOutput dataOut) throws IOException {
value.writeExternal(dataOut);
}
}
EntryLocation(Location location) {
this.location = location;
}
EntryLocation() {
}
EntryLocation copy() {
EntryLocation result = new EntryLocation();
result.id=this.id;
result.location=this.location;
result.next=this.next;
result.prev=this.prev;
result.page=this.page;
return result;
}
void reset() {
this.id = "";
this.next = NOT_SET;
this.prev = NOT_SET;
}
public void readExternal(DataInput in) throws IOException {
this.id = in.readUTF();
this.prev = in.readLong();
this.next = in.readLong();
if (this.location == null) {
this.location = new Location();
}
this.location.readExternal(in);
}
public void writeExternal(DataOutput out) throws IOException {
out.writeUTF(this.id);
out.writeLong(this.prev);
out.writeLong(this.next);
if (this.location == null) {
this.location = new Location();
}
this.location.writeExternal(out);
}
/**
* @return the jobId
*/
String getId() {
return this.id;
}
/**
* @param id
* the id to set
*/
void setId(String id) {
this.id = id;
}
Location getLocation() {
return this.location;
}
/**
* @param location
* the location to set
*/
void setLocation(Location location) {
this.location = location;
}
/**
* @return the next
*/
long getNext() {
return this.next;
}
/**
* @param next
* the next to set
*/
void setNext(long next) {
this.next = next;
}
/**
* @return the prev
*/
long getPrev() {
return this.prev;
}
/**
* @param prev
* the prev to set
*/
void setPrev(long prev) {
this.prev = prev;
}
/**
* @return the page
*/
Page<EntryLocation> getPage() {
return this.page;
}
/**
* @param page
* the page to set
*/
void setPage(Page<EntryLocation> page) {
this.page = page;
}
}

View File

@ -19,505 +19,241 @@ package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
import org.apache.kahadb.index.ListIndex;
import org.apache.kahadb.index.ListNode;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.LocationMarshaller;
import org.apache.kahadb.util.StringMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PList {
public class PList extends ListIndex<String, Location> {
static final Logger LOG = LoggerFactory.getLogger(PList.class);
final PListStore store;
private String name;
private long rootId = EntryLocation.NOT_SET;
private long lastId = EntryLocation.NOT_SET;
private final AtomicBoolean loaded = new AtomicBoolean();
private int size = 0;
Object indexLock;
PList(PListStore store) {
this.store = store;
this.indexLock = store.getIndexLock();
setPageFile(store.getPageFile());
setKeyMarshaller(StringMarshaller.INSTANCE);
setValueMarshaller(LocationMarshaller.INSTANCE);
}
public void setName(String name) {
this.name = name;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#getName()
*/
public String getName() {
return this.name;
}
public synchronized int size() {
return this.size;
}
public synchronized boolean isEmpty() {
return size == 0;
}
/**
* @return the rootId
*/
public long getRootId() {
return this.rootId;
}
/**
* @param rootId
* the rootId to set
*/
public void setRootId(long rootId) {
this.rootId = rootId;
}
/**
* @return the lastId
*/
public long getLastId() {
return this.lastId;
}
/**
* @param lastId
* the lastId to set
*/
public void setLastId(long lastId) {
this.lastId = lastId;
}
/**
* @return the loaded
*/
public boolean isLoaded() {
return this.loaded.get();
}
void read(DataInput in) throws IOException {
this.rootId = in.readLong();
this.name = in.readUTF();
this.headPageId = in.readLong();
}
public void write(DataOutput out) throws IOException {
out.writeLong(this.rootId);
out.writeUTF(name);
out.writeLong(this.headPageId);
}
public synchronized void destroy() throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
destroy(tx);
clear(tx);
unload(tx);
}
});
}
}
void destroy(Transaction tx) throws IOException {
// start from the first
EntryLocation entry = getFirst(tx);
while (entry != null) {
EntryLocation toRemove = entry.copy();
entry = getNext(tx, entry.getNext());
doRemove(tx, toRemove);
public void addLast(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
add(tx, id, location);
}
});
}
}
synchronized void load(Transaction tx) throws IOException {
if (loaded.compareAndSet(false, true)) {
final Page<EntryLocation> p = tx.load(this.rootId, null);
if (p.getType() == Page.PAGE_FREE_TYPE) {
// Need to initialize it..
EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
public void addFirst(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, location);
}
});
}
}
storeEntry(tx, root);
this.lastId = root.getPage().getPageId();
} else {
// find last id
long nextId = this.rootId;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation next = getNext(tx, nextId);
if (next != null) {
this.lastId = next.getPage().getPageId();
nextId = next.getNext();
this.size++;
public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id) != null);
}
});
}
return result.get();
}
public boolean remove(final long position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
if (iterator.hasNext()) {
iterator.next();
iterator.remove();
result.set(true);
} else {
result.set(false);
}
}
}
}
}
synchronized public void unload() {
if (loaded.compareAndSet(true, false)) {
this.rootId = EntryLocation.NOT_SET;
this.lastId = EntryLocation.NOT_SET;
this.size=0;
}
}
synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addLast(tx, id, bs, location);
}
});
}
}
private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
entry.setLocation(location);
storeEntry(tx, entry);
EntryLocation last = loadEntry(tx, this.lastId);
last.setNext(entry.getPage().getPageId());
storeEntry(tx, last);
this.lastId = entry.getPage().getPageId();
this.size++;
}
synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, bs, location);
}
});
}
}
private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
entry.setLocation(location);
EntryLocation oldFirst = getFirst(tx);
if (oldFirst != null) {
oldFirst.setPrev(entry.getPage().getPageId());
storeEntry(tx, oldFirst);
entry.setNext(oldFirst.getPage().getPageId());
}
EntryLocation root = getRoot(tx);
root.setNext(entry.getPage().getPageId());
storeEntry(tx, root);
storeEntry(tx, entry);
this.size++;
}
synchronized public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id));
}
});
}
return result.get();
}
synchronized public boolean remove(final int position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, position));
}
});
}
return result.get();
}
synchronized public boolean remove(final PListEntry entry) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(doRemove(tx, entry.getEntry()));
}
});
}
return result.get();
}
synchronized public PListEntry get(final int position) throws IOException {
public PListEntry get(final long position) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(get(tx, position));
Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
ref.set(iterator.next());
}
});
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
ByteSequence bs = this.store.getPayload(ref.get().getValue());
result = new PListEntry(ref.get().getKey(), bs);
}
return result;
}
synchronized public PListEntry getFirst() throws IOException {
public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getFirst(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getValue());
result = new PListEntry(ref.get().getKey(), bs);
}
return result;
}
synchronized public PListEntry getLast() throws IOException {
public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
}
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getValue());
result = new PListEntry(ref.get().getKey(), bs);
}
return result;
}
synchronized public PListEntry getNext(PListEntry entry) throws IOException {
PListEntry result = null;
final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
if (nextId != EntryLocation.NOT_SET) {
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
synchronized (indexLock) {
public boolean isEmpty() {
return size() == 0;
}
synchronized public Iterator<PListEntry> iterator() throws IOException {
return new PListIterator();
}
private final class PListIterator implements Iterator<PListEntry> {
final Iterator<Map.Entry<String, Location>> iterator;
final Transaction tx;
PListIterator() throws IOException {
tx = store.pageFile.tx();
this.iterator = iterator(tx);
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public PListEntry next() {
Map.Entry<String, Location> entry = iterator.next();
ByteSequence bs = null;
try {
bs = store.getPayload(entry.getValue());
} catch (IOException unexpected) {
NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
e.initCause(unexpected);
throw e;
}
return new PListEntry(entry.getKey(), bs);
}
@Override
public void remove() {
try {
synchronized (indexLock) {
tx.execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
iterator.remove();
}
});
}
} catch (IOException unexpected) {
IllegalStateException e = new IllegalStateException(unexpected);
e.initCause(unexpected);
throw e;
}
}
}
public void claimFileLocations(final Set<Integer> candidates) throws IOException {
synchronized (indexLock) {
if (loaded.get()) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getNext(tx, nextId));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
}
}
return result;
}
synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
}
});
if (ref.get() != null) {
result = new PListEntry(ref.get(), entry.getByteSequence());
}
}
return result;
}
synchronized public void claimFileLocations(final Set<Integer> candidates) throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
long nextId = rootId;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
candidates.remove(entry.getLocation().getDataFileId());
nextId = entry.getNext();
} else {
break;
Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
while (iterator.hasNext()) {
Location location = iterator.next().getValue();
candidates.remove(location.getDataFileId());
}
}
}
});
}
}
boolean remove(Transaction tx, String id) throws IOException {
boolean result = false;
long nextId = this.rootId;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
if (entry.getId().equals(id)) {
result = doRemove(tx, entry);
break;
}
nextId = entry.getNext();
} else {
// not found
break;
});
}
}
return result;
}
boolean remove(Transaction tx, int position) throws IOException {
boolean result = false;
long nextId = this.rootId;
int count = 0;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
if (count == position) {
result = doRemove(tx, entry);
break;
}
nextId = entry.getNext();
} else {
// not found
break;
}
count++;
}
return result;
}
EntryLocation get(Transaction tx, int position) throws IOException {
EntryLocation result = null;
long nextId = this.rootId;
int count = -1;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
if (count == position) {
result = entry;
break;
}
nextId = entry.getNext();
} else {
break;
}
count++;
}
return result;
}
EntryLocation getFirst(Transaction tx) throws IOException {
long offset = getRoot(tx).getNext();
if (offset != EntryLocation.NOT_SET) {
return loadEntry(tx, offset);
}
return null;
}
EntryLocation getLast(Transaction tx) throws IOException {
if (this.lastId != EntryLocation.NOT_SET) {
return loadEntry(tx, this.lastId);
}
return null;
}
private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
boolean result = false;
if (entry != null) {
EntryLocation prev = getPrevious(tx, entry.getPrev());
EntryLocation next = getNext(tx, entry.getNext());
long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
if (next != null) {
next.setPrev(prevId);
storeEntry(tx, next);
} else {
// we are deleting the last one in the list
this.lastId = prevId;
}
if (prev != null) {
prev.setNext(nextId);
storeEntry(tx, prev);
}
entry.reset();
storeEntry(tx, entry);
tx.free(entry.getPage().getPageId());
result = true;
this.size--;
}
return result;
}
private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
Page<EntryLocation> p = tx.allocate();
EntryLocation result = new EntryLocation();
result.setPage(p);
p.set(result);
result.setId(id);
result.setPrev(previous);
result.setNext(next);
return result;
}
private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
EntryLocation result = new EntryLocation();
result.setPage(p);
p.set(result);
result.setId(id);
result.setPrev(previous);
result.setNext(next);
return result;
}
EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
EntryLocation entry = page.get();
if (entry != null) {
entry.setPage(page);
}
return entry;
}
private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
}
EntryLocation getNext(Transaction tx, long next) throws IOException {
EntryLocation result = null;
if (next != EntryLocation.NOT_SET) {
result = loadEntry(tx, next);
}
return result;
}
private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
EntryLocation result = null;
if (previous != EntryLocation.NOT_SET) {
result = loadEntry(tx, previous);
}
return result;
}
private EntryLocation getRoot(Transaction tx) throws IOException {
EntryLocation result = loadEntry(tx, this.rootId);
return result;
}
ByteSequence getPayload(EntryLocation entry) throws IOException {
return this.store.getPayload(entry.getLocation());
@Override
public String toString() {
return "" + name + ",[headPageId=" + headPageId + ",tailPageId=" + tailPageId + ", size=" + size() + "]";
}
}

View File

@ -21,39 +21,22 @@ import org.apache.kahadb.util.ByteSequence;
public class PListEntry {
private final ByteSequence byteSequence;
private final EntryLocation entry;
private final String entry;
PListEntry(EntryLocation entry, ByteSequence bs) {
PListEntry(String entry, ByteSequence bs) {
this.entry = entry;
this.byteSequence = bs;
}
/**
* @return the byteSequence
*/
public ByteSequence getByteSequence() {
return this.byteSequence;
}
public String getId() {
return this.entry.getId();
}
/**
* @return the entry
*/
EntryLocation getEntry() {
return this.entry;
}
public PListEntry copy() {
return new PListEntry(this.entry, this.byteSequence);
}
@Override
public String toString() {
return this.entry.getId() + "[pageId=" + this.entry.getPage().getPageId() + ",next=" + this.entry.getNext()
+ "]";
}
}

View File

@ -73,6 +73,10 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
private Scheduler scheduler;
private long cleanupInterval = 30000;
private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
public Object getIndexLock() {
return indexLock;
}
@ -82,6 +86,30 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
this.scheduler = brokerService.getScheduler();
}
public int getIndexPageSize() {
return indexPageSize;
}
public int getIndexCacheSize() {
return indexCacheSize;
}
public int getIndexWriteBatchSize() {
return indexWriteBatchSize;
}
public void setIndexPageSize(int indexPageSize) {
this.indexPageSize = indexPageSize;
}
public void setIndexCacheSize(int indexCacheSize) {
this.indexCacheSize = indexCacheSize;
}
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.indexWriteBatchSize = indexWriteBatchSize;
}
protected class MetaData {
protected MetaData(PListStore store) {
this.store = store;
@ -89,34 +117,34 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
private final PListStore store;
Page<MetaData> page;
BTreeIndex<String, PList> storedSchedulers;
BTreeIndex<String, PList> lists;
void createIndexes(Transaction tx) throws IOException {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.storedSchedulers.load(tx);
this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
this.lists.setValueMarshaller(new PListMarshaller(this.store));
this.lists.load(tx);
}
void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
Entry<String, PList> entry = i.next();
entry.getValue().load(tx);
schedulers.put(entry.getKey(), entry.getValue());
lists.put(entry.getKey(), entry.getValue());
}
}
public void read(DataInput is) throws IOException {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
this.lists.setValueMarshaller(new PListMarshaller(this.store));
}
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
os.writeLong(this.lists.getPageId());
}
}
@ -137,29 +165,9 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
List<EntryLocation> result = new ArrayList<EntryLocation>();
int size = dataIn.readInt();
for (int i = 0; i < size; i++) {
EntryLocation jobLocation = new EntryLocation();
jobLocation.readExternal(dataIn);
result.add(jobLocation);
}
return result;
}
public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (EntryLocation jobLocation : value) {
jobLocation.writeExternal(dataOut);
}
}
}
class JobSchedulerMarshaller extends VariableMarshaller<PList> {
class PListMarshaller extends VariableMarshaller<PList> {
private final PListStore store;
JobSchedulerMarshaller(PListStore store) {
PListMarshaller(PListStore store) {
this.store = store;
}
public PList readPayload(DataInput dataIn) throws IOException {
@ -168,8 +176,8 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
return result;
}
public void writePayload(PList js, DataOutput dataOut) throws IOException {
js.write(dataOut);
public void writePayload(PList list, DataOutput dataOut) throws IOException {
list.write(dataOut);
}
}
@ -207,9 +215,9 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
pl.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
pl.setRootId(tx.allocate().getPageId());
pl.setHeadPageId(tx.allocate().getPageId());
pl.load(tx);
metaData.storedSchedulers.put(tx, name, pl);
metaData.lists.put(tx, name, pl);
}
});
result = pl;
@ -236,8 +244,8 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
if (result) {
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
metaData.storedSchedulers.remove(tx, name);
pl.destroy(tx);
metaData.lists.remove(tx, name);
pl.destroy();
}
});
}
@ -261,6 +269,9 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
this.journal.start();
this.pageFile = new PageFile(directory, "tmpDB");
this.pageFile.setPageSize(getIndexPageSize());
this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
this.pageFile.setPageCacheSize(getIndexCacheSize());
this.pageFile.load();
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
@ -310,7 +321,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
for (PList pl : this.persistentLists.values()) {
pl.unload();
pl.unload(null);
}
if (this.pageFile != null) {
this.pageFile.unload();
@ -351,20 +362,13 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
private void claimCandidates(PListEntry entry, Set<Integer> candidates) {
EntryLocation location = entry.getEntry();
if (location != null) {
candidates.remove(location.getLocation().getDataFileId());
}
}
synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
ByteSequence result = null;
result = this.journal.read(location);
return result;
}
synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
return this.journal.write(payload, sync);
}
@ -440,7 +444,8 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
@Override
public String toString() {
return "PListStore:" + this.directory;
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
return "PListStore:[" + path + " ]";
}
}

View File

@ -16,13 +16,18 @@
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.xbean.XBeanBrokerService;

View File

@ -284,7 +284,10 @@ public abstract class Usage<T extends Usage> implements Service {
@Override
public String toString() {
return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
return "Usage(" + getName() + ") percentUsage=" + percentUsage
+ "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+ ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
+ (parent != null ? ";Parent:" + parent.toString() : "");
}
@SuppressWarnings("unchecked")

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.kahadb.journal.Location;
public class LocationMarshaller implements Marshaller<Location> {
public final static LocationMarshaller INSTANCE = new LocationMarshaller();
public Location readPayload(DataInput dataIn) throws IOException {
Location rc = new Location();
rc.setDataFileId(dataIn.readInt());
rc.setOffset(dataIn.readInt());
return rc;
}
public void writePayload(Location object, DataOutput dataOut) throws IOException {
dataOut.writeInt(object.getDataFileId());
dataOut.writeInt(object.getOffset());
}
public int getFixedSize() {
return 8;
}
public Location deepCopy(Location source) {
return new Location(source);
}
public boolean isDeepCopySupported() {
return true;
}
}

View File

@ -184,6 +184,18 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
DataArrayResponse dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
// ensure we can close a connection with prepared transactions
connection.request(closeConnectionInfo(connectionInfo));
// open again to deliver outcome
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Commit the prepared transactions.
for (int i = 0; i < dar.getData().length; i++) {
connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));

View File

@ -17,23 +17,42 @@
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.usage.SystemUsage;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.util.ByteSequence;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class FilePendingMessageCursorTest {
private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
BrokerService brokerService;
FilePendingMessageCursor underTest;
@Before
public void createBrokerWithTempStoreLimit() throws Exception {
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.getTempDataStore().stop();
}
}
private void createBrokerWithTempStoreLimit() throws Exception {
brokerService = new BrokerService();
SystemUsage usage = brokerService.getSystemUsage();
usage.getTempUsage().setLimit(1025*1024*15);
@ -45,7 +64,7 @@ public class FilePendingMessageCursorTest {
@Test
public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
createBrokerWithTempStoreLimit();
SystemUsage usage = brokerService.getSystemUsage();
assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
@ -57,4 +76,60 @@ public class FilePendingMessageCursorTest {
assertFalse("cursor is not full", underTest.isFull());
}
@Test
public void testAddRemoveAddIndexSize() throws Exception {
brokerService = new BrokerService();
SystemUsage usage = brokerService.getSystemUsage();
usage.getMemoryUsage().setLimit(1024*150);
String body = new String(new byte[1024]);
Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
underTest.setSystemUsage(usage);
LOG.info("start");
final PageFile pageFile = underTest.getDiskList().getPageFile();
LOG.info("page count: " +pageFile.getPageCount());
LOG.info("free count: " + pageFile.getFreePageCount());
LOG.info("content size: " +pageFile.getPageContentSize());
final long initialPageCount = pageFile.getPageCount();
final int numMessages = 1000;
for (int j=0; j<10; j++) {
// ensure free pages are reused
for (int i=0; i< numMessages; i++) {
ActiveMQMessage mqMessage = new ActiveMQMessage();
mqMessage.setStringProperty("body", body);
mqMessage.setMessageId(new MessageId("1:2:3:" + i));
mqMessage.setMemoryUsage(usage.getMemoryUsage());
mqMessage.setRegionDestination(destination);
underTest.addMessageLast(new IndirectMessageReference(mqMessage));
}
assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull());
underTest.reset();
long receivedCount = 0;
while(underTest.hasNext()) {
MessageReference ref = underTest.next();
underTest.remove();
assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
}
assertEquals("got all messages back", receivedCount, numMessages);
LOG.info("page count: " +pageFile.getPageCount());
LOG.info("free count: " + pageFile.getFreePageCount());
LOG.info("content size: " + pageFile.getPageContentSize());
}
assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount() );
LOG.info("Destroy");
underTest.destroy();
LOG.info("page count: " + pageFile.getPageCount());
LOG.info("free count: " + pageFile.getFreePageCount());
LOG.info("content size: " + pageFile.getPageContentSize());
assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
@ -80,7 +81,7 @@ public class PListTest {
plist.addFirst(test, bs);
}
assertEquals(plist.size(), COUNT);
int count = plist.size() - 1;
long count = plist.size() - 1;
for (ByteSequence bs : map.values()) {
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
PListEntry entry = plist.get(count);
@ -107,7 +108,7 @@ public class PListTest {
assertEquals(plist.size(), COUNT);
PListEntry entry = plist.getFirst();
while (entry != null) {
plist.remove(entry.copy());
plist.remove(entry.getId());
entry = plist.getFirst();
}
assertEquals(0,plist.size());
@ -133,7 +134,6 @@ public class PListTest {
}
plist.destroy();
assertEquals(0,plist.size());
assertNull("no first entry", plist.getFirst());
}
@Test
@ -292,47 +292,56 @@ public class PListTest {
store.setCleanupInterval(5000);
store.start();
final int iterations = 500;
final int iterations = 5000;
final int numLists = 10;
// prime the store
// create/delete
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.CREATE, iterations).run();
}
LOG.info("delete");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.DELETE, iterations).run();
}
// fill
LOG.info("fill");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
// empty
LOG.info("remove");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.REMOVE, iterations).run();
}
// empty
LOG.info("check empty");
for (int i=0; i<numLists;i++) {
assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
}
LOG.info("delete again");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.DELETE, iterations).run();
}
// fill
LOG.info("fill again");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
// parallel
ExecutorService executor = Executors.newFixedThreadPool(100);
LOG.info("parallel add and remove");
ExecutorService executor = Executors.newFixedThreadPool(numLists*2);
for (int i=0; i<numLists*2; i++) {
executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
}
executor.shutdown();
LOG.info("wait for parallel work to complete");
executor.awaitTermination(60*5, TimeUnit.SECONDS);
assertTrue("no excepitons", exceptions.isEmpty());
assertTrue("no exceptions", exceptions.isEmpty());
}
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE}
@ -373,7 +382,7 @@ public class PListTest {
case REMOVE:
plist = store.getPList("List-" + id);
for (int j = iterations; j > 0; j--) {
for (int j = iterations -1; j >= 0; j--) {
plist.remove(idSeed + "id" + j);
if (j > 0 && j % (iterations / 2) == 0) {
LOG.info("Job-" + id + " Done remove: " + j);
@ -383,9 +392,10 @@ public class PListTest {
case ITERATE:
plist = store.getPList("List-" + id);
PListEntry element = plist.getFirst();
while (element != null) {
element = plist.getNext(element);
Iterator<PListEntry> iterator = plist.iterator();
PListEntry element = null;
while (iterator.hasNext()) {
element = iterator.next();
}
break;
default:

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,7 +36,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
protected PageFile pageFile;
protected long headPageId;
protected long tailPageId;
private long size;
private AtomicLong size = new AtomicLong(0);
protected AtomicBoolean loaded = new AtomicBoolean();
@ -43,9 +44,12 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
private Marshaller<Key> keyMarshaller;
private Marshaller<Value> valueMarshaller;
public ListIndex(PageFile pageFile, long rootPageId) {
public ListIndex() {
}
public ListIndex(PageFile pageFile, long headPageId) {
this.pageFile = pageFile;
this.headPageId = rootPageId;
this.headPageId = headPageId;
}
synchronized public void load(Transaction tx) throws IOException {
@ -61,15 +65,15 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
final Page<ListNode<Key,Value>> p = tx.load(headPageId, null);
if( p.getType() == Page.PAGE_FREE_TYPE ) {
// Need to initialize it..
ListNode<Key, Value> root = createNode(p, null);
ListNode<Key, Value> root = createNode(p);
storeNode(tx, root, true);
tailPageId = headPageId;
tailPageId = headPageId = p.getPageId();
} else {
ListNode<Key, Value> node = loadNode(tx, headPageId, null);
size += node.size(tx);
ListNode<Key, Value> node = loadNode(tx, headPageId);
size.addAndGet(node.size(tx));
while (node.getNext() != -1) {
node = loadNode(tx, node.getNext(), node);
size += node.size(tx);
node = loadNode(tx, node.getNext());
size.addAndGet(node.size(tx));
tailPageId = node.getPageId();
}
}
@ -82,11 +86,11 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
}
protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
return loadNode(tx, headPageId, null);
return loadNode(tx, headPageId);
}
protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
return loadNode(tx, tailPageId, null);
return loadNode(tx, tailPageId);
}
synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
@ -122,14 +126,14 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
assertLoaded();
getTail(tx).put(tx, key, value);
size ++;
size.incrementAndGet();
return null;
}
synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
assertLoaded();
getHead(tx).addFirst(tx, key, value);
size++;
size.incrementAndGet();
return null;
}
@ -146,7 +150,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
}
public void onRemove() {
size--;
size.decrementAndGet();
}
public boolean isTransient() {
@ -157,8 +161,10 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
ListNode<Key,Value>candidate = iterator.next();
candidate.clear(tx);
// break up the transaction
tx.commit();
}
size = 0;
size.set(0);
}
synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
@ -173,7 +179,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
return getHead(tx).iterator(tx);
}
synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, int initialPosition) throws IOException {
synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
return getHead(tx).iterator(tx, initialPosition);
}
@ -191,29 +197,24 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
}
}
ListNode<Key,Value> loadNode(Transaction tx, long pageId, ListNode<Key,Value> parent) throws IOException {
ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
ListNode<Key, Value> node = page.get();
node.setPage(page);
node.setParent(parent);
return node;
}
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> p, ListNode<Key,Value> parent) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(this);
node.setPage(p);
node.setParent(parent);
node.setEmpty();
p.set(node);
return node;
}
ListNode<Key,Value> createNode(Transaction tx, ListNode<Key,Value> parent) throws IOException {
Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), marshaller);
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(this);
node.setPage(page);
page.set(node);
return node;
}
ListNode<Key,Value> createNode(Transaction tx) throws IOException {
Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), null);
ListNode<Key,Value> node = new ListNode<Key,Value>(this);
node.setPage(page);
node.setParent(parent);
node.setEmpty();
page.set(node);
return node;
}
@ -225,6 +226,11 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
public PageFile getPageFile() {
return pageFile;
}
public void setPageFile(PageFile pageFile) {
this.pageFile = pageFile;
}
public long getHeadPageId() {
return headPageId;
}
@ -252,6 +258,6 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
}
public long size() {
return size;
return size.get();
}
}

View File

@ -33,18 +33,20 @@ import org.apache.kahadb.util.VariableMarshaller;
* one overflowing Page of a PageFile.
*/
public final class ListNode<Key,Value> {
private final static boolean ADD_FIRST = true;
private final static boolean ADD_LAST = false;
private final static long NOT_SET = -1;
// The index that this node is part of.
private final ListIndex<Key,Value> index;
// The parent node or null if this is the root node of the List
private ListNode<Key,Value> parent;
// The page associated with this node
private Page<ListNode<Key,Value>> page;
protected LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>();
// The next page after this one.
private long next = -1;
private long next = NOT_SET;
public int size(Transaction tx) {
return entries.size();
@ -95,9 +97,9 @@ public final class ListNode<Key,Value> {
public ListNode<Key,Value> next() {
ListNode<Key,Value> current = nextEntry;
if( nextEntry !=null ) {
if (nextEntry.next != -1) {
if (nextEntry.next != NOT_SET) {
try {
nextEntry = index.loadNode(tx, current.next, current);
nextEntry = index.loadNode(tx, current.next);
} catch (IOException unexpected) {
IllegalStateException e = new IllegalStateException("failed to load next: " + current.next + ", reason: " + unexpected.getLocalizedMessage());
e.initCause(unexpected);
@ -118,16 +120,16 @@ public final class ListNode<Key,Value> {
private final class ListIterator implements Iterator<Entry<Key, Value>> {
private final Transaction tx;
ListNode<Key,Value> current;
ListNode<Key,Value> current, prev;
KeyValueEntry<Key, Value> nextEntry;
KeyValueEntry<Key, Value> toRemove;
private ListIterator(Transaction tx, ListNode<Key,Value> current, int nextIndex) throws IOException {
private ListIterator(Transaction tx, ListNode<Key,Value> current, long nextIndex) throws IOException {
this.tx = tx;
this.current = current;
nextEntry = current.entries.getHead();
if (nextIndex > 0) {
for (int i=0; i<nextIndex; i++) {
if (nextIndex > 0 && nextEntry != null) {
for (long i=0; i<nextIndex; i++) {
nextEntry = nextEntry.getNext();
if (nextEntry == null) {
if (!nextFromNextListNode())
@ -139,9 +141,10 @@ public final class ListNode<Key,Value> {
private boolean nextFromNextListNode() {
boolean haveNext = false;
if (current.getNext() != -1) {
if (current.getNext() != NOT_SET) {
try {
current = index.loadNode(tx, current.getNext(), current);
prev = current;
current = index.loadNode(tx, current.getNext());
} catch (IOException unexpected) {
NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
e.initCause(unexpected);
@ -172,7 +175,7 @@ public final class ListNode<Key,Value> {
throw new IllegalStateException("can only remove once, call next again");
}
try {
doRemove(tx, current, toRemove);
doRemove(tx, current, prev, toRemove);
index.onRemove();
toRemove = null;
} catch (IOException unexpected) {
@ -197,7 +200,7 @@ public final class ListNode<Key,Value> {
}
public void writePayload(ListNode<Key,Value> node, DataOutput os) throws IOException {
// Write the keys
os.writeLong(node.next);
short count = (short)node.entries.size(); // cast may truncate value...
if( count != node.entries.size() ) {
throw new IOException("short over flow, too many entries in list: " + node.entries.size());
@ -215,6 +218,7 @@ public final class ListNode<Key,Value> {
@SuppressWarnings("unchecked")
public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(index);
node.next = is.readLong();
final short size = is.readShort();
for (short i = 0; i < size; i++) {
node.entries.addLast(
@ -229,40 +233,26 @@ public final class ListNode<Key,Value> {
this.index = index;
}
public void setEmpty() {
}
public Value remove(Transaction tx, Key key) throws IOException {
Value result = null;
KeyValueEntry<Key, Value> entry = entries.getHead();
while (entry != null) {
if (entry.getKey().equals(key)) {
result = entry.getValue();
doRemove(tx, this, entry);
break;
}
entry = entry.getNext();
}
return result;
}
private void doRemove(Transaction tx, ListNode current, KeyValueEntry<Key, Value> entry) throws IOException {
private void doRemove(final Transaction tx, final ListNode current, final ListNode prev, KeyValueEntry<Key, Value> entry) throws IOException {
entry.unlink();
if (current.entries.isEmpty()) {
if (current.getPageId() == index.getHeadPageId()) {
if (current.getNext() != -1) {
if (current.getNext() != NOT_SET) {
// new head
index.setHeadPageId(current.getNext());
tx.free(current.getPageId());
} else {
// store current in empty state
store(tx);
}
} else {
// need to unlink the node
current.parent.setNext(current.getNext());
prev.setNext(current.next);
index.storeNode(tx, prev, false);
tx.free(current.getPageId());
index.storeNode(tx, current.parent, false);
}
} else {
store(tx, true);
store(tx);
}
}
@ -271,7 +261,7 @@ public final class ListNode<Key,Value> {
throw new IllegalArgumentException("Key cannot be null");
}
entries.addLast(new KeyValueEntry(key, value));
store(tx, false);
store(tx, ADD_LAST);
return null;
}
@ -280,29 +270,30 @@ public final class ListNode<Key,Value> {
throw new IllegalArgumentException("Key cannot be null");
}
entries.addFirst(new KeyValueEntry(key, value));
store(tx, true);
store(tx, ADD_FIRST);
return null;
}
private void store(Transaction tx, boolean addFirst) throws IOException {
try {
index.storeNode(tx, this, allowOverflow());
index.storeNode(tx, this, false);
} catch ( Transaction.PageOverflowIOException e ) {
// If we get an overflow
split(tx, addFirst);
}
}
private boolean allowOverflow() {
return false;
private void store(Transaction tx) throws IOException {
index.storeNode(tx, this, false);
}
private void split(Transaction tx, boolean isAddFirst) throws IOException {
ListNode<Key, Value> extension = index.createNode(tx, this);
ListNode<Key, Value> extension = index.createNode(tx);
if (isAddFirst) {
extension.setEntries(entries.getHead().splitAfter());
// head keeps the first entry, insert extension with the rest
extension.setNext(this.getNext());
this.setNext(extension.getPageId());
extension.setEntries(entries.getHead().splitAfter());
} else {
index.setTailPageId(extension.getPageId());
this.setNext(extension.getPageId());
@ -345,7 +336,7 @@ public final class ListNode<Key,Value> {
return entries.getTail();
}
public Iterator<Entry<Key,Value>> iterator(final Transaction tx, int pos) throws IOException {
public Iterator<Entry<Key,Value>> iterator(final Transaction tx, long pos) throws IOException {
return new ListIterator(tx, this, pos);
}
@ -386,14 +377,6 @@ public final class ListNode<Key,Value> {
return page.getPageId();
}
public ListNode<Key, Value> getParent() {
return parent;
}
public void setParent(ListNode<Key, Value> parent) {
this.parent = parent;
}
public Page<ListNode<Key, Value>> getPage() {
return page;
}
@ -412,7 +395,7 @@ public final class ListNode<Key,Value> {
@Override
public String toString() {
return "[ListNode "+ entries.toString() + "]";
return "[ListNode(" + page.getPageId() + "->" + next + ") " + entries.toString() + "]";
}
}

View File

@ -64,6 +64,7 @@ public class PageFile {
// 4k Default page size.
public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", ""+100));;
private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
private static final int PAGE_FILE_HEADER_SIZE=1024*4;
@ -103,8 +104,8 @@ public class PageFile {
// The cache of recently used pages.
private boolean enablePageCaching=true;
// How many pages will we keep in the cache?
private int pageCacheSize = 100;
private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
// Should first log the page write to the recovery buffer? Avoids partial
// page write failures..
private boolean enableRecoveryFile=true;
@ -129,7 +130,7 @@ public class PageFile {
// Persistent settings stored in the page file.
private MetaData metaData;
/**
* Use to keep track of updated pages which have not yet been committed.
*/
@ -682,7 +683,7 @@ public class PageFile {
public long getFreePageCount() {
assertLoaded();
return freeList.size();
return freeList.rangeSize();
}
public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {

View File

@ -38,7 +38,7 @@ import org.apache.kahadb.util.SequenceSet;
* do multiple update operations in a single unit of work.
*/
public class Transaction implements Iterable<Page> {
/**
* The PageOverflowIOException occurs when a page write is requested
* and it's data is larger than what would fit into a single page.
@ -142,7 +142,7 @@ public class Transaction implements Iterable<Page> {
/**
* Frees up a previously allocated page so that it can be re-allocated again.
*
* @param page the page to free up
* @param pageId the page to free up
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
@ -155,7 +155,7 @@ public class Transaction implements Iterable<Page> {
/**
* Frees up a previously allocated sequence of pages so that it can be re-allocated again.
*
* @param page the initial page of the sequence that will be getting freed
* @param pageId the initial page of the sequence that will be getting freed
* @param count the number of pages in the sequence
*
* @throws IOException
@ -216,6 +216,8 @@ public class Transaction implements Iterable<Page> {
}
page.makeFree(getWriteTransactionId());
// ensure free page is visible while write is pending
pageFile.addToCache(page.copy());
DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
page.write(out);
@ -451,7 +453,7 @@ public class Transaction implements Iterable<Page> {
}
if (page.getType() == Page.PAGE_FREE_TYPE) {
throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
}
return page;
@ -560,7 +562,6 @@ public class Transaction implements Iterable<Page> {
* iterated.
*
* @param includeFreePages - if true, free pages are included in the iteration
* @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
* @throws IllegalStateException
* if the PageFile is not loaded
*/

View File

@ -280,4 +280,14 @@ public class SequenceSet extends LinkedNodeList<Sequence> {
return false;
}
public long rangeSize() {
long result = 0;
Sequence sequence = getHead();
while (sequence != null) {
result += sequence.range();
sequence = sequence.getNext();
}
return result;
}
}

View File

@ -228,39 +228,9 @@ public class ListIndexTest extends IndexTestSupport {
tx.commit();
}
public void testVisitor() throws Exception {
createPageFileAndIndex(100);
ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
this.index.load(tx);
tx.commit();
// Insert in reverse order..
doInsert(1000);
this.index.unload(tx);
tx.commit();
this.index.load(tx);
tx.commit();
// BTree should iterate it in sorted order.
/*index.visit(tx, new BTreeVisitor<String, Long>(){
public boolean isInterestedInKeysBetween(String first, String second) {
return true;
}
public void visit(List<String> keys, List<Long> values) {
}
});*/
this.index.unload(tx);
tx.commit();
}
public void testRandomRemove() throws Exception {
createPageFileAndIndex(100);
createPageFileAndIndex(4*1024);
ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
this.index.load(tx);
tx.commit();
@ -295,21 +265,34 @@ public class ListIndexTest extends IndexTestSupport {
index.remove(tx, key(1566));
}
public void testLargeAppendTimed() throws Exception {
createPageFileAndIndex(100);
public void testLargeAppendRemoveTimed() throws Exception {
createPageFileAndIndex(1024*4);
ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
this.index.load(tx);
tx.commit();
final int COUNT = 50000;
long start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
//String test = new String("test" + i);
//ByteSequence bs = new ByteSequence(test.getBytes());
listIndex.put(tx, key(i), (long) i);
tx.commit();
}
LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
start = System.currentTimeMillis();
tx = pf.tx();
int removeCount = 0;
Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
while (iterator.hasNext()) {
iterator.next();
iterator.remove();
removeCount++;
}
tx.commit();
assertEquals("Removed all", COUNT, removeCount);
LOG.info("Time to remove " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount());
}
void doInsertReverse(int count) throws Exception {