From 514ef7de46556cadb2535420673de310f804c7af Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 22 Oct 2010 19:31:18 +0000 Subject: [PATCH] additional tests and refactoring of fix for https://issues.apache.org/activemq/browse/AMQ-2985, selector scan was off by one and unmatched acking skipped some messages leaving them available to subsequent consumers. Also acking always left the first consumed message available for a reconnected durable git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1026457 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 3 +- .../activemq/store/kahadb/KahaDBStore.java | 6 +- .../store/kahadb/MessageDatabase.java | 55 ++++------ .../DurableSubscriptionOfflineTest.java | 102 ++++++++++++++++++ 4 files changed, 127 insertions(+), 39 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index fb1214b4b6..e20e440333 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1655,7 +1655,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (LOG.isDebugEnabled()) { LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " - + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount()); + + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount() + + ", dequeueCount: " + destinationStatistics.getDequeues().getCount()); } if (isLazyDispatch() && !force) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index d20c34ae2c..4d1c0a86aa 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -737,7 +737,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { selectorExpression = SelectorParser.parse(selector); } sd.orderIndex.resetCursorPosition(); - sd.orderIndex.setBatch(tx, (selectorExpression != null? 0 : cursorPos)); + sd.orderIndex.setBatch(tx, (selectorExpression == null ? cursorPos : -1)); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -773,7 +773,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); - sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : 0)); + sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : -1)); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -800,7 +800,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); if (moc == null) { long pos = sd.subscriptionAcks.get(tx, subscriptionKey); - sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : 0)); + sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : -1)); moc = sd.orderIndex.cursor; } else { sd.orderIndex.cursor.sync(moc); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index da4ce911f4..cb5f499d3a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1037,11 +1037,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar String subscriptionKey = command.getSubscriptionKey(); Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence); - // The following method handles deleting un-referenced messages. - removeAckLocation(tx, sd, subscriptionKey, prev); - // Add it to the new location set. addAckLocation(sd, sequence, subscriptionKey); + + // The following method handles deleting un-referenced messages. + removeAckLocation(tx, sd, subscriptionKey, sequence); } } @@ -1152,16 +1152,17 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar break; } } + LOG.trace("gc candidates after first tx:" + firstTxLocation.getDataFileId() + ", " + gcCandidateSet); } // Go through all the destinations to see if any of them can remove GC candidates. - for (StoredDestination sd : storedDestinations.values()) { + for (Entry entry : storedDestinations.entrySet()) { if( gcCandidateSet.isEmpty() ) { break; } // Use a visitor to cut down the number of pages that we load - sd.locationIndex.visit(tx, new BTreeVisitor() { + entry.getValue().locationIndex.visit(tx, new BTreeVisitor() { int last=-1; public boolean isInterestedInKeysBetween(Location first, Location second) { if( first==null ) { @@ -1199,10 +1200,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } }); + LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); } // check we are not deleting file with ack for in-use journal files - LOG.debug("gc candidates: " + gcCandidateSet); + LOG.trace("gc candidates: " + gcCandidateSet); final TreeSet gcCandidates = new TreeSet(gcCandidateSet); Iterator candidates = gcCandidateSet.iterator(); while (candidates.hasNext()) { @@ -1219,7 +1221,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (gcCandidateSet.contains(candidate)) { ackMessageFileMap.remove(candidate); } else { - LOG.debug("not removing data file: " + candidate + LOG.trace("not removing data file: " + candidate + " as contained ack(s) refer to referenced file: " + referencedFileIds); } } @@ -1504,24 +1506,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (hs != null) { hs.remove(subscriptionKey); if (hs.isEmpty()) { - HashSet firstSet = sd.ackPositions.values().iterator().next(); sd.ackPositions.remove(sequenceId); - // Did we just empty out the first set in the - // ordered list of ack locations? Then it's time to - // delete some messages. - if (hs == firstSet) { + ArrayList> deletes = new ArrayList>(); + sd.orderIndex.getDeleteList(tx, deletes, sequenceId); - // Find all the entries that need to get deleted. - ArrayList> deletes = new ArrayList>(); - sd.orderIndex.getDeleteList(tx, deletes, sequenceId); - - // Do the actual deletes. - for (Entry entry : deletes) { - sd.locationIndex.remove(tx, entry.getValue().location); - sd.messageIdIndex.remove(tx,entry.getValue().messageId); - sd.orderIndex.remove(tx,entry.getKey()); - } + // Do the actual delete. + for (Entry entry : deletes) { + sd.locationIndex.remove(tx, entry.getValue().location); + sd.messageIdIndex.remove(tx, entry.getValue().messageId); + sd.orderIndex.remove(tx, entry.getKey()); } } } @@ -2033,19 +2027,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar void getDeleteList(Transaction tx, ArrayList> deletes, BTreeIndex index, Long sequenceId) throws IOException { - for (Iterator> iterator = index.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - if (entry.getKey().compareTo(sequenceId) == 0) { - // We don't do the actually delete while we are - // iterating the BTree since - // iterating would fail. - deletes.add(entry); - } else { - // no point in iterating the in-order sequences anymore - break; - } - } - } + + Iterator> iterator = index.iterator(tx, sequenceId); + deletes.add(iterator.next()); + } long getNextMessageId(int priority) { return nextMessageId++; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index fbde89ca51..9b0e642212 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -26,10 +26,13 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import javax.jms.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.io.File; public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { + private static final Log LOG = LogFactory.getLog(DurableSubscriptionOfflineTest.class); public Boolean usePrioritySupport = Boolean.TRUE; private BrokerService broker; private ActiveMQTopic topic; @@ -65,6 +68,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); broker.setBrokerName(getName(true)); broker.setDeleteAllMessagesOnStartup(true); + broker.getManagementContext().setCreateConnector(false); if (usePrioritySupport) { PolicyEntry policy = new PolicyEntry(); @@ -132,6 +136,104 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(sent, listener.count); } + public void testOfflineSubscription2() throws Exception { + // create durable subscription + Connection con = createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + sent++; + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } + + Thread.sleep(1 * 1000); + + session.close(); + con.close(); + + // consume messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals(sent, listener.count); + } + + public void testOfflineSubscription3() throws Exception { + // create durable subscription + Connection con = createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + final int numMessages = 10; + int sent = 0; + for (int i = 0; i < numMessages; i++) { + sent++; + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } + + Thread.sleep(1 * 1000); + + session.close(); + con.close(); + + // consume messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + LOG.info("Consumed: " + listener.count); + assertEquals(numMessages, listener.count); + + // consume messages again, should not get any + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + listener = new Listener(); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals(0, listener.count); + } + public static class Listener implements MessageListener { int count = 0;