mirror of
https://github.com/apache/activemq.git
synced 2025-02-27 12:55:32 +00:00
https://issues.apache.org/jira/browse/AMQ-3768: ClassCastException when running some Durable Consumer test cases. root cause of the classcast was the reuse of a freed node that was still referenced as the head page id of a listindex. The fix is to not modify the head page id of a listindex when removing and coalescing linked pages, the head page remains valid for the duration of a subscription. Eventually got a test case that could easlily reproduce
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1304020 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5cd9ebaeb7
commit
1463bec066
@ -33,6 +33,8 @@ import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
|
||||
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
@ -1244,6 +1246,52 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||
assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
|
||||
}
|
||||
|
||||
// https://issues.apache.org/jira/browse/AMQ-3768
|
||||
public void testPageReuse() throws Exception {
|
||||
Connection con = null;
|
||||
Session session = null;
|
||||
|
||||
final int numConsumers = 115;
|
||||
for (int i=0; i<=numConsumers;i++) {
|
||||
con = createConnection("cli" + i);
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", null, true);
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
|
||||
|
||||
// populate ack locations
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
Message message = session.createTextMessage(new byte[10].toString());
|
||||
producer.send(topic, message);
|
||||
con.close();
|
||||
|
||||
// we have a split, remove all but the last so that
|
||||
// the head pageid changes in the acklocations listindex
|
||||
for (int i=0; i<=numConsumers -1; i++) {
|
||||
con = createConnection("cli" + i);
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.unsubscribe("SubsId");
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
|
||||
destroyBroker();
|
||||
createBroker(false);
|
||||
|
||||
// create a bunch more subs to reuse the freed page and get us in a knot
|
||||
for (int i=1; i<=numConsumers;i++) {
|
||||
con = createConnection("cli" + i);
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Listener implements MessageListener {
|
||||
int count = 0;
|
||||
String id = null;
|
||||
|
@ -304,14 +304,10 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||
|
||||
ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
|
||||
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
|
||||
try {
|
||||
ListNode<Key, Value> node = page.get();
|
||||
node.setPage(page);
|
||||
node.setContainingList(this);
|
||||
return node;
|
||||
} catch (ClassCastException e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
|
||||
|
@ -198,10 +198,22 @@ public final class ListNode<Key,Value> {
|
||||
if (currentNode.isHead() && currentNode.isTail()) {
|
||||
// store empty list
|
||||
} else if (currentNode.isHead()) {
|
||||
// new head
|
||||
// merge next node into existing headNode
|
||||
// as we don't want to change our headPageId b/c
|
||||
// that is our identity
|
||||
ListNode<Key,Value> headNode = currentNode;
|
||||
nextEntry = getFromNextNode(); // will move currentNode
|
||||
|
||||
if (currentNode.isTail()) {
|
||||
targetList.setTailPageId(headNode.getPageId());
|
||||
}
|
||||
// copy next/currentNode into head
|
||||
headNode.setEntries(currentNode.entries);
|
||||
headNode.setNext(currentNode.getNext());
|
||||
headNode.store(tx);
|
||||
toRemoveNode = currentNode;
|
||||
nextEntry = getFromNextNode();
|
||||
targetList.setHeadPageId(currentNode.getPageId());
|
||||
currentNode = headNode;
|
||||
|
||||
} else if (currentNode.isTail()) {
|
||||
toRemoveNode = currentNode;
|
||||
previousNode.setNext(ListIndex.NOT_SET);
|
||||
@ -262,7 +274,7 @@ public final class ListNode<Key,Value> {
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
|
||||
ListNode<Key,Value> node = new ListNode<Key,Value>();
|
||||
node.next = is.readLong();
|
||||
node.setNext(is.readLong());
|
||||
final short size = is.readShort();
|
||||
for (short i = 0; i < size; i++) {
|
||||
node.entries.addLast(
|
||||
|
Loading…
x
Reference in New Issue
Block a user