https://issues.apache.org/jira/browse/AMQ-4907 - sanity check on the index when checkForCorruptJournalFiles - test and check that validates the orderindex makes sense

This commit is contained in:
gtully 2013-11-28 21:13:02 +00:00
parent 11781d3cf2
commit 06f24e2e0b
2 changed files with 23 additions and 2 deletions

View File

@ -303,6 +303,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
Entry<String, StoredDestination> entry = iterator.next();
StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
storedDestinations.put(entry.getKey(), sd);
if (checkForCorruptJournalFiles) {
// sanity check the index also
if (!entry.getValue().locationIndex.isEmpty(tx)) {
if (entry.getValue().orderIndex.nextMessageId <= 0) {
throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
}
}
}
}
}
});

View File

@ -46,7 +46,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
public static final String KAHADB_DIR_BASE = "target/activemq-data/kahadb";
public static String kahaDbDirectoryName;
enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt };
enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt, LoadOrderIndex0 };
public CorruptionType failTest = CorruptionType.None;
@Override
@ -71,6 +71,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(kahaDbDirectoryName));
kaha.deleteAllMessages();
kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0);
broker.setPersistenceAdapter(kaha);
return broker;
}
@ -100,6 +101,16 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
raf.seek(8*1024 + 57);
raf.writeLong(Integer.MAX_VALUE-10);
break;
case LoadOrderIndex0:
// loadable but invalid metadata
// location of order index default priority index size
// so looks like there are no ids in the order index
// picked up by setCheckForCorruptJournalFiles
raf.seek(12*1024 + 21);
raf.writeShort(0);
raf.writeChar(0);
raf.writeLong(-1);
break;
default:
}
raf.close();
@ -107,6 +118,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
// starting broker
BrokerService broker = new BrokerService();
KahaDBStore kaha = new KahaDBStore();
kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0);
// uncomment if you want to test archiving
//kaha.setArchiveCorruptedIndex(true);
kaha.setDirectory(new File(kahaDbDirectoryName));
@ -123,7 +135,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
}
public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() {
this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt} );
this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt, CorruptionType.LoadOrderIndex0} );
}
public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {