AMQ-7129 - Properly recover messages from KahaDB for a durable when there are

messages to recover before the stored lastAck value

With individual ack mode we need to check the durable ackPosition
sequence set in the KahaDB index on subsription load to see if there are
earlier messages before the lastAck value that still haven't been acked.
While this normally wouldn't happen it is possible in individual ack
mode
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-01-09 12:51:03 -05:00
parent 49296f9259
commit 25de20c77e
3 changed files with 394 additions and 2 deletions

View File

@ -80,6 +80,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@ -1086,7 +1087,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
//If we have ackPositions tracked then compare the first one as individual acknowledge mode
//may have bumped lastAck even though there are earlier messages to still consume
if (subAckPositions != null && !subAckPositions.isEmpty()
&& subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
//we have messages to ack before lastAckedSequence
sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
} else {
subAckPositions = null;
sd.orderIndex.setBatch(tx, cursorPos);
}
recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
@ -1094,6 +1105,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the sequence set contains the message still
//and if it doesn't skip it
if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
continue;
}
listener.recoverMessage(loadMessage(entry.getValue().location));
}
sd.orderIndex.resetCursorPosition();
@ -1118,13 +1134,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
SequenceSet subAckPositions = null;
if (moc == null) {
LastAck pos = getLastAck(tx, sd, subscriptionKey);
if (pos == null) {
// sub deleted
return;
}
sd.orderIndex.setBatch(tx, pos);
subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
//If we have ackPositions tracked then compare the first one as individual acknowledge mode
//may have bumped lastAck even though there are earlier messages to still consume
if (subAckPositions != null && !subAckPositions.isEmpty()
&& subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
//we have messages to ack before lastAckedSequence
sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
} else {
subAckPositions = null;
sd.orderIndex.setBatch(tx, pos);
}
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);
@ -1138,6 +1165,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the sequence set contains the message still
//and if it doesn't skip it
if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
continue;
}
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++;
}
@ -1536,6 +1568,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
super(runnable, null);
}
@Override
public void setException(final Throwable e) {
super.setException(e);
}

View File

@ -3000,6 +3000,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return sd.subscriptionAcks.get(tx, subscriptionKey);
}
protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
if (sd.ackPositions != null) {
final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
return messageSequences;
}
return null;
}
protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
if (sd.ackPositions != null) {
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);

View File

