additional tests and refactoring of fix for https://issues.apache.org/activemq/browse/AMQ-2985, not updating subscriptionAck with an unmatched message resolves recovery of unmatched selector durables, reveting the previous change

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1026543 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-10-23 01:36:13 +00:00
parent 514ef7de46
commit 6ddbba4850
3 changed files with 93 additions and 23 deletions

View File

@ -631,19 +631,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
} else {
doAcknowledge(context, subscriptionKey, messageId);
doAcknowledge(context, subscriptionKey, messageId, ack);
}
} else {
doAcknowledge(context, subscriptionKey, messageId);
doAcknowledge(context, subscriptionKey, messageId, ack);
}
}
protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId)
protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
}
store(command, false, null, null);
}
@ -737,7 +740,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
selectorExpression = SelectorParser.parse(selector);
}
sd.orderIndex.resetCursorPosition();
sd.orderIndex.setBatch(tx, (selectorExpression == null ? cursorPos : -1));
sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@ -766,14 +769,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : -1));
sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@ -790,7 +792,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@ -800,7 +801,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : -1));
sd.orderIndex.setBatch(tx, pos);
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);
@ -1228,7 +1229,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// apply any acks we have
synchronized (this.subscriptionKeys) {
for (String key : this.subscriptionKeys) {
this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
}
}

View File

@ -88,6 +88,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
protected static final Buffer UNMATCHED = new Buffer(new byte[]{});
private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@ -1037,11 +1038,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
String subscriptionKey = command.getSubscriptionKey();
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
if (command.getAck() == UNMATCHED) {
sd.subscriptionAcks.put(tx, subscriptionKey, prev);
}
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, prev);
// Add it to the new location set.
addAckLocation(sd, sequence, subscriptionKey);
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, sequence);
}
}
@ -1506,16 +1510,24 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (hs != null) {
hs.remove(subscriptionKey);
if (hs.isEmpty()) {
HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
sd.ackPositions.remove(sequenceId);
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
// Did we just empty out the first set in the
// ordered list of ack locations? Then it's time to
// delete some messages.
if (hs == firstSet) {
// Do the actual delete.
for (Entry<Long, MessageKeys> entry : deletes) {
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
// Do the actual deletes.
for (Entry<Long, MessageKeys> entry : deletes) {
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx,entry.getValue().messageId);
sd.orderIndex.remove(tx,entry.getKey());
}
}
}
}

View File

@ -43,8 +43,12 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
@Override
protected Connection createConnection() throws Exception {
return createConnection("cliName");
}
protected Connection createConnection(String name) throws Exception {
Connection con = super.createConnection();
con.setClientID("cliName");
con.setClientID(name);
con.start();
return con;
}
@ -190,9 +194,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
final int numMessages = 10;
int sent = 0;
for (int i = 0; i < numMessages; i++) {
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
@ -217,7 +220,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con.close();
LOG.info("Consumed: " + listener.count);
assertEquals(numMessages, listener.count);
assertEquals(sent, listener.count);
// consume messages again, should not get any
con = createConnection();
@ -233,6 +236,60 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(0, listener.count);
}
public void testOfflineSubscription4() throws Exception {
// create durable subscription 1
Connection con = createConnection("cliId1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
// create durable subscription 2
Connection con2 = createConnection("cliId2");
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener2 = new Listener();
consumer2.setMessageListener(listener2);
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int sent = 0;
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
}
Thread.sleep(1 * 1000);
session.close();
con.close();
// test online subs
Thread.sleep(3 * 1000);
session2.close();
con2.close();
assertEquals(sent, listener2.count);
// consume messages
con = createConnection("cliId1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
con.close();
assertEquals("offline consumer got all", sent, listener.count);
}
public static class Listener implements MessageListener {
int count = 0;