mirror of https://github.com/apache/activemq.git
Apply fix for tracker settings.
This commit is contained in:
parent
810ce35f55
commit
bfb1778211
|
@ -204,7 +204,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
class MetadataMarshaller extends VariableMarshaller<Metadata> {
|
class MetadataMarshaller extends VariableMarshaller<Metadata> {
|
||||||
@Override
|
@Override
|
||||||
public Metadata readPayload(DataInput dataIn) throws IOException {
|
public Metadata readPayload(DataInput dataIn) throws IOException {
|
||||||
Metadata rc = new Metadata();
|
Metadata rc = createMetadata();
|
||||||
rc.read(dataIn);
|
rc.read(dataIn);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
@ -392,7 +392,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
} else {
|
} else {
|
||||||
pageFile.delete();
|
pageFile.delete();
|
||||||
}
|
}
|
||||||
metadata = new Metadata();
|
metadata = createMetadata();
|
||||||
pageFile = null;
|
pageFile = null;
|
||||||
loadPageFile();
|
loadPageFile();
|
||||||
}
|
}
|
||||||
|
@ -430,7 +430,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
checkpointUpdate(true);
|
checkpointUpdate(true);
|
||||||
}
|
}
|
||||||
pageFile.unload();
|
pageFile.unload();
|
||||||
metadata = new Metadata();
|
metadata = createMetadata();
|
||||||
} finally {
|
} finally {
|
||||||
checkpointLock.writeLock().unlock();
|
checkpointLock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
|
@ -687,7 +687,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
|
KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
|
||||||
try {
|
try {
|
||||||
ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
|
ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
|
||||||
|
int maxNumProducers = getMaxFailoverProducersToTrack();
|
||||||
|
int maxAuditDepth = getFailoverProducersAuditDepth();
|
||||||
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
|
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
|
||||||
|
metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
|
||||||
|
metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
|
||||||
return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
|
return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Cannot recover message audit", e);
|
LOG.warn("Cannot recover message audit", e);
|
||||||
|
@ -2458,6 +2462,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
return manager;
|
return manager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Metadata createMetadata() {
|
||||||
|
Metadata md = new Metadata();
|
||||||
|
md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
|
||||||
|
md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
|
||||||
|
return md;
|
||||||
|
}
|
||||||
|
|
||||||
public int getJournalMaxWriteBatchSize() {
|
public int getJournalMaxWriteBatchSize() {
|
||||||
return journalMaxWriteBatchSize;
|
return journalMaxWriteBatchSize;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue