https://issues.apache.org/jira/browse/AMQ-3206 - Unsubscribed durable sub can leave dangling message reference in kahaDB, visible after a restart

resolved by correctly updating the persitent index when a durable sub with an outstanding backlog is unsubscribed. Additional test that vaidates fix

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1078799 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-07 14:58:26 +00:00
parent 008b3e551c
commit 1c1aa17311
3 changed files with 82 additions and 9 deletions

View File

@ -500,7 +500,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
@Override @Override
public String toString() { public String toString() {
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
return "KahaDBPersistenceAdapter[" + path + "]"; return "KahaDBPersistenceAdapter[" + path + "]";
} }

View File

@ -1133,10 +1133,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx); StoredDestination sd = getStoredDestination(command.getDestination(), tx);
final String subscriptionKey = command.getSubscriptionKey();
// If set then we are creating it.. otherwise we are destroying the sub // If set then we are creating it.. otherwise we are destroying the sub
if (command.hasSubscriptionInfo()) { if (command.hasSubscriptionInfo()) {
String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.put(tx, subscriptionKey, command); sd.subscriptions.put(tx, subscriptionKey, command);
long ackLocation=NOT_ACKED; long ackLocation=NOT_ACKED;
if (!command.getRetroactive()) { if (!command.getRetroactive()) {
@ -1147,7 +1147,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
} else { } else {
// delete the sub... // delete the sub...
String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.remove(tx, subscriptionKey); sd.subscriptions.remove(tx, subscriptionKey);
sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey);
removeAckLocationsForSub(tx, sd, subscriptionKey); removeAckLocationsForSub(tx, sd, subscriptionKey);
@ -1206,7 +1205,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if( gcCandidateSet.isEmpty() ) { if( gcCandidateSet.isEmpty() ) {
break; break;
} }
// Use a visitor to cut down the number of pages that we load // Use a visitor to cut down the number of pages that we load
entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1; int last=-1;
@ -1234,7 +1233,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
return !subset.isEmpty(); return !subset.isEmpty();
} }
} }
public void visit(List<Location> keys, List<Long> values) { public void visit(List<Location> keys, List<Long> values) {
for (Location l : keys) { for (Location l : keys) {
int fileId = l.getDataFileId(); int fileId = l.getDataFileId();
@ -1242,9 +1241,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
gcCandidateSet.remove(fileId); gcCandidateSet.remove(fileId);
last = fileId; last = fileId;
} }
} }
} }
}); });
LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
} }
@ -1669,7 +1667,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
if (!sd.ackPositions.isEmpty(tx)) { if (!sd.ackPositions.isEmpty(tx)) {
Long end = sd.ackPositions.getLast(tx).getKey(); Long end = sd.ackPositions.getLast(tx).getKey();
for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) { for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) {
removeAckLocation(tx, sd, subscriptionKey, sequence); removeAckLocation(tx, sd, subscriptionKey, sequence);
@ -1704,6 +1702,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey()); sd.orderIndex.remove(tx, entry.getKey());
} }
} else {
// update
sd.ackPositions.put(tx, sequenceId, hs);
} }
} }
} }

View File

@ -32,13 +32,17 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.kahadb.journal.Journal;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class); private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
public Boolean usePrioritySupport = Boolean.TRUE; public boolean usePrioritySupport = Boolean.TRUE;
public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private BrokerService broker; private BrokerService broker;
private ActiveMQTopic topic; private ActiveMQTopic topic;
private Vector<Exception> exceptions = new Vector<Exception>(); private Vector<Exception> exceptions = new Vector<Exception>();
@ -97,6 +101,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
// ensure it kicks in during tests // ensure it kicks in during tests
((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000); ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
} else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
// have lots of journal files
((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
} }
broker.start(); broker.start();
} }
@ -1049,6 +1056,71 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
con.close(); con.close();
} }
// use very small journal to get lots of files to cleanup
public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
this.addCombinationValues("journalMaxFileLength",
new Object[]{new Integer(64*1024)});
}
// https://issues.apache.org/jira/browse/AMQ-3206
public void testCleanupDeletedSubAfterRestart() throws Exception {
Connection con = createConnection("cli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", null, true);
session.close();
con.close();
con = createConnection("cli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", null, true);
session.close();
con.close();
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
final int toSend = 500;
final String payload = new byte[40*1024].toString();
int sent = 0;
for (int i = sent; i < toSend; i++) {
Message message = session.createTextMessage(payload);
message.setStringProperty("filter", "false");
message.setIntProperty("ID", i);
producer.send(topic, message);
sent++;
}
con.close();
LOG.info("sent: " + sent);
// kill off cli1
con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe("SubsId");
destroyBroker();
createBroker(false);
con = createConnection("cli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
final Listener listener = new Listener();
consumer.setMessageListener(listener);
assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
LOG.info("Want: " + toSend + ", current: " + listener.count);
return listener.count == toSend;
}
}));
session.close();
con.close();
destroyBroker();
createBroker(false);
KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
}
public static class Listener implements MessageListener { public static class Listener implements MessageListener {
int count = 0; int count = 0;
String id = null; String id = null;