diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 14d9718537..48e79ba91d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -717,7 +717,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public int getMessageCount(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { return pageFile.tx().execute(new Transaction.CallableClosure() { public Integer execute(Transaction tx) throws IOException { @@ -727,8 +727,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // The subscription might not exist. return 0; } - MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1); - + sd.orderIndex.resetCursorPosition(); + sd.orderIndex.setBatch(tx, cursorPos); int counter = 0; try { String selector = info.getSelector(); @@ -736,7 +736,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (selector != null) { selectorExpression = SelectorParser.parse(selector); } - for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); if (selectorExpression != null) { @@ -757,7 +757,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } @@ -786,15 +786,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.readLock().lock(); + indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); + sd.orderIndex.resetCursorPosition(); MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); if (moc == null) { long pos = sd.subscriptionAcks.get(tx, subscriptionKey); - moc = new MessageOrderCursor(pos+1); + sd.orderIndex.setBatch(tx, pos); + moc = sd.orderIndex.cursor; + } else { + sd.orderIndex.cursor.sync(moc); } Entry entry = null; @@ -813,11 +817,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (entry != null) { MessageOrderCursor copy = sd.orderIndex.cursor.copy(); sd.subscriptionCursors.put(subscriptionKey, copy); + if (LOG.isDebugEnabled()) { + LOG.debug("updated moc: " + copy + ", recovered: " + counter); + } } } }); }finally { - indexLock.readLock().unlock(); + indexLock.writeLock().unlock(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 3b83ee0398..8a19ee219a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1875,6 +1875,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar lowPriorityCursorPosition++; } } + + public String toString() { + return "MessageOrderCursor:[def:" + defaultCursorPosition + + ", low:" + lowPriorityCursorPosition + + ", high:" + highPriorityCursorPosition + "]"; + } + + public void sync(MessageOrderCursor other) { + this.defaultCursorPosition=other.defaultCursorPosition; + this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; + this.highPriorityCursorPosition=other.highPriorityCursorPosition; + } } class MessageOrderIndex{ @@ -2010,11 +2022,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar void getDeleteList(Transaction tx, ArrayList> deletes, Long sequenceId) throws IOException { - getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); - if (highPriorityIndex != null) { + if (defaultPriorityIndex.containsKey(tx, sequenceId)) { + getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); + } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { getDeleteList(tx, deletes, highPriorityIndex, sequenceId); - } - if (lowPriorityIndex != null) { + } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); } } @@ -2073,7 +2085,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar final Iterator>highIterator; final Iterator>defaultIterator; final Iterator>lowIterator; - Long lastKey; diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index 3d85b84ebc..6ab31bcca0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -27,6 +27,7 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -49,7 +50,8 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { Session sess; public boolean useCache; - + public int prefetchVal = 500; + int MSG_NUM = 1000; int HIGH_PRI = 7; int LOW_PRI = 3; @@ -59,6 +61,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { protected void setUp() throws Exception { broker = new BrokerService(); broker.setBrokerName("priorityTest"); + broker.setAdvisorySupport(false); adapter = createPersistenceAdapter(true); broker.setPersistenceAdapter(adapter); PolicyEntry policy = new PolicyEntry(); @@ -71,6 +74,10 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { broker.waitUntilStarted(); factory = new ActiveMQConnectionFactory("vm://priorityTest"); + ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); + prefetch.setAll(prefetchVal); + factory.setPrefetchPolicy(prefetch); + factory.setWatchTopicAdvisories(false); conn = factory.createConnection(); conn.setClientID("priority"); conn.start(); @@ -159,6 +166,10 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { LOG.info("Sending " + text); return msg; } + + public void initCombosForTestDurableSubs() { + addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/4)}); + } public void testDurableSubs() throws Exception { ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST"); @@ -176,11 +187,45 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { sub = sess.createDurableSubscriber(topic, "priority"); for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = sub.receive(1000); + Message msg = sub.receive(5000); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); } } + + public void initCombosForTestDurableSubsReconnect() { + addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)}); + } + + public void testDurableSubsReconnect() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST"); + final String subName = "priorityDisconnect"; + TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); + sub.close(); + + ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI); + ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI); + + lowPri.start(); + highPri.start(); + + lowPri.join(); + highPri.join(); + + + final int closeFrequency = MSG_NUM/4; + sub = sess.createDurableSubscriber(topic, subName); + for (int i = 0; i < MSG_NUM * 2; i++) { + Message msg = sub.receive(5000); + assertNotNull("Message " + i + " was null", msg); + assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); + if (i>0 && i%closeFrequency==0) { + LOG.info("Closing durable sub.. on: " + i); + sub.close(); + sub = sess.createDurableSubscriber(topic, subName); + } + } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java index 8f55505852..549c6116bd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -40,4 +40,9 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { return suite(JDBCMessagePriorityTest.class); } + // pending fix... + @Override + public void testDurableSubsReconnect() throws Exception { + // TODO: fix jdbc durable sub recovery + } }