@ -0,0 +1,350 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class KahaDBDurableMessageRecoveryTest {
@Parameters(name = "recoverIndex")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
private BrokerService broker;
private URI brokerConnectURI;
private boolean recoverIndex;
@Before
public void setUpBroker() throws Exception {
startBroker(false);
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
/**
* @param deleteIndex
*/
public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) {
super();
this.recoverIndex = recoverIndex;
}
protected void startBroker(boolean recoverIndex) throws Exception {
broker = new BrokerService();
broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir.getRoot());
TransportConnector connector = broker.addConnector(new TransportConnector());
connector.setUri(new URI("tcp://0.0.0.0:0"));
connector.setName("tcp");
configurePersistence(broker, recoverIndex);
broker.start();
broker.waitUntilStarted();
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
}
protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
adapter.setForceRecoverIndex(forceRecoverIndex);
// set smaller size for test
adapter.setJournalMaxFileLength(1024 * 20);
}
protected void restartBroker(boolean deleteIndex) throws Exception {
stopBroker();
startBroker(deleteIndex);
}
protected Session getSession(int ackMode) throws Exception {
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
connection.setClientID("clientId1");
connection.start();
Session session = connection.createSession(false, ackMode);
return session;
}
/**
* Test that on broker restart a durable topic subscription will recover all
* messages before the "last ack" in KahaDB which could happen if using
* individual acknowledge mode and skipping messages
*/
@Test
public void durableRecoveryIndividualAcknowledge() throws Exception {
String testTopic = "test.topic";
Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
MessageProducer producer = session.createProducer(topic);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
for (int i = 1; i <= 10; i++) {
producer.send(session.createTextMessage("msg: " + i));
}
producer.close();
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
// Receive only the 5th message using individual ack mode
for (int i = 1; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber.receive(1000);
assertNotNull(received);
if (i == 5) {
received.acknowledge();
}
}
// Verify there are 9 messages left still and restart broker
assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
subscriber.close();
restartBroker(recoverIndex);
// Verify 9 messages exist in store on startup
assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
// Recreate subscriber and try and receive the other 9 messages
session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
subscriber = session.createDurableSubscriber(topic, "sub1");
for (int i = 1; i <= 4; i++) {
TextMessage received = (TextMessage) subscriber.receive(1000);
assertNotNull(received);
assertEquals("msg: " + i, received.getText());
}
for (int i = 6; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber.receive(1000);
assertNotNull(received);
assertEquals("msg: " + i, received.getText());
}
subscriber.close();
assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
}
@Test
public void multipleDurableRecoveryIndividualAcknowledge() throws Exception {
String testTopic = "test.topic";
Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
MessageProducer producer = session.createProducer(topic);
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "sub1");
TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "sub2");
for (int i = 1; i <= 10; i++) {
producer.send(session.createTextMessage("msg: " + i));
}
producer.close();
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
// Receive 2 messages using individual ack mode only on first sub
for (int i = 1; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber1.receive(1000);
assertNotNull(received);
if (i == 3 || i == 7) {
received.acknowledge();
}
}
// Verify there are 8 messages left still and restart broker
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
subscriber1.close();
subscriber2.close();
restartBroker(recoverIndex);
// Verify 8 messages exist in store on startup on sub 1 and 10 on sub 2
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
// Recreate subscriber and try and receive the other 8 messages
session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
subscriber1 = session.createDurableSubscriber(topic, "sub1");
subscriber2 = session.createDurableSubscriber(topic, "sub2");
for (int i = 1; i <= 2; i++) {
TextMessage received = (TextMessage) subscriber1.receive(1000);
assertNotNull(received);
assertEquals("msg: " + i, received.getText());
}
for (int i = 4; i <= 6; i++) {
TextMessage received = (TextMessage) subscriber1.receive(1000);
assertNotNull(received);
assertEquals("msg: " + i, received.getText());
}
for (int i = 8; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber1.receive(1000);
assertNotNull(received);
assertEquals("msg: " + i, received.getText());
}
// Make sure sub 2 gets all 10
for (int i = 1; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber2.receive(1000);
assertNotNull(received);
assertEquals("msg: " + i, received.getText());
}
subscriber1.close();
subscriber2.close();
assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
}
@Test
public void multipleDurableTestRecoverSubscription() throws Exception {
String testTopic = "test.topic";
Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
MessageProducer producer = session.createProducer(topic);
TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "sub1");
TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "sub2");
for (int i = 1; i <= 10; i++) {
producer.send(session.createTextMessage("msg: " + i));
}
producer.close();
// Receive 2 messages using individual ack mode only on first sub
for (int i = 1; i <= 10; i++) {
TextMessage received = (TextMessage) subscriber1.receive(1000);
assertNotNull(received);
if (i == 3 || i == 7) {
received.acknowledge();
}
}
// Verify there are 8 messages left on sub 1 and 10 on sub2 and restart
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
subscriber1.close();
subscriber2.close();
restartBroker(recoverIndex);
//Manually recover subscription and verify proper messages are loaded
final Topic brokerTopic = (Topic) broker.getDestination(topic);
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
final AtomicInteger sub1Recovered = new AtomicInteger();
final AtomicInteger sub2Recovered = new AtomicInteger();
store.recoverSubscription("clientId1", "sub1", new MessageRecoveryListener() {
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return false;
}
@Override
public boolean recoverMessage(Message message) throws Exception {
TextMessage textMessage = (TextMessage) message;
if (textMessage.getText().equals("msg: " + 3) || textMessage.getText().equals("msg: " + 7)) {
throw new IllegalStateException("Got wrong message: " + textMessage.getText());
}
sub1Recovered.incrementAndGet();
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean hasSpace() {
return true;
}
});
store.recoverSubscription("clientId1", "sub2", new MessageRecoveryListener() {
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return false;
}
@Override
public boolean recoverMessage(Message message) throws Exception {
sub2Recovered.incrementAndGet();
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean hasSpace() {
return true;
}
});
//Verify proper number of messages are recovered
assertEquals(8, sub1Recovered.get());
assertEquals(10, sub2Recovered.get());
}
protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
final Topic brokerTopic = (Topic) broker.getDestination(topic);
final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
return store.getMessageCount(clientId, subId);
}
}