[AMQ-6277] take account of producer audit not being updatated on recovery check, avoid unnecessary partial journal replay

(cherry picked from commit a359d8152cfee6f2fe95d34fd1b2296f6ed2670c)
This commit is contained in:
gtully 2017-09-15 13:48:03 +01:00 committed by Timothy Bish
parent 9387451412
commit d894d570d4
2 changed files with 59 additions and 25 deletions

View File

@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Location afterProducerAudit = recoverProducerAudit(); boolean requiresJournalReplay = recoverProducerAudit();
Location afterAckMessageFile = recoverAckMessageFileMap(); requiresJournalReplay |= recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition(); Location lastIndoubtPosition = getRecoveryPosition();
Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
// valid checkpoint, possible recover from afterAckMessageFile
afterProducerAudit = null;
}
Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) { if (recoveryPosition != null) {
int redoCounter = 0; int redoCounter = 0;
int dataFileRotationTracker = recoveryPosition.getDataFileId(); int dataFileRotationTracker = recoveryPosition.getDataFileId();
@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return min; return min;
} }
private Location recoverProducerAudit() throws IOException { private boolean recoverProducerAudit() throws IOException {
boolean requiresReplay = true;
if (metadata.producerSequenceIdTrackerLocation != null) { if (metadata.producerSequenceIdTrackerLocation != null) {
try { try {
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation); requiresReplay = false;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Cannot recover message audit", e); LOG.warn("Cannot recover message audit", e);
return journal.getNextLocation(null);
} }
} else {
// got no audit stored so got to recreate via replay from start of the journal
return journal.getNextLocation(null);
} }
// got no audit stored so got to recreate via replay from start of the journal
return requiresReplay;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Location recoverAckMessageFileMap() throws IOException { private boolean recoverAckMessageFileMap() throws IOException {
boolean requiresReplay = true;
if (metadata.ackMessageFileMapLocation != null) { if (metadata.ackMessageFileMapLocation != null) {
try { try {
KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
return getNextInitializedLocation(metadata.ackMessageFileMapLocation); requiresReplay = false;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Cannot recover ackMessageFileMap", e); LOG.warn("Cannot recover ackMessageFileMap", e);
return journal.getNextLocation(null);
} }
} else {
// got no ackMessageFileMap stored so got to recreate via replay from start of the journal
return journal.getNextLocation(null);
} }
// got no ackMessageFileMap stored so got to recreate via replay from start of the journal
return requiresReplay;
} }
protected void recoverIndex(Transaction tx) throws IOException { protected void recoverIndex(Transaction tx) throws IOException {

View File

@ -16,11 +16,18 @@
*/ */
package org.apache.activemq.store.kahadb; package org.apache.activemq.store.kahadb;
import java.io.File; import org.apache.activemq.broker.BrokerService;
import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterTestSupport; import org.apache.activemq.store.PersistenceAdapterTestSupport;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport
} }
return kaha; return kaha;
} }
public void testNoReplayOnStop() throws Exception {
brokerService.getPersistenceAdapter().checkpoint(true);
brokerService.stop();
final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE);
final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE);
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
trappedLogMessages.set(true);
if (event.getLevel().equals(Level.INFO)) {
if (event.getMessage().toString().contains("Recovery replayed ")) {
gotSomeReplay.set(true);
}
}
}
};
try {
Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender);
Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO);
brokerService = new BrokerService();
pa = createPersistenceAdapter(false);
brokerService.setPersistenceAdapter(pa);
brokerService.start();
} finally {
Logger.getRootLogger().removeAppender(appender);
Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender);
}
assertTrue("log capture working", trappedLogMessages.get());
assertFalse("no replay message in the log", gotSomeReplay.get());
}
} }