fix regression of https://issues.apache.org/activemq/browse/AMQ-2870 from refactoring of fix for https://issues.apache.org/activemq/browse/AMQ-2985 - unmatched acks are now stored as negative sequence ids so that we can differenciate between a durables that have acked messages after receipt and those that have acked because of unmatched. Unmatched messages are now deleted in the normal way

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1027644 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-10-26 16:52:08 +00:00
parent 3b51a692be
commit 6fd292d885
3 changed files with 43 additions and 15 deletions

View File

@ -718,6 +718,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
// an ack for an unmatched message is stored as a negative sequence id
// if sub has been getting unmatched acks, we need to reset
protected Long resetForSelectors(SubscriptionInfo info, Long position) {
if (info.getSelector() != null) {
if (position < NOT_ACKED) {
position = NOT_ACKED;
}
}
return position;
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
@ -731,6 +742,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// The subscription might not exist.
return 0;
}
cursorPos = resetForSelectors(info, cursorPos);
int counter = 0;
try {
@ -740,7 +752,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
selectorExpression = SelectorParser.parse(selector);
}
sd.orderIndex.resetCursorPosition();
sd.orderIndex.setBatch(tx, cursorPos);
sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@ -769,13 +781,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
cursorPos = resetForSelectors(info, cursorPos);
sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@ -792,6 +806,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@ -800,8 +815,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, pos);
Long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
pos = resetForSelectors(info, pos);
sd.orderIndex.setBatch(tx, extractSequenceId(pos));
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);

View File

@ -98,6 +98,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
static final int CLOSED_STATE = 1;
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
static final long UNMATCHED_SEQ = -2;
static final int VERSION = 2;
@ -1018,6 +1020,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
}
protected Long extractSequenceId(Long prev) {
if (prev < NOT_ACKED) {
prev = Math.abs(prev) + UNMATCHED_SEQ;
}
return prev;
}
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
if (!command.hasSubscriptionKey()) {
@ -1039,13 +1048,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// Make sure it's a valid message id...
if (sequence != null) {
String subscriptionKey = command.getSubscriptionKey();
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
Long ackSequenceToStore = sequence;
if (command.getAck() == UNMATCHED) {
sd.subscriptionAcks.put(tx, subscriptionKey, prev);
// store negative sequence to indicate that it was unmatched
ackSequenceToStore = new Long(UNMATCHED_SEQ - sequence);
}
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, prev);
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
// Add it to the new location set.
addAckLocation(sd, sequence, subscriptionKey);
@ -1116,7 +1128,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
sd.subscriptions.remove(tx, subscriptionKey);
Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
if( prev!=null ) {
removeAckLocation(tx, sd, subscriptionKey, prev);
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
}
}
@ -1468,7 +1480,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
Entry<String, Long> entry = iterator.next();
addAckLocation(rc, entry.getValue(), entry.getKey());
addAckLocation(rc, extractSequenceId(entry.getValue()), entry.getKey());
}
if (rc.orderIndex.nextMessageId == 0) {

View File

@ -91,14 +91,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
broker.stop();
}
public void initCombosForTestOfflineSubscription() throws Exception {
public void x_initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
this.addCombinationValues("usePrioritySupport",
new Object[]{ Boolean.TRUE, Boolean.FALSE});
}
public void testOfflineSubscription() throws Exception {
public void testConsumeOnlyMatchedMessages() throws Exception {
// create durable subscription
Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -140,7 +140,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count);
}
public void testOfflineSubscription2() throws Exception {
public void testConsumeAllMatchedMessages() throws Exception {
// create durable subscription
Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -181,7 +181,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count);
}
public void testOfflineSubscription3() throws Exception {
public void testVerifyAllConsumedAreAcked() throws Exception {
// create durable subscription
Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -237,7 +237,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(0, listener.count);
}
public void testOfflineSubscription4() throws Exception {
public void testTwoOfflineSubscriptionCanConsume() throws Exception {
// create durable subscription 1
Connection con = createConnection("cliId1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);