https://issues.apache.org/jira/browse/AMQ-6277 - journal getNextLocation needs too passes to skip past if target is not initialized

This commit is contained in:
gtully 2016-05-04 22:09:06 +01:00
parent a28a091c55
commit 1c4108545c
1 changed files with 25 additions and 11 deletions

View File

@ -111,6 +111,8 @@ import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
protected BrokerService brokerService;
@ -625,12 +627,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
long start = System.currentTimeMillis();
Location producerAuditPosition = recoverProducerAudit();
Location ackMessageFileLocation = recoverAckMessageFileMap();
Location afterProducerAudit = recoverProducerAudit();
Location afterAckMessageFile = recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition();
Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation);
recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition);
if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
// valid checkpoint, possible recover from afterAckMessageFile
afterProducerAudit = null;
}
Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) {
int redoCounter = 0;
@ -711,8 +717,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return TransactionIdConversion.convertToLocal(tx);
}
private Location startOfRecovery(Location x,
Location y) {
private Location minimum(Location x,
Location y) {
Location min = null;
if (x != null) {
min = x;
@ -720,8 +726,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int compare = y.compareTo(x);
if (compare < 0) {
min = y;
} else if (compare == 0) {
min = null; // no recovery needed on a matched location
}
}
} else {
@ -740,7 +744,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
} catch (Exception e) {
LOG.warn("Cannot recover message audit", e);
return journal.getNextLocation(null);
@ -758,7 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
return journal.getNextLocation(metadata.ackMessageFileMapLocation);
return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
} catch (Exception e) {
LOG.warn("Cannot recover ackMessageFileMap", e);
return journal.getNextLocation(null);
@ -986,13 +990,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Perhaps there were no transactions...
if( metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(metadata.lastUpdate);
return getNextInitializedLocation(metadata.lastUpdate);
}
}
// This loads the first position.
return journal.getNextLocation(null);
}
private Location getNextInitializedLocation(Location location) throws IOException {
Location mayNotBeInitialized = journal.getNextLocation(location);
if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
// need to init size and type to skip
return journal.getNextLocation(mayNotBeInitialized);
} else {
return mayNotBeInitialized;
}
}
protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start;
this.indexLock.writeLock().lock();