From 5cbb4db7f655ffdddc73329fdd2633c037c64c63 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 18 Oct 2010 09:27:43 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2985 - the use of selectors means replay and recovery from the begining of the store. unmatched are removed on initial dispatch git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1023704 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/DurableTopicSubscription.java | 1 + .../activemq/store/kahadb/KahaDBStore.java | 18 ++- .../store/kahadb/MessageDatabase.java | 2 +- .../DurableSubscriptionOfflineTest.java | 123 ++++++++++++++++++ 4 files changed, 136 insertions(+), 8 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 21537e322b..7a3b631f00 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -150,6 +150,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } public void deactivate(boolean keepDurableSubsActive) throws Exception { + LOG.debug("Dectivating " + this); active = false; this.usageManager.getMemoryUsage().removeUsageListener(this); synchronized (pending) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 48e79ba91d..76213abd3e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -727,8 +727,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // The subscription might not exist. return 0; } - sd.orderIndex.resetCursorPosition(); - sd.orderIndex.setBatch(tx, cursorPos); + int counter = 0; try { String selector = info.getSelector(); @@ -736,6 +735,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (selector != null) { selectorExpression = SelectorParser.parse(selector); } + sd.orderIndex.resetCursorPosition(); + sd.orderIndex.setBatch(tx, (selectorExpression != null? 0 : cursorPos)); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -764,28 +765,31 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.readLock().lock(); + final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); + indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); - MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1); - for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator + sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : 0)); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); listener.recoverMessage(loadMessage(entry.getValue().location)); } + sd.orderIndex.resetCursorPosition(); } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); + final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { @@ -795,7 +799,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); if (moc == null) { long pos = sd.subscriptionAcks.get(tx, subscriptionKey); - sd.orderIndex.setBatch(tx, pos); + sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : 0)); moc = sd.orderIndex.cursor; } else { sd.orderIndex.cursor.sync(moc); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 8a19ee219a..da4ce911f4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2035,7 +2035,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar BTreeIndex index, Long sequenceId) throws IOException { for (Iterator> iterator = index.iterator(tx); iterator.hasNext();) { Entry entry = iterator.next(); - if (entry.getKey().compareTo(sequenceId) <= 0) { + if (entry.getKey().compareTo(sequenceId) == 0) { // We don't do the actually delete while we are // iterating the BTree since // iterating would fail. diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java new file mode 100644 index 0000000000..3a9f04a590 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; + +import javax.jms.*; +import java.io.File; + +public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { + + private BrokerService broker; + private ActiveMQTopic topic; + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://" + getName()); + } + + @Override + protected Connection createConnection() throws Exception { + Connection con = super.createConnection(); + con.setClientID("cliName"); + con.start(); + return con; + } + + protected void setUp() throws Exception { + topic = (ActiveMQTopic) createDestination(); + createBroker(); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + destroyBroker(); + } + + private void createBroker() throws Exception { + broker = BrokerFactory.createBroker("broker:(vm://localhost)"); + broker.setBrokerName(getName()); + broker.setDeleteAllMessagesOnStartup(true); + + broker.setPersistent(true); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File("activemq-data-kaha/" + getName())); + broker.setPersistenceAdapter(persistenceAdapter); + + broker.start(); + } + + private void destroyBroker() throws Exception { + if (broker != null) + broker.stop(); + } + + public void testOfflineSubscription() throws Exception { + // create durable subscription + Connection con = createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + boolean filter = i % 2 == 1; + if (filter) + sent++; + + Message message = session.createMessage(); + message.setStringProperty("filter", filter ? "true" : "false"); + producer.send(topic, message); + } + + session.close(); + con.close(); + + // consume messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals(sent, listener.count); + } + + public static class Listener implements MessageListener { + int count = 0; + + public void onMessage(Message message) { + count++; + } + } +}