diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 7be221b1a0..6db6509568 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -500,7 +500,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi @Override public String toString() { - String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET"; + String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; return "KahaDBPersistenceAdapter[" + path + "]"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 8b9a15123f..409517bf5b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1133,10 +1133,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); + final String subscriptionKey = command.getSubscriptionKey(); // If set then we are creating it.. otherwise we are destroying the sub if (command.hasSubscriptionInfo()) { - String subscriptionKey = command.getSubscriptionKey(); sd.subscriptions.put(tx, subscriptionKey, command); long ackLocation=NOT_ACKED; if (!command.getRetroactive()) { @@ -1147,7 +1147,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); } else { // delete the sub... - String subscriptionKey = command.getSubscriptionKey(); sd.subscriptions.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey); removeAckLocationsForSub(tx, sd, subscriptionKey); @@ -1206,7 +1205,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if( gcCandidateSet.isEmpty() ) { break; } - + // Use a visitor to cut down the number of pages that we load entry.getValue().locationIndex.visit(tx, new BTreeVisitor() { int last=-1; @@ -1234,7 +1233,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return !subset.isEmpty(); } } - + public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); @@ -1242,9 +1241,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar gcCandidateSet.remove(fileId); last = fileId; } - } + } } - }); LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); } @@ -1669,7 +1667,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { - if (!sd.ackPositions.isEmpty(tx)) { + if (!sd.ackPositions.isEmpty(tx)) { Long end = sd.ackPositions.getLast(tx).getKey(); for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) { removeAckLocation(tx, sd, subscriptionKey, sequence); @@ -1704,6 +1702,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.orderIndex.remove(tx, entry.getKey()); } + } else { + // update + sd.ackPositions.put(tx, sequenceId, hs); } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index a804a935fe..9d685b44b6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -32,13 +32,17 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.apache.kahadb.journal.Journal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class); - public Boolean usePrioritySupport = Boolean.TRUE; + public boolean usePrioritySupport = Boolean.TRUE; + public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; private BrokerService broker; private ActiveMQTopic topic; private Vector exceptions = new Vector(); @@ -97,6 +101,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { // ensure it kicks in during tests ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000); + } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + // have lots of journal files + ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength); } broker.start(); } @@ -1049,6 +1056,71 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con.close(); } + // use very small journal to get lots of files to cleanup + public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception { + this.addCombinationValues("journalMaxFileLength", + new Object[]{new Integer(64*1024)}); + } + + // https://issues.apache.org/jira/browse/AMQ-3206 + public void testCleanupDeletedSubAfterRestart() throws Exception { + Connection con = createConnection("cli1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", null, true); + session.close(); + con.close(); + + con = createConnection("cli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", null, true); + session.close(); + con.close(); + + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + final int toSend = 500; + final String payload = new byte[40*1024].toString(); + int sent = 0; + for (int i = sent; i < toSend; i++) { + Message message = session.createTextMessage(payload); + message.setStringProperty("filter", "false"); + message.setIntProperty("ID", i); + producer.send(topic, message); + sent++; + } + con.close(); + LOG.info("sent: " + sent); + + // kill off cli1 + con = createConnection("cli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe("SubsId"); + + destroyBroker(); + createBroker(false); + + con = createConnection("cli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); + final Listener listener = new Listener(); + consumer.setMessageListener(listener); + assertTrue("got all sent", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + LOG.info("Want: " + toSend + ", current: " + listener.count); + return listener.count == toSend; + } + })); + session.close(); + con.close(); + + destroyBroker(); + createBroker(false); + KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size()); + } + public static class Listener implements MessageListener { int count = 0; String id = null;