[AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay

gtully 2017-09-15 13:48:03 +01:00
parent ed395d1a85
commit a359d8152c
2 changed files with 59 additions and 25 deletions
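
In outline: recoverProducerAudit() and recoverAckMessageFileMap() no longer return a journal Location to replay from. Each now reports whether its checkpointed metadata could be restored, and any failure forces a full replay from the start of the journal instead of a partial replay from an approximate position. A condensed sketch of the resulting decision in the recovery path (taken from the first diff below; timing, locking and logging omitted):

    // Condensed from the MessageDatabase hunk below; not a verbatim excerpt.
    boolean requiresJournalReplay = recoverProducerAudit();   // true if the audit could not be restored
    requiresJournalReplay |= recoverAckMessageFileMap();      // likewise for the ack message file map
    Location lastIndoubtPosition = getRecoveryPosition();
    // Incomplete metadata: replay the whole journal from its first location.
    // Complete metadata: resume from the last in-doubt position.
    Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;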

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

@@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
             long start = System.currentTimeMillis();
-            Location afterProducerAudit = recoverProducerAudit();
-            Location afterAckMessageFile = recoverAckMessageFileMap();
+            boolean requiresJournalReplay = recoverProducerAudit();
+            requiresJournalReplay |= recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
-            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
-                // valid checkpoint, possible recover from afterAckMessageFile
-                afterProducerAudit = null;
-            }
-            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
-            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
+            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
             if (recoveryPosition != null) {
                 int redoCounter = 0;
                 int dataFileRotationTracker = recoveryPosition.getDataFileId();
@@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return min;
     }
 
-    private Location recoverProducerAudit() throws IOException {
+    private boolean recoverProducerAudit() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.producerSequenceIdTrackerLocation != null) {
             try {
                 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
@@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
-                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover message audit", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no audit stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no audit stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     @SuppressWarnings("unchecked")
-    private Location recoverAckMessageFileMap() throws IOException {
+    private boolean recoverAckMessageFileMap() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.ackMessageFileMapLocation != null) {
             try {
                 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
-                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     protected void recoverIndex(Transaction tx) throws IOException {
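
Pieced together from the hunks above, the post-commit recoverProducerAudit() reads as follows; the ObjectInputStream construction falls between the two hunks and is not visible in the diff, so that line is an assumption modeled on recoverAckMessageFileMap(). The ack-map method has the same shape:

    private boolean recoverProducerAudit() throws IOException {
        boolean requiresReplay = true;
        if (metadata.producerSequenceIdTrackerLocation != null) {
            try {
                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
                // assumed by analogy with recoverAckMessageFileMap(); not visible in the hunks above
                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
                requiresReplay = false;
            } catch (Exception e) {
                LOG.warn("Cannot recover message audit", e);
            }
        }
        // got no audit stored so got to recreate via replay from start of the journal
        return requiresReplay;
    }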

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java

@@ -16,11 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  *
@@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport
         }
         return kaha;
     }
 
+    public void testNoReplayOnStop() throws Exception {
+        brokerService.getPersistenceAdapter().checkpoint(true);
+        brokerService.stop();
+
+        final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE);
+        final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                trappedLogMessages.set(true);
+                if (event.getLevel().equals(Level.INFO)) {
+                    if (event.getMessage().toString().contains("Recovery replayed ")) {
+                        gotSomeReplay.set(true);
+                    }
+                }
+            }
+        };
+
+        try {
+            Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO);
+
+            brokerService = new BrokerService();
+            pa = createPersistenceAdapter(false);
+            brokerService.setPersistenceAdapter(pa);
+            brokerService.start();
+        } finally {
+            Logger.getRootLogger().removeAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender);
+        }
+
+        assertTrue("log capture working", trappedLogMessages.get());
+        assertFalse("no replay message in the log", gotSomeReplay.get());
+    }
 }
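
Design note: with both recovery probes reduced to booleans, the old heuristic of taking the minimum of several candidate Locations (and special-casing an afterProducerAudit equal to ackMessageFileMapLocation) disappears. Recovery now has exactly two modes: either the checkpointed metadata loads completely and the broker resumes from the last in-doubt position, or any missing or unreadable piece forces a full replay from the first journal location. The test pins down the common case this fixes: a checkpoint followed by a clean stop and restart must produce no "Recovery replayed" log line.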