resolve durable restart test regressions with kahaDB as default store, the next message sequence id could get out of sync for a durable sub when drained and reused, fix for https://issues.apache.org/activemq/browse/AMQ-2755

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@950171 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-06-01 17:28:21 +00:00
parent 7f0ced9402
commit 3271401883
2 changed files with 33 additions and 7 deletions

View File

@ -92,6 +92,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
public static final int CLOSED_STATE = 1; public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2; public static final int OPEN_STATE = 2;
private static final long NOT_ACKED = -1;
protected class Metadata { protected class Metadata {
@ -945,7 +946,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (command.hasSubscriptionInfo()) { if (command.hasSubscriptionInfo()) {
String subscriptionKey = command.getSubscriptionKey(); String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.put(tx, subscriptionKey, command); sd.subscriptions.put(tx, subscriptionKey, command);
long ackLocation=-1; long ackLocation=NOT_ACKED;
if (!command.getRetroactive()) { if (!command.getRetroactive()) {
ackLocation = sd.nextMessageId-1; ackLocation = sd.nextMessageId-1;
} }
@ -1263,6 +1264,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
Entry<String, Long> entry = iterator.next(); Entry<String, Long> entry = iterator.next();
addAckLocation(rc, entry.getValue(), entry.getKey()); addAckLocation(rc, entry.getValue(), entry.getKey());
} }
if (rc.nextMessageId == 0) {
// check for existing durable sub all acked out - pull next seq from acks as messages are gone
if (!rc.ackPositions.isEmpty()) {
Long lastAckedMessageId = rc.ackPositions.lastKey();
if (lastAckedMessageId != NOT_ACKED) {
rc.nextMessageId = lastAckedMessageId+1;
}
}
}
} }
return rc; return rc;

View File

@ -46,6 +46,26 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport {
private Destination destination; private Destination destination;
private int messageCount; private int messageCount;
@Override
protected void setUp() throws Exception {
super.setUp();
deleteAllMessages();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
deleteAllMessages();
}
private void deleteAllMessages() throws Exception {
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
Connection dummyConnection = fac.createConnection();
dummyConnection.start();
dummyConnection.close();
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false"); return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false");
} }
@ -60,12 +80,7 @@ public class DurableConsumerCloseAndReconnectTest extends TestSupport {
dummyConnection.close(); dummyConnection.close();
// now lets try again without one connection open // now lets try again without one connection open
consumeMessagesDeliveredWhileConsumerClosed(); consumeMessagesDeliveredWhileConsumerClosed();
// now delete the db
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
dummyConnection = fac.createConnection();
dummyConnection.start();
dummyConnection.close();
} }
protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception { protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {