From 1463bec066c199d60402c2d3d594adc05a007e6a Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 22 Mar 2012 20:01:40 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3768: ClassCastException when running some Durable Consumer test cases. root cause of the classcast was the reuse of a freed node that was still referenced as the head page id of a listindex. The fix is to not modify the head page id of a listindex when removing and coalescing linked pages, the head page remains valid for the duration of a subscription. Eventually got a test case that could easlily reproduce git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1304020 13f79535-47bb-0310-9956-ffa450edef68 --- .../DurableSubscriptionOfflineTest.java | 48 +++++++++++++++++++ .../org/apache/kahadb/index/ListIndex.java | 12 ++--- .../org/apache/kahadb/index/ListNode.java | 20 ++++++-- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 7d7aafdc2d..64c1f28081 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -33,6 +33,8 @@ import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -1244,6 +1246,52 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size()); } + // https://issues.apache.org/jira/browse/AMQ-3768 + public void testPageReuse() throws Exception { + Connection con = null; + Session session = null; + + final int numConsumers = 115; + for (int i=0; i<=numConsumers;i++) { + con = createConnection("cli" + i); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", null, true); + session.close(); + con.close(); + } + + + // populate ack locations + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + Message message = session.createTextMessage(new byte[10].toString()); + producer.send(topic, message); + con.close(); + + // we have a split, remove all but the last so that + // the head pageid changes in the acklocations listindex + for (int i=0; i<=numConsumers -1; i++) { + con = createConnection("cli" + i); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe("SubsId"); + session.close(); + con.close(); + } + + destroyBroker(); + createBroker(false); + + // create a bunch more subs to reuse the freed page and get us in a knot + for (int i=1; i<=numConsumers;i++) { + con = createConnection("cli" + i); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", filter, true); + session.close(); + con.close(); + } + } + public static class Listener implements MessageListener { int count = 0; String id = null; diff --git a/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java b/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java index 8ca5b3a94e..7b44bfc831 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java @@ -304,14 +304,10 @@ public class ListIndex implements Index { ListNode loadNode(Transaction tx, long pageId) throws IOException { Page> page = tx.load(pageId, marshaller); - try { - ListNode node = page.get(); - node.setPage(page); - node.setContainingList(this); - return node; - } catch (ClassCastException e) { - throw e; - } + ListNode node = page.get(); + node.setPage(page); + node.setContainingList(this); + return node; } ListNode createNode(Page> page) throws IOException { diff --git a/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java b/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java index 28ccc47cb1..4bd4501dbe 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java @@ -198,10 +198,22 @@ public final class ListNode { if (currentNode.isHead() && currentNode.isTail()) { // store empty list } else if (currentNode.isHead()) { - // new head + // merge next node into existing headNode + // as we don't want to change our headPageId b/c + // that is our identity + ListNode headNode = currentNode; + nextEntry = getFromNextNode(); // will move currentNode + + if (currentNode.isTail()) { + targetList.setTailPageId(headNode.getPageId()); + } + // copy next/currentNode into head + headNode.setEntries(currentNode.entries); + headNode.setNext(currentNode.getNext()); + headNode.store(tx); toRemoveNode = currentNode; - nextEntry = getFromNextNode(); - targetList.setHeadPageId(currentNode.getPageId()); + currentNode = headNode; + } else if (currentNode.isTail()) { toRemoveNode = currentNode; previousNode.setNext(ListIndex.NOT_SET); @@ -262,7 +274,7 @@ public final class ListNode { @SuppressWarnings({ "unchecked", "rawtypes" }) public ListNode readPayload(DataInput is) throws IOException { ListNode node = new ListNode(); - node.next = is.readLong(); + node.setNext(is.readLong()); final short size = is.readShort(); for (short i = 0; i < size; i++) { node.entries.addLast(