From 32714018830bb49a7ae8a437cc21bddc7d92ab72 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 1 Jun 2010 17:28:21 +0000 Subject: [PATCH] resolve durable restart test regressions with kahaDB as default store, the next message sequence id could get out of sync for a durable sub when drained and reused, fix for https://issues.apache.org/activemq/browse/AMQ-2755 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@950171 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/kahadb/MessageDatabase.java | 13 ++++++++- .../DurableConsumerCloseAndReconnectTest.java | 27 ++++++++++++++----- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 979509c517..4b155baf7b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -92,6 +92,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar public static final int CLOSED_STATE = 1; public static final int OPEN_STATE = 2; + private static final long NOT_ACKED = -1; protected class Metadata { @@ -945,7 +946,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (command.hasSubscriptionInfo()) { String subscriptionKey = command.getSubscriptionKey(); sd.subscriptions.put(tx, subscriptionKey, command); - long ackLocation=-1; + long ackLocation=NOT_ACKED; if (!command.getRetroactive()) { ackLocation = sd.nextMessageId-1; } @@ -1263,6 +1264,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar Entry entry = iterator.next(); addAckLocation(rc, entry.getValue(), entry.getKey()); } + + if (rc.nextMessageId == 0) { + // check for existing durable sub all acked out - pull next seq from acks as messages are gone + if (!rc.ackPositions.isEmpty()) { + Long lastAckedMessageId = rc.ackPositions.lastKey(); + if (lastAckedMessageId != NOT_ACKED) { + rc.nextMessageId = lastAckedMessageId+1; + } + } + } } return rc; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java index b1b30d42d2..ce6517d0ec 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java @@ -46,6 +46,26 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { private Destination destination; private int messageCount; + + @Override + protected void setUp() throws Exception { + super.setUp(); + deleteAllMessages(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + deleteAllMessages(); + } + + private void deleteAllMessages() throws Exception { + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true"); + Connection dummyConnection = fac.createConnection(); + dummyConnection.start(); + dummyConnection.close(); + } + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false"); } @@ -60,12 +80,7 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport { dummyConnection.close(); // now lets try again without one connection open - consumeMessagesDeliveredWhileConsumerClosed(); - // now delete the db - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true"); - dummyConnection = fac.createConnection(); - dummyConnection.start(); - dummyConnection.close(); + consumeMessagesDeliveredWhileConsumerClosed(); } protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {