diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index a39496dc10..459bcba854 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -18,11 +18,12 @@ package org.apache.activemq.store.kahadb; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; +import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.Destination; @@ -34,8 +35,11 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.RecoverableRandomAccessFile; @@ -222,6 +226,7 @@ public class JournalCorruptionEofIndexRecoveryTest { size -= 1; LOG.info("rewrite incorrect location size @:" + (pos + Journal.BATCH_CONTROL_RECORD_SIZE) + " as: " + size); randomAccessFile.writeInt(size); + corruptOrderIndex(id, size); randomAccessFile.getChannel().force(true); } @@ -241,6 +246,37 @@ public class JournalCorruptionEofIndexRecoveryTest { randomAccessFile.getChannel().force(true); } + private void corruptOrderIndex(final int num, final int size) throws Exception { + //This is because of AMQ-6097, now that the MessageOrderIndex stores the size in the Location, + //we need to corrupt that value as well + final KahaDBStore kahaDbStore = (KahaDBStore) ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); + kahaDbStore.indexLock.writeLock().lock(); + try { + kahaDbStore.pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert( + (ActiveMQQueue)destination), tx); + int i = 1; + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + if (i == num) { + //change the size value to the wrong size + sd.orderIndex.get(tx, entry.getKey()); + MessageKeys messageKeys = entry.getValue(); + messageKeys.location.setSize(size); + sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys); + break; + } + i++; + } + } + }); + } finally { + kahaDbStore.indexLock.writeLock().unlock(); + } + } + private ArrayList findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException { final ArrayList batchPositions = new ArrayList(); final ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER);