From d894d570d4254cc9bb6bca0ab6add2bafe2acc11 Mon Sep 17 00:00:00 2001
From: gtully
Date: Fri, 15 Sep 2017 13:48:03 +0100
Subject: [PATCH] [AMQ-6277] take account of producer audit not being updated on recovery check, avoid unnecessary partial journal replay

(cherry picked from commit a359d8152cfee6f2fe95d34fd1b2296f6ed2670c)
---
 .../store/kahadb/MessageDatabase.java         | 35 +++++--------
 .../kahadb/KahaDBPersistenceAdapterTest.java  | 49 +++++++++++++++++--
 2 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 40e8f955e1..a6d3cc8db2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -671,17 +671,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try {
 
             long start = System.currentTimeMillis();
-            Location afterProducerAudit = recoverProducerAudit();
-            Location afterAckMessageFile = recoverAckMessageFileMap();
+            boolean requiresJournalReplay = recoverProducerAudit();
+            requiresJournalReplay |= recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
-
-            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
-                // valid checkpoint, possible recover from afterAckMessageFile
-                afterProducerAudit = null;
-            }
-            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
-            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
-
+            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
             if (recoveryPosition != null) {
                 int redoCounter = 0;
                 int dataFileRotationTracker = recoveryPosition.getDataFileId();
@@ -784,7 +777,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return min;
     }
 
-    private Location recoverProducerAudit() throws IOException {
+    private boolean recoverProducerAudit() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.producerSequenceIdTrackerLocation != null) {
             try {
                 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
@@ -794,33 +788,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
-                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover message audit", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no audit stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no audit stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     @SuppressWarnings("unchecked")
-    private Location recoverAckMessageFileMap() throws IOException {
+    private boolean recoverAckMessageFileMap() throws IOException {
+        boolean requiresReplay = true;
         if (metadata.ackMessageFileMapLocation != null) {
             try {
                 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
-                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
+                requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
-                return journal.getNextLocation(null);
             }
-        } else {
-            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
-            return journal.getNextLocation(null);
         }
+        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
+        return requiresReplay;
     }
 
     protected void recoverIndex(Transaction tx) throws IOException {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
index c45c3e5a42..f509011480 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
@@ -16,11 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.IOException;
-
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  *
@@ -36,4 +43,40 @@ public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport
         }
         return kaha;
     }
+
+    public void testNoReplayOnStop() throws Exception {
+        brokerService.getPersistenceAdapter().checkpoint(true);
+        brokerService.stop();
+
+        final AtomicBoolean gotSomeReplay = new AtomicBoolean(Boolean.FALSE);
+        final AtomicBoolean trappedLogMessages = new AtomicBoolean(Boolean.FALSE);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                trappedLogMessages.set(true);
+                if (event.getLevel().equals(Level.INFO)) {
+                    if (event.getMessage().toString().contains("Recovery replayed ")) {
+                        gotSomeReplay.set(true);
+                    }
+                }
+            }
+        };
+
+        try {
+            Logger.getLogger(MessageDatabase.class.getName()).addAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).setLevel(Level.INFO);
+
+            brokerService = new BrokerService();
+            pa = createPersistenceAdapter(false);
+            brokerService.setPersistenceAdapter(pa);
+            brokerService.start();
+
+        } finally {
+            Logger.getRootLogger().removeAppender(appender);
+            Logger.getLogger(MessageDatabase.class.getName()).removeAppender(appender);
+        }
+        assertTrue("log capture working", trappedLogMessages.get());
+        assertFalse("no replay message in the log", gotSomeReplay.get());
+    }
 }
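
For context, a minimal sketch of the recovery-position decision this commit introduces. RecoveryDecisionSketch, Location, Journal and chooseRecoveryPosition are hypothetical stand-ins invented for illustration; only the boolean-driven choice between the last in-doubt position and a full replay from the start of the journal mirrors the new MessageDatabase.recover() logic.

// Illustrative sketch only; not part of the patch. Location, Journal and
// chooseRecoveryPosition are simplified stand-ins for the KahaDB types.
public class RecoveryDecisionSketch {

    static final class Location {
        final int dataFileId;
        final int offset;

        Location(int dataFileId, int offset) {
            this.dataFileId = dataFileId;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return dataFileId + ":" + offset;
        }
    }

    interface Journal {
        // a null argument means "start of the journal", mirroring journal.getNextLocation(null)
        Location getNextLocation(Location from);
    }

    // Before the patch each recover* method returned a Location and the minimum of the
    // candidates was replayed; after the patch each method only reports whether a full
    // replay is required, otherwise recovery starts at the last in-doubt position.
    static Location chooseRecoveryPosition(boolean auditRequiresReplay,
                                           boolean ackMapRequiresReplay,
                                           Location lastIndoubtPosition,
                                           Journal journal) {
        boolean requiresJournalReplay = auditRequiresReplay || ackMapRequiresReplay;
        return requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
    }

    public static void main(String[] args) {
        Journal journal = from -> new Location(1, 0); // pretend the journal starts at 1:0
        Location lastIndoubt = new Location(4, 128);

        // clean checkpoint: both snapshots deserialized, no partial replay needed
        System.out.println(chooseRecoveryPosition(false, false, lastIndoubt, journal)); // 4:128
        // missing or corrupt audit snapshot: replay from the start of the journal
        System.out.println(chooseRecoveryPosition(true, false, lastIndoubt, journal));  // 1:0
    }
}

In other words, a partial replay is skipped only when both the producer audit and the ackMessageFileMap were deserialized from a clean checkpoint; a missing or unreadable snapshot falls back to replaying from the start of the journal, which is what the testNoReplayOnStop test asserts does not happen after a clean stop.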