additional tests and refactoring of fix for https://issues.apache.org/activemq/browse/AMQ-2985, selector scan was off by one and unmatched acking skipped some messages leaving them available to subsequent consumers. Also acking always left the first consumed message available for a reconnected durable

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1026457 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-10-22 19:31:18 +00:00
parent df7cb7727b
commit 514ef7de46
4 changed files with 127 additions and 39 deletions

View File

@ -1655,7 +1655,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+ destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount()); + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
+ ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
} }
if (isLazyDispatch() && !force) { if (isLazyDispatch() && !force) {

View File

@ -737,7 +737,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
selectorExpression = SelectorParser.parse(selector); selectorExpression = SelectorParser.parse(selector);
} }
sd.orderIndex.resetCursorPosition(); sd.orderIndex.resetCursorPosition();
sd.orderIndex.setBatch(tx, (selectorExpression != null? 0 : cursorPos)); sd.orderIndex.setBatch(tx, (selectorExpression == null ? cursorPos : -1));
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) { .hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next(); Entry<Long, MessageKeys> entry = iterator.next();
@ -773,7 +773,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void execute(Transaction tx) throws Exception { public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx); StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : 0)); sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : -1));
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) { .hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next(); Entry<Long, MessageKeys> entry = iterator.next();
@ -800,7 +800,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) { if (moc == null) {
long pos = sd.subscriptionAcks.get(tx, subscriptionKey); long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : 0)); sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : -1));
moc = sd.orderIndex.cursor; moc = sd.orderIndex.cursor;
} else { } else {
sd.orderIndex.cursor.sync(moc); sd.orderIndex.cursor.sync(moc);

View File

@ -1037,11 +1037,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
String subscriptionKey = command.getSubscriptionKey(); String subscriptionKey = command.getSubscriptionKey();
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence); Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, prev);
// Add it to the new location set. // Add it to the new location set.
addAckLocation(sd, sequence, subscriptionKey); addAckLocation(sd, sequence, subscriptionKey);
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, sequence);
} }
} }
@ -1152,16 +1152,17 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
break; break;
} }
} }
LOG.trace("gc candidates after first tx:" + firstTxLocation.getDataFileId() + ", " + gcCandidateSet);
} }
// Go through all the destinations to see if any of them can remove GC candidates. // Go through all the destinations to see if any of them can remove GC candidates.
for (StoredDestination sd : storedDestinations.values()) { for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
if( gcCandidateSet.isEmpty() ) { if( gcCandidateSet.isEmpty() ) {
break; break;
} }
// Use a visitor to cut down the number of pages that we load // Use a visitor to cut down the number of pages that we load
sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1; int last=-1;
public boolean isInterestedInKeysBetween(Location first, Location second) { public boolean isInterestedInKeysBetween(Location first, Location second) {
if( first==null ) { if( first==null ) {
@ -1199,10 +1200,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
}); });
LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
} }
// check we are not deleting file with ack for in-use journal files // check we are not deleting file with ack for in-use journal files
LOG.debug("gc candidates: " + gcCandidateSet); LOG.trace("gc candidates: " + gcCandidateSet);
final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet); final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
Iterator<Integer> candidates = gcCandidateSet.iterator(); Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) { while (candidates.hasNext()) {
@ -1219,7 +1221,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (gcCandidateSet.contains(candidate)) { if (gcCandidateSet.contains(candidate)) {
ackMessageFileMap.remove(candidate); ackMessageFileMap.remove(candidate);
} else { } else {
LOG.debug("not removing data file: " + candidate LOG.trace("not removing data file: " + candidate
+ " as contained ack(s) refer to referenced file: " + referencedFileIds); + " as contained ack(s) refer to referenced file: " + referencedFileIds);
} }
} }
@ -1504,24 +1506,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (hs != null) { if (hs != null) {
hs.remove(subscriptionKey); hs.remove(subscriptionKey);
if (hs.isEmpty()) { if (hs.isEmpty()) {
HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
sd.ackPositions.remove(sequenceId); sd.ackPositions.remove(sequenceId);
// Did we just empty out the first set in the ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
// ordered list of ack locations? Then it's time to sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
// delete some messages.
if (hs == firstSet) {
// Find all the entries that need to get deleted. // Do the actual delete.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); for (Entry<Long, MessageKeys> entry : deletes) {
sd.orderIndex.getDeleteList(tx, deletes, sequenceId); sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
// Do the actual deletes. sd.orderIndex.remove(tx, entry.getKey());
for (Entry<Long, MessageKeys> entry : deletes) {
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx,entry.getValue().messageId);
sd.orderIndex.remove(tx,entry.getKey());
}
} }
} }
} }
@ -2033,18 +2027,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
for (Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx); iterator.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next(); Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
if (entry.getKey().compareTo(sequenceId) == 0) { deletes.add(iterator.next());
// We don't do the actually delete while we are
// iterating the BTree since
// iterating would fail.
deletes.add(entry);
} else {
// no point in iterating the in-order sequences anymore
break;
}
}
} }
long getNextMessageId(int priority) { long getNextMessageId(int priority) {

View File

@ -26,10 +26,13 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.*; import javax.jms.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File; import java.io.File;
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
private static final Log LOG = LogFactory.getLog(DurableSubscriptionOfflineTest.class);
public Boolean usePrioritySupport = Boolean.TRUE; public Boolean usePrioritySupport = Boolean.TRUE;
private BrokerService broker; private BrokerService broker;
private ActiveMQTopic topic; private ActiveMQTopic topic;
@ -65,6 +68,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
broker.setBrokerName(getName(true)); broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
broker.getManagementContext().setCreateConnector(false);
if (usePrioritySupport) { if (usePrioritySupport) {
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
@ -132,6 +136,104 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(sent, listener.count); assertEquals(sent, listener.count);
} }
public void testOfflineSubscription2() throws Exception {
// create durable subscription
Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int sent = 0;
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
}
Thread.sleep(1 * 1000);
session.close();
con.close();
// consume messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
con.close();
assertEquals(sent, listener.count);
}
public void testOfflineSubscription3() throws Exception {
// create durable subscription
Connection con = createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
final int numMessages = 10;
int sent = 0;
for (int i = 0; i < numMessages; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
}
Thread.sleep(1 * 1000);
session.close();
con.close();
// consume messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
con.close();
LOG.info("Consumed: " + listener.count);
assertEquals(numMessages, listener.count);
// consume messages again, should not get any
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
listener = new Listener();
consumer.setMessageListener(listener);
Thread.sleep(3 * 1000);
session.close();
con.close();
assertEquals(0, listener.count);
}
public static class Listener implements MessageListener { public static class Listener implements MessageListener {
int count = 0; int count = 0;