git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1171743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-09-16 19:31:30 +00:00
parent 4a697f852a
commit 943db3c3cb
2 changed files with 239 additions and 87 deletions

View File

@ -549,11 +549,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} finally { } finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
} }
} finally { } finally {
unlockAsyncJobQueue(); unlockAsyncJobQueue();
} }
} }
@Override @Override
@ -618,7 +616,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) MessageId messageId, MessageAck ack)
throws IOException { throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName); String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
if (isConcurrentStoreAndDispatchTopics()) { if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
StoreTopicTask task = null; StoreTopicTask task = null;
@ -660,7 +658,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
.getSubscriptionName()); .getSubscriptionName());
KahaSubscriptionCommand command = new KahaSubscriptionCommand(); KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest); command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey); command.setSubscriptionKey(subscriptionKey.toString());
command.setRetroactive(retroactive); command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
@ -671,7 +669,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void deleteSubscription(String clientId, String subscriptionName) throws IOException { public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand(); KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest); command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
store(command, isEnableJournalDiskSyncs() && true, null, null); store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet(); this.subscriptionCount.decrementAndGet();
} }
@ -730,21 +728,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException { public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
if (cursorPos == null) { if (cursorPos == null) {
// The subscription might not exist. // The subscription might not exist.
return 0; return 0;
} }
int counter = 0; return (int) getStoredMessageCount(tx, sd, subscriptionKey);
for (Iterator<Entry<Long, HashSet<String>>> iterator =
sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().contains(subscriptionKey)) {
counter++;
}
}
return counter;
} }
}); });
}finally { }finally {
@ -755,13 +745,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception { throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
@SuppressWarnings("unused")
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try { try {
pageFile.tx().execute(new Transaction.Closure<Exception>() { pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception { public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos); sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) { .hasNext();) {
@ -779,6 +770,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception { final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
@SuppressWarnings("unused")
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try { try {
@ -788,7 +780,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.resetCursorPosition(); sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) { if (moc == null) {
LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey); LastAck pos = getLastAck(tx, sd, subscriptionKey);
if (pos == null) { if (pos == null) {
// sub deleted // sub deleted
return; return;

View File

@ -27,8 +27,23 @@ import java.io.InputStream;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.Stack;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -56,11 +71,9 @@ import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.kahadb.util.LocationMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.index.BTreeIndex; import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor; import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.index.ListIndex;
import org.apache.kahadb.journal.DataFile; import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location; import org.apache.kahadb.journal.Location;
@ -70,6 +83,7 @@ import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence; import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream; import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream; import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LocationMarshaller;
import org.apache.kahadb.util.LockFile; import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.LongMarshaller; import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller; import org.apache.kahadb.util.Marshaller;
@ -77,6 +91,8 @@ import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet; import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller; import org.apache.kahadb.util.VariableMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
@ -97,7 +113,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
static final long NOT_ACKED = -1; static final long NOT_ACKED = -1;
static final long UNMATCHED_SEQ = -2; static final long UNMATCHED_SEQ = -2;
static final int VERSION = 3; static final int VERSION = 4;
protected class Metadata { protected class Metadata {
protected Page<Metadata> page; protected Page<Metadata> page;
@ -513,6 +529,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
} }
@SuppressWarnings("unused")
private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
return TransactionIdConversion.convertToLocal(tx); return TransactionIdConversion.convertToLocal(tx);
} }
@ -1185,7 +1202,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
sd.ackPositions.clear(tx); sd.ackPositions.clear(tx);
sd.ackPositions.unload(tx); sd.ackPositions.unload(tx);
tx.free(sd.ackPositions.getPageId()); tx.free(sd.ackPositions.getHeadPageId());
} }
String key = key(command.getDestination()); String key = key(command.getDestination());
@ -1207,10 +1224,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey); addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
} }
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
sd.subscriptionCache.add(subscriptionKey);
} else { } else {
// delete the sub... // delete the sub...
sd.subscriptions.remove(tx, subscriptionKey); sd.subscriptions.remove(tx, subscriptionKey);
sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey);
sd.subscriptionCache.remove(subscriptionKey);
removeAckLocationsForSub(tx, sd, subscriptionKey); removeAckLocationsForSub(tx, sd, subscriptionKey);
} }
} }
@ -1468,7 +1487,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
BTreeIndex<String, KahaSubscriptionCommand> subscriptions; BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
BTreeIndex<String, LastAck> subscriptionAcks; BTreeIndex<String, LastAck> subscriptionAcks;
HashMap<String, MessageOrderCursor> subscriptionCursors; HashMap<String, MessageOrderCursor> subscriptionCursors;
BTreeIndex<Long, HashSet<String>> ackPositions; ListIndex<String, SequenceSet> ackPositions;
// Transient data used to track which Messages are no longer needed.
final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
} }
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
@ -1483,15 +1506,43 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
if (metadata.version >= 3) { if (metadata.version >= 3) {
value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
} else { } else {
// upgrade // upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() { pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate()); BTreeIndex<Long, HashSet<String>> oldAckPositions =
value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
value.ackPositions.load(tx); oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
oldAckPositions.load(tx);
LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
// Do the initial build of the data in memory before writing into the store
// based Ack Positions List to avoid a lot of disk thrashing.
Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
while (iterator.hasNext()) {
Entry<Long, HashSet<String>> entry = iterator.next();
for(String subKey : entry.getValue()) {
SequenceSet pendingAcks = temp.get(subKey);
if (pendingAcks == null) {
pendingAcks = new SequenceSet();
temp.put(subKey, pendingAcks);
}
pendingAcks.add(entry.getKey());
}
}
// Now move the pending messages to ack data into the store backed
// structure.
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
for(String subscriptionKey : temp.keySet()) {
value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
}
} }
}); });
} }
@ -1527,7 +1578,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
dataOut.writeBoolean(true); dataOut.writeBoolean(true);
dataOut.writeLong(value.subscriptions.getPageId()); dataOut.writeLong(value.subscriptions.getPageId());
dataOut.writeLong(value.subscriptionAcks.getPageId()); dataOut.writeLong(value.subscriptionAcks.getPageId());
dataOut.writeLong(value.ackPositions.getPageId()); dataOut.writeLong(value.ackPositions.getHeadPageId());
} else { } else {
dataOut.writeBoolean(false); dataOut.writeBoolean(false);
} }
@ -1594,7 +1645,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (topic) { if (topic) {
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate()); rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
} }
metadata.destinations.put(tx, key, rc); metadata.destinations.put(tx, key, rc);
} }
@ -1624,8 +1675,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
rc.subscriptionAcks.load(tx); rc.subscriptionAcks.load(tx);
rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
rc.ackPositions.load(tx); rc.ackPositions.load(tx);
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
@ -1645,6 +1696,27 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
} }
// Configure the message references index
Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
while (subscriptions.hasNext()) {
Entry<String, SequenceSet> subscription = subscriptions.next();
if (subscription.getValue() != null) {
for(Long sequenceId : subscription.getValue()) {
Long current = rc.messageReferences.get(sequenceId);
if (current == null) {
current = new Long(0);
}
rc.messageReferences.put(sequenceId, Long.valueOf(current.longValue() + 1));
}
}
}
// Configure the subscription cache
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
Entry<String, LastAck> entry = iterator.next();
rc.subscriptionCache.add(entry.getKey());
}
if (rc.orderIndex.nextMessageId == 0) { if (rc.orderIndex.nextMessageId == 0) {
// check for existing durable sub all acked out - pull next seq from acks as messages are gone // check for existing durable sub all acked out - pull next seq from acks as messages are gone
if (!rc.subscriptionAcks.isEmpty(tx)) { if (!rc.subscriptionAcks.isEmpty(tx)) {
@ -1656,16 +1728,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
} else { } else {
// update based on ackPositions for unmatched, last entry is always the next // update based on ackPositions for unmatched, last entry is always the next
if (!rc.ackPositions.isEmpty(tx)) { if (!rc.messageReferences.isEmpty()) {
Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx); Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
rc.orderIndex.nextMessageId = rc.orderIndex.nextMessageId =
Math.max(rc.orderIndex.nextMessageId, last.getKey()); Math.max(rc.orderIndex.nextMessageId, nextMessageId);
} }
} }
} }
if (metadata.version < 3) { if (metadata.version < VERSION) {
// store again after upgrade // store again after upgrade
metadata.destinations.put(tx, key, rc); metadata.destinations.put(tx, key, rc);
} }
@ -1673,42 +1745,105 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
HashSet<String> hs = sd.ackPositions.get(tx, messageSequence); SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
if (hs == null) { if (sequences == null) {
hs = new HashSet<String>(); sequences = new SequenceSet();
sequences.add(messageSequence);
sd.ackPositions.put(tx, subscriptionKey, sequences);
} else {
sequences.add(messageSequence);
sd.ackPositions.add(tx, subscriptionKey, sequences);
} }
hs.add(subscriptionKey);
// every ack location addition needs to be a btree modification to get it stored Long count = sd.messageReferences.get(messageSequence);
sd.ackPositions.put(tx, messageSequence, hs); if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
} }
// new sub is interested in potentially all existing messages // new sub is interested in potentially all existing messages
private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) { SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
Entry<Long, HashSet<String>> entry = iterator.next(); if (sequences == null) {
entry.getValue().add(subscriptionKey); sequences = new SequenceSet();
sd.ackPositions.put(tx, entry.getKey(), entry.getValue()); sequences.add(messageSequence);
} sd.ackPositions.put(tx, subscriptionKey, sequences);
} else {
sequences.add(messageSequence);
sd.ackPositions.add(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
} }
final HashSet nextMessageIdMarker = new HashSet<String>();
// on a new message add, all existing subs are interested in this message // on a new message add, all existing subs are interested in this message
private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
HashSet hs = new HashSet<String>(); for(String subscriptionKey : sd.subscriptionCache) {
for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) { SequenceSet sequences = null;
Entry<String, LastAck> entry = iterator.next(); sequences = sd.ackPositions.get(tx, subscriptionKey);
hs.add(entry.getKey()); if (sequences == null) {
sequences = new SequenceSet();
sequences.add(new Sequence(messageSequence, messageSequence + 1));
sd.ackPositions.put(tx, subscriptionKey, sequences);
} else {
sequences.add(new Sequence(messageSequence, messageSequence + 1));
sd.ackPositions.add(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
} }
sd.ackPositions.put(tx, messageSequence, hs);
// add empty next to keep track of nextMessage
sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
} }
private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
if (!sd.ackPositions.isEmpty(tx)) { if (!sd.ackPositions.isEmpty(tx)) {
Long end = sd.ackPositions.getLast(tx).getKey(); SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) { if (sequences == null || sequences.isEmpty()) {
removeAckLocation(tx, sd, subscriptionKey, sequence); return;
}
ArrayList<Long> unreferenced = new ArrayList<Long>();
for(Long sequenceId : sequences) {
long references = 0;
Long count = sd.messageReferences.get(sequenceId);
if (count != null) {
references = count.longValue() - 1;
} else {
continue;
}
if (references > 0) {
sd.messageReferences.put(sequenceId, Long.valueOf(references));
} else {
sd.messageReferences.remove(sequenceId);
unreferenced.add(sequenceId);
}
}
for(Long sequenceId : unreferenced) {
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
// Do the actual deletes.
for (Entry<Long, MessageKeys> entry : deletes) {
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
}
} }
} }
} }
@ -1723,12 +1858,24 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
// Remove the sub from the previous location set.. // Remove the sub from the previous location set..
if (sequenceId != null) { if (sequenceId != null) {
HashSet<String> hs = sd.ackPositions.get(tx, sequenceId); SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
if (hs != null) { if (range != null && !range.isEmpty()) {
hs.remove(subscriptionKey); range.remove(sequenceId);
if (hs.isEmpty()) { if (!range.isEmpty()) {
HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue(); sd.ackPositions.put(tx, subscriptionKey, range);
sd.ackPositions.remove(tx, sequenceId); } else {
sd.ackPositions.remove(tx, subscriptionKey);
}
// Check if the message is reference by any other subscription.
Long count = sd.messageReferences.get(sequenceId);
long references = count.longValue() - 1;
if (references > 0) {
sd.messageReferences.put(sequenceId, Long.valueOf(references));
return;
} else {
sd.messageReferences.remove(sequenceId);
}
// Find all the entries that need to get deleted. // Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
@ -1740,12 +1887,23 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey()); sd.orderIndex.remove(tx, entry.getKey());
} }
} else {
// update
sd.ackPositions.put(tx, sequenceId, hs);
} }
} }
} }
public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
return sd.subscriptionAcks.get(tx, subscriptionKey);
}
public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
if (messageSequences != null) {
long result = messageSequences.rangeSize();
// if there's anything in the range the last value is always the nextMessage marker, so remove 1.
return result > 0 ? result - 1 : 0;
}
return 0;
} }
private String key(KahaDestination destination) { private String key(KahaDestination destination) {
@ -1799,6 +1957,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
return tx; return tx;
} }
@SuppressWarnings("unused")
private TransactionId key(KahaTransactionInfo transactionInfo) { private TransactionId key(KahaTransactionInfo transactionInfo) {
return TransactionIdConversion.convert(transactionInfo); return TransactionIdConversion.convert(transactionInfo);
} }
@ -2452,6 +2611,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
dataOut.write(data); dataOut.write(data);
} }
@SuppressWarnings("unchecked")
public HashSet<String> readPayload(DataInput dataIn) throws IOException { public HashSet<String> readPayload(DataInput dataIn) throws IOException {
int dataLen = dataIn.readInt(); int dataLen = dataIn.readInt();
byte[] data = new byte[dataLen]; byte[] data = new byte[dataLen];