Handle journal corruption in the scheduler store more gracefully and report the location of the error - addresses intermittent failure of KahaDBSchedulerIndexRebuildTest

This commit is contained in:
gtully 2015-05-27 12:25:46 +01:00
parent 17bcf43048
commit 0a21c5f8ff
2 changed files with 16 additions and 6 deletions

View File

@@ -614,7 +614,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// track this dud location
journal.corruptRecoveryLocation(recoveryPosition);
} else {
throw failedRecovery;
throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
}
}
recoveryPosition = journal.getNextLocation(recoveryPosition);

View File

@@ -753,12 +753,22 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
if (recoveryPosition != null) {
int redoCounter = 0;
LOG.info("Recovering from the journal ...");
LOG.info("Recovering from the scheduled job journal @" + recoveryPosition);
while (recoveryPosition != null) {
JournalCommand<?> message = load(recoveryPosition);
metaData.setLastUpdateLocation(recoveryPosition);
doRecover(message, recoveryPosition, lastIndoubtPosition);
redoCounter++;
try {
JournalCommand<?> message = load(recoveryPosition);
metaData.setLastUpdateLocation(recoveryPosition);
doRecover(message, recoveryPosition, lastIndoubtPosition);
redoCounter++;
} catch (IOException failedRecovery) {
if (isIgnoreMissingJournalfiles()) {
LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
// track this dud location
journal.corruptRecoveryLocation(recoveryPosition);
} else {
throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
}
}
recoveryPosition = journal.getNextLocation(recoveryPosition);
if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter);