diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index df970d452a..755f214b83 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -204,7 +204,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe class MetadataMarshaller extends VariableMarshaller { @Override public Metadata readPayload(DataInput dataIn) throws IOException { - Metadata rc = new Metadata(); + Metadata rc = createMetadata(); rc.read(dataIn); return rc; } @@ -392,7 +392,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } else { pageFile.delete(); } - metadata = new Metadata(); + metadata = createMetadata(); pageFile = null; loadPageFile(); } @@ -430,7 +430,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointUpdate(true); } pageFile.unload(); - metadata = new Metadata(); + metadata = createMetadata(); } finally { checkpointLock.writeLock().unlock(); } @@ -687,7 +687,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); try { ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); + int maxNumProducers = getMaxFailoverProducersToTrack(); + int maxAuditDepth = getFailoverProducersAuditDepth(); metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); + metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); + metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); } catch (Exception e) { LOG.warn("Cannot recover message audit", e); @@ -2458,6 +2462,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return manager; } + private Metadata createMetadata() { + Metadata md = new Metadata(); + md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); + md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); + return md; + } + public int getJournalMaxWriteBatchSize() { return journalMaxWriteBatchSize; }