https://issues.apache.org/jira/browse/AMQ-6277 - tidy up logic that determines recovery location so that we don't recovery from the end of the journal in error on normal restart. This avoids suprious recovery logging

This commit is contained in:
gtully 2016-05-03 12:47:24 +01:00
parent 3dd86d04e8
commit ba77b9f55a
2 changed files with 51 additions and 9 deletions

View File

@ -629,8 +629,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Location ackMessageFileLocation = recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition();
Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation);
recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) {
int redoCounter = 0;
@ -711,16 +711,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return TransactionIdConversion.convertToLocal(tx);
}
private Location minimum(Location producerAuditPosition,
Location lastIndoubtPosition) {
private Location startOfRecovery(Location x,
Location y) {
Location min = null;
if (producerAuditPosition != null) {
min = producerAuditPosition;
if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
min = lastIndoubtPosition;
if (x != null) {
min = x;
if (y != null) {
int compare = y.compareTo(x);
if (compare < 0) {
min = y;
} else if (compare == 0) {
min = null; // no recovery needed on a matched location
}
}
} else {
min = lastIndoubtPosition;
min = y;
}
return min;
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
@ -31,6 +32,10 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
/**
* @author chirino
@ -193,6 +198,38 @@ public class KahaDBTest extends TestCase {
broker.stop();
}
public void testNoReplayOnStopStart() throws Exception {
KahaDBStore kaha = createStore(true);
BrokerService broker = createBroker(kaha);
sendMessages(100);
broker.stop();
broker.waitUntilStopped();
kaha = createStore(false);
kaha.setCheckForCorruptJournalFiles(true);
final AtomicBoolean didSomeRecovery = new AtomicBoolean(false);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel() == Level.INFO && event.getRenderedMessage().contains("Recovering from the journal @")) {
didSomeRecovery.set(true);
}
}
};
Logger.getRootLogger().addAppender(appender);
broker = createBroker(kaha);
int count = receiveMessages();
assertEquals("Expected to received all messages.", count, 100);
broker.stop();
Logger.getRootLogger().addAppender(appender);
assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
}
private void assertExistsAndDelete(File file) {
assertTrue(file.exists());
file.delete();