From 943db3c3cb12b4c4504b4966135cf9a0cc69f0ba Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 16 Sep 2011 19:31:30 +0000 Subject: [PATCH] fixes for https://issues.apache.org/jira/browse/AMQ-3467 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1171743 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/kahadb/KahaDBStore.java | 46 ++- .../store/kahadb/MessageDatabase.java | 280 ++++++++++++++---- 2 files changed, 239 insertions(+), 87 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index b03dc8e23a..b8742557c1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -157,7 +157,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public boolean isConcurrentStoreAndDispatchTransactions() { return this.concurrentStoreAndDispatchTransactions; } - + /** * @return the maxAsyncJobs */ @@ -361,7 +361,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); - + } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { @@ -479,7 +479,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } - + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { indexLock.readLock().lock(); try { @@ -534,10 +534,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // Hopefully one day the page file supports concurrent read // operations... but for now we must // externally synchronize... - + indexLock.writeLock().lock(); try { - pageFile.tx().execute(new Transaction.Closure() { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long location = sd.messageIdIndex.get(tx, key); @@ -546,14 +546,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } }); - }finally { + } finally { indexLock.writeLock().unlock(); } - } finally { unlockAsyncJobQueue(); } - } @Override @@ -618,7 +616,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { - String subscriptionKey = subscriptionKey(clientId, subscriptionName); + String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); if (isConcurrentStoreAndDispatchTopics()) { AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); StoreTopicTask task = null; @@ -660,7 +658,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { .getSubscriptionName()); KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey); + command.setSubscriptionKey(subscriptionKey.toString()); command.setRetroactive(retroactive); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); @@ -671,7 +669,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void deleteSubscription(String clientId, String subscriptionName) throws IOException { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); + command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); store(command, isEnableJournalDiskSyncs() && true, null, null); this.subscriptionCount.decrementAndGet(); } @@ -730,21 +728,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return pageFile.tx().execute(new Transaction.CallableClosure() { public Integer execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); if (cursorPos == null) { // The subscription might not exist. return 0; } - int counter = 0; - for (Iterator>> iterator = - sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) { - Entry> entry = iterator.next(); - if (entry.getValue().contains(subscriptionKey)) { - counter++; - } - } - return counter; + return (int) getStoredMessageCount(tx, sd, subscriptionKey); } }); }finally { @@ -755,13 +745,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + @SuppressWarnings("unused") final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); sd.orderIndex.setBatch(tx, cursorPos); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { @@ -779,6 +770,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + @SuppressWarnings("unused") final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); indexLock.writeLock().lock(); try { @@ -788,7 +780,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.resetCursorPosition(); MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); if (moc == null) { - LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey); + LastAck pos = getLastAck(tx, sd, subscriptionKey); if (pos == null) { // sub deleted return; @@ -858,7 +850,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * Cleanup method to remove any state associated with the given destination. * This method does not stop the message store (it might not be cached). - * + * * @param destination * Destination to forget */ @@ -868,7 +860,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * Cleanup method to remove any state associated with the given destination * This method does not stop the message store (it might not be cached). - * + * * @param destination * Destination to forget */ @@ -920,7 +912,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public long getLastMessageBrokerSequenceId() throws IOException { return 0; } - + public long getLastProducerSequenceId(ProducerId id) { indexLock.readLock().lock(); try { @@ -1184,7 +1176,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * add a key - * + * * @param key * @return true if all acknowledgements received */ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 7a7705e611..082631e81b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -27,8 +27,23 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -56,11 +71,9 @@ import org.apache.activemq.util.Callback; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; -import org.apache.kahadb.util.LocationMarshaller; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.kahadb.index.BTreeIndex; import org.apache.kahadb.index.BTreeVisitor; +import org.apache.kahadb.index.ListIndex; import org.apache.kahadb.journal.DataFile; import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Location; @@ -70,6 +83,7 @@ import org.apache.kahadb.page.Transaction; import org.apache.kahadb.util.ByteSequence; import org.apache.kahadb.util.DataByteArrayInputStream; import org.apache.kahadb.util.DataByteArrayOutputStream; +import org.apache.kahadb.util.LocationMarshaller; import org.apache.kahadb.util.LockFile; import org.apache.kahadb.util.LongMarshaller; import org.apache.kahadb.util.Marshaller; @@ -77,6 +91,8 @@ import org.apache.kahadb.util.Sequence; import org.apache.kahadb.util.SequenceSet; import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.VariableMarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { @@ -97,7 +113,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar static final long NOT_ACKED = -1; static final long UNMATCHED_SEQ = -2; - static final int VERSION = 3; + static final int VERSION = 4; protected class Metadata { protected Page page; @@ -513,6 +529,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } + @SuppressWarnings("unused") private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { return TransactionIdConversion.convertToLocal(tx); } @@ -1185,7 +1202,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar sd.ackPositions.clear(tx); sd.ackPositions.unload(tx); - tx.free(sd.ackPositions.getPageId()); + tx.free(sd.ackPositions.getHeadPageId()); } String key = key(command.getDestination()); @@ -1207,10 +1224,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey); } sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); + sd.subscriptionCache.add(subscriptionKey); } else { // delete the sub... sd.subscriptions.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey); + sd.subscriptionCache.remove(subscriptionKey); removeAckLocationsForSub(tx, sd, subscriptionKey); } } @@ -1468,7 +1487,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar BTreeIndex subscriptions; BTreeIndex subscriptionAcks; HashMap subscriptionCursors; - BTreeIndex> ackPositions; + ListIndex ackPositions; + + // Transient data used to track which Messages are no longer needed. + final TreeMap messageReferences = new TreeMap(); + final HashSet subscriptionCache = new LinkedHashSet(); } protected class StoredDestinationMarshaller extends VariableMarshaller { @@ -1483,15 +1506,43 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar value.subscriptions = new BTreeIndex(pageFile, dataIn.readLong()); value.subscriptionAcks = new BTreeIndex(pageFile, dataIn.readLong()); if (metadata.version >= 3) { - value.ackPositions = new BTreeIndex>(pageFile, dataIn.readLong()); + value.ackPositions = new ListIndex(pageFile, dataIn.readLong()); } else { // upgrade pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { - value.ackPositions = new BTreeIndex>(pageFile, tx.allocate()); - value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); - value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); - value.ackPositions.load(tx); + BTreeIndex> oldAckPositions = + new BTreeIndex>(pageFile, tx.allocate()); + oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); + oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); + oldAckPositions.load(tx); + + LinkedHashMap temp = new LinkedHashMap(); + + // Do the initial build of the data in memory before writing into the store + // based Ack Positions List to avoid a lot of disk thrashing. + Iterator>> iterator = oldAckPositions.iterator(tx); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + + for(String subKey : entry.getValue()) { + SequenceSet pendingAcks = temp.get(subKey); + if (pendingAcks == null) { + pendingAcks = new SequenceSet(); + temp.put(subKey, pendingAcks); + } + + pendingAcks.add(entry.getKey()); + } + } + + // Now move the pending messages to ack data into the store backed + // structure. + value.ackPositions = new ListIndex(pageFile, tx.allocate()); + for(String subscriptionKey : temp.keySet()) { + value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); + } + } }); } @@ -1527,7 +1578,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar dataOut.writeBoolean(true); dataOut.writeLong(value.subscriptions.getPageId()); dataOut.writeLong(value.subscriptionAcks.getPageId()); - dataOut.writeLong(value.ackPositions.getPageId()); + dataOut.writeLong(value.ackPositions.getHeadPageId()); } else { dataOut.writeBoolean(false); } @@ -1594,7 +1645,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (topic) { rc.subscriptions = new BTreeIndex(pageFile, tx.allocate()); rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); - rc.ackPositions = new BTreeIndex>(pageFile, tx.allocate()); + rc.ackPositions = new ListIndex(pageFile, tx.allocate()); } metadata.destinations.put(tx, key, rc); } @@ -1624,8 +1675,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); rc.subscriptionAcks.load(tx); - rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); - rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); + rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); + rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); rc.ackPositions.load(tx); rc.subscriptionCursors = new HashMap(); @@ -1645,6 +1696,27 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } + // Configure the message references index + Iterator> subscriptions = rc.ackPositions.iterator(tx); + while (subscriptions.hasNext()) { + Entry subscription = subscriptions.next(); + if (subscription.getValue() != null) { + for(Long sequenceId : subscription.getValue()) { + Long current = rc.messageReferences.get(sequenceId); + if (current == null) { + current = new Long(0); + } + rc.messageReferences.put(sequenceId, Long.valueOf(current.longValue() + 1)); + } + } + } + + // Configure the subscription cache + for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { + Entry entry = iterator.next(); + rc.subscriptionCache.add(entry.getKey()); + } + if (rc.orderIndex.nextMessageId == 0) { // check for existing durable sub all acked out - pull next seq from acks as messages are gone if (!rc.subscriptionAcks.isEmpty(tx)) { @@ -1656,16 +1728,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } else { // update based on ackPositions for unmatched, last entry is always the next - if (!rc.ackPositions.isEmpty(tx)) { - Entry> last = rc.ackPositions.getLast(tx); + if (!rc.messageReferences.isEmpty()) { + Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; rc.orderIndex.nextMessageId = - Math.max(rc.orderIndex.nextMessageId, last.getKey()); + Math.max(rc.orderIndex.nextMessageId, nextMessageId); } } } - if (metadata.version < 3) { + if (metadata.version < VERSION) { // store again after upgrade metadata.destinations.put(tx, key, rc); } @@ -1673,42 +1745,105 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { - HashSet hs = sd.ackPositions.get(tx, messageSequence); - if (hs == null) { - hs = new HashSet(); + SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); + if (sequences == null) { + sequences = new SequenceSet(); + sequences.add(messageSequence); + sd.ackPositions.put(tx, subscriptionKey, sequences); + } else { + sequences.add(messageSequence); + sd.ackPositions.add(tx, subscriptionKey, sequences); } - hs.add(subscriptionKey); - // every ack location addition needs to be a btree modification to get it stored - sd.ackPositions.put(tx, messageSequence, hs); + + Long count = sd.messageReferences.get(messageSequence); + if (count == null) { + count = Long.valueOf(0L); + } + count = count.longValue() + 1; + sd.messageReferences.put(messageSequence, count); } // new sub is interested in potentially all existing messages private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { - for (Iterator>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) { - Entry> entry = iterator.next(); - entry.getValue().add(subscriptionKey); - sd.ackPositions.put(tx, entry.getKey(), entry.getValue()); + SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); + if (sequences == null) { + sequences = new SequenceSet(); + sequences.add(messageSequence); + sd.ackPositions.put(tx, subscriptionKey, sequences); + } else { + sequences.add(messageSequence); + sd.ackPositions.add(tx, subscriptionKey, sequences); } + + Long count = sd.messageReferences.get(messageSequence); + if (count == null) { + count = Long.valueOf(0L); + } + count = count.longValue() + 1; + sd.messageReferences.put(messageSequence, count); } - final HashSet nextMessageIdMarker = new HashSet(); // on a new message add, all existing subs are interested in this message private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { - HashSet hs = new HashSet(); - for (Iterator> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - hs.add(entry.getKey()); + for(String subscriptionKey : sd.subscriptionCache) { + SequenceSet sequences = null; + sequences = sd.ackPositions.get(tx, subscriptionKey); + if (sequences == null) { + sequences = new SequenceSet(); + sequences.add(new Sequence(messageSequence, messageSequence + 1)); + sd.ackPositions.put(tx, subscriptionKey, sequences); + } else { + sequences.add(new Sequence(messageSequence, messageSequence + 1)); + sd.ackPositions.add(tx, subscriptionKey, sequences); + } + + Long count = sd.messageReferences.get(messageSequence); + if (count == null) { + count = Long.valueOf(0L); + } + count = count.longValue() + 1; + sd.messageReferences.put(messageSequence, count); + sd.messageReferences.put(messageSequence+1, Long.valueOf(0L)); } - sd.ackPositions.put(tx, messageSequence, hs); - // add empty next to keep track of nextMessage - sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker); } private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { if (!sd.ackPositions.isEmpty(tx)) { - Long end = sd.ackPositions.getLast(tx).getKey(); - for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) { - removeAckLocation(tx, sd, subscriptionKey, sequence); + SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); + if (sequences == null || sequences.isEmpty()) { + return; + } + + ArrayList unreferenced = new ArrayList(); + + for(Long sequenceId : sequences) { + long references = 0; + Long count = sd.messageReferences.get(sequenceId); + if (count != null) { + references = count.longValue() - 1; + } else { + continue; + } + + if (references > 0) { + sd.messageReferences.put(sequenceId, Long.valueOf(references)); + } else { + sd.messageReferences.remove(sequenceId); + unreferenced.add(sequenceId); + } + } + + for(Long sequenceId : unreferenced) { + // Find all the entries that need to get deleted. + ArrayList> deletes = new ArrayList>(); + sd.orderIndex.getDeleteList(tx, deletes, sequenceId); + + // Do the actual deletes. + for (Entry entry : deletes) { + sd.locationIndex.remove(tx, entry.getValue().location); + sd.messageIdIndex.remove(tx, entry.getValue().messageId); + sd.orderIndex.remove(tx, entry.getKey()); + } } } } @@ -1723,31 +1858,54 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { // Remove the sub from the previous location set.. if (sequenceId != null) { - HashSet hs = sd.ackPositions.get(tx, sequenceId); - if (hs != null) { - hs.remove(subscriptionKey); - if (hs.isEmpty()) { - HashSet firstSet = sd.ackPositions.getFirst(tx).getValue(); - sd.ackPositions.remove(tx, sequenceId); - - // Find all the entries that need to get deleted. - ArrayList> deletes = new ArrayList>(); - sd.orderIndex.getDeleteList(tx, deletes, sequenceId); - - // Do the actual deletes. - for (Entry entry : deletes) { - sd.locationIndex.remove(tx, entry.getValue().location); - sd.messageIdIndex.remove(tx, entry.getValue().messageId); - sd.orderIndex.remove(tx, entry.getKey()); - } + SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); + if (range != null && !range.isEmpty()) { + range.remove(sequenceId); + if (!range.isEmpty()) { + sd.ackPositions.put(tx, subscriptionKey, range); } else { - // update - sd.ackPositions.put(tx, sequenceId, hs); + sd.ackPositions.remove(tx, subscriptionKey); + } + + // Check if the message is reference by any other subscription. + Long count = sd.messageReferences.get(sequenceId); + long references = count.longValue() - 1; + if (references > 0) { + sd.messageReferences.put(sequenceId, Long.valueOf(references)); + return; + } else { + sd.messageReferences.remove(sequenceId); + } + + // Find all the entries that need to get deleted. + ArrayList> deletes = new ArrayList>(); + sd.orderIndex.getDeleteList(tx, deletes, sequenceId); + + // Do the actual deletes. + for (Entry entry : deletes) { + sd.locationIndex.remove(tx, entry.getValue().location); + sd.messageIdIndex.remove(tx, entry.getValue().messageId); + sd.orderIndex.remove(tx, entry.getKey()); } } } } + public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + return sd.subscriptionAcks.get(tx, subscriptionKey); + } + + public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); + if (messageSequences != null) { + long result = messageSequences.rangeSize(); + // if there's anything in the range the last value is always the nextMessage marker, so remove 1. + return result > 0 ? result - 1 : 0; + } + + return 0; + } + private String key(KahaDestination destination) { return destination.getType().getNumber() + ":" + destination.getName(); } @@ -1799,6 +1957,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return tx; } + @SuppressWarnings("unused") private TransactionId key(KahaTransactionInfo transactionInfo) { return TransactionIdConversion.convert(transactionInfo); } @@ -2452,6 +2611,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar dataOut.write(data); } + @SuppressWarnings("unchecked") public HashSet readPayload(DataInput dataIn) throws IOException { int dataLen = dataIn.readInt(); byte[] data = new byte[dataLen];