From 6ddbba485027571598d8976d4528e46d310555f2 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Sat, 23 Oct 2010 01:36:13 +0000 Subject: [PATCH] additional tests and refactoring of fix for https://issues.apache.org/activemq/browse/AMQ-2985, not updating subscriptionAck with an unmatched message resolves recovery of unmatched selector durables, reveting the previous change git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1026543 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/store/kahadb/KahaDBStore.java | 19 +++--- .../store/kahadb/MessageDatabase.java | 32 ++++++--- .../DurableSubscriptionOfflineTest.java | 65 +++++++++++++++++-- 3 files changed, 93 insertions(+), 23 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 4d1c0a86aa..c0279c6029 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -631,19 +631,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } } else { - doAcknowledge(context, subscriptionKey, messageId); + doAcknowledge(context, subscriptionKey, messageId, ack); } } else { - doAcknowledge(context, subscriptionKey, messageId); + doAcknowledge(context, subscriptionKey, messageId, ack); } } - protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId) + protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); command.setSubscriptionKey(subscriptionKey); command.setMessageId(messageId.toString()); + if (ack != null && ack.isUnmatchedAck()) { + command.setAck(UNMATCHED); + } store(command, false, null, null); } @@ -737,7 +740,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { selectorExpression = SelectorParser.parse(selector); } sd.orderIndex.resetCursorPosition(); - sd.orderIndex.setBatch(tx, (selectorExpression == null ? cursorPos : -1)); + sd.orderIndex.setBatch(tx, cursorPos); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -766,14 +769,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); - sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : -1)); + sd.orderIndex.setBatch(tx, cursorPos); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -790,7 +792,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { @@ -800,7 +801,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); if (moc == null) { long pos = sd.subscriptionAcks.get(tx, subscriptionKey); - sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : -1)); + sd.orderIndex.setBatch(tx, pos); moc = sd.orderIndex.cursor; } else { sd.orderIndex.cursor.sync(moc); @@ -1228,7 +1229,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // apply any acks we have synchronized (this.subscriptionKeys) { for (String key : this.subscriptionKeys) { - this.topicStore.doAcknowledge(context, key, this.message.getMessageId()); + this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index cb5f499d3a..dcc3f7a09b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -88,6 +88,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0")); + protected static final Buffer UNMATCHED = new Buffer(new byte[]{}); private static final Log LOG = LogFactory.getLog(MessageDatabase.class); private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; @@ -1037,11 +1038,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar String subscriptionKey = command.getSubscriptionKey(); Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence); + if (command.getAck() == UNMATCHED) { + sd.subscriptionAcks.put(tx, subscriptionKey, prev); + } + // The following method handles deleting un-referenced messages. + removeAckLocation(tx, sd, subscriptionKey, prev); + // Add it to the new location set. addAckLocation(sd, sequence, subscriptionKey); - - // The following method handles deleting un-referenced messages. - removeAckLocation(tx, sd, subscriptionKey, sequence); } } @@ -1506,16 +1510,24 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (hs != null) { hs.remove(subscriptionKey); if (hs.isEmpty()) { + HashSet firstSet = sd.ackPositions.values().iterator().next(); sd.ackPositions.remove(sequenceId); - ArrayList> deletes = new ArrayList>(); - sd.orderIndex.getDeleteList(tx, deletes, sequenceId); + // Did we just empty out the first set in the + // ordered list of ack locations? Then it's time to + // delete some messages. + if (hs == firstSet) { - // Do the actual delete. - for (Entry entry : deletes) { - sd.locationIndex.remove(tx, entry.getValue().location); - sd.messageIdIndex.remove(tx, entry.getValue().messageId); - sd.orderIndex.remove(tx, entry.getKey()); + // Find all the entries that need to get deleted. + ArrayList> deletes = new ArrayList>(); + sd.orderIndex.getDeleteList(tx, deletes, sequenceId); + + // Do the actual deletes. + for (Entry entry : deletes) { + sd.locationIndex.remove(tx, entry.getValue().location); + sd.messageIdIndex.remove(tx,entry.getValue().messageId); + sd.orderIndex.remove(tx,entry.getKey()); + } } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 9b0e642212..1be207230e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -43,8 +43,12 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp @Override protected Connection createConnection() throws Exception { + return createConnection("cliName"); + } + + protected Connection createConnection(String name) throws Exception { Connection con = super.createConnection(); - con.setClientID("cliName"); + con.setClientID(name); con.start(); return con; } @@ -190,9 +194,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(null); - final int numMessages = 10; int sent = 0; - for (int i = 0; i < numMessages; i++) { + for (int i = 0; i < 10; i++) { sent++; Message message = session.createMessage(); message.setStringProperty("filter", "true"); @@ -217,7 +220,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con.close(); LOG.info("Consumed: " + listener.count); - assertEquals(numMessages, listener.count); + assertEquals(sent, listener.count); // consume messages again, should not get any con = createConnection(); @@ -233,6 +236,60 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(0, listener.count); } + + public void testOfflineSubscription4() throws Exception { + // create durable subscription 1 + Connection con = createConnection("cliId1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // create durable subscription 2 + Connection con2 = createConnection("cliId2"); + Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener2 = new Listener(); + consumer2.setMessageListener(listener2); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + sent++; + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } + + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + // test online subs + Thread.sleep(3 * 1000); + session2.close(); + con2.close(); + + assertEquals(sent, listener2.count); + + // consume messages + con = createConnection("cliId1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals("offline consumer got all", sent, listener.count); + } public static class Listener implements MessageListener { int count = 0;