diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index ef806e3c45..eb3a5ee8f9 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -751,8 +751,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private Location recoverProducerAudit() throws IOException { if (metadata.producerSequenceIdTrackerLocation != null) { - KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); try { + KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); int maxNumProducers = getMaxFailoverProducersToTrack(); int maxAuditDepth = getFailoverProducersAuditDepth(); @@ -773,8 +773,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @SuppressWarnings("unchecked") private Location recoverAckMessageFileMap() throws IOException { if (metadata.ackMessageFileMapLocation != null) { - KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); try { + KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); metadata.ackMessageFileMap = (Map>) objectIn.readObject(); return getNextInitializedLocation(metadata.ackMessageFileMapLocation); @@ -3228,6 +3228,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return journal; } + protected Metadata getMetadata() { + return metadata; + } + public boolean isFailIfDatabaseIsLocked() { return failIfDatabaseIsLocked; } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index de68cf0d44..71c2195c78 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -89,7 +89,7 @@ final class DataFileAccessor { return new ByteSequence(data, 0, data.length); } catch (RuntimeException e) { - throw new IOException("Invalid location: " + location + ", : " + e, e); + throw new IOException("Invalid location: " + location + " : " + e, e); } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index 6cffe3ddb0..221b0875ec 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Map.Entry; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -42,6 +43,7 @@ import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys; import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; @@ -232,6 +234,48 @@ public class JournalCorruptionEofIndexRecoveryTest { assertEquals("Drain", numToSend, drainQueue(numToSend)); } + + @Test + public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception { + doTestRecoveryAfterLocationCorrupt(false); + } + + @Test + public void testRecoveryAfterAckMapLocationCorrupt() throws Exception { + doTestRecoveryAfterLocationCorrupt(true); + } + + private void doTestRecoveryAfterLocationCorrupt(boolean aOrB) throws Exception { + startBroker(); + + produceMessagesToConsumeMultipleDataFiles(50); + + int numFiles = getNumberOfJournalFiles(); + + assertTrue("more than x files: " + numFiles, numFiles > 4); + + KahaDBStore store = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); + store.checkpointCleanup(true); + Location toCorrupt = aOrB ? store.getMetadata().ackMessageFileMapLocation : store.getMetadata().producerSequenceIdTrackerLocation; + corruptLocation(toCorrupt); + + restartBroker(false, false); + + assertEquals("missing no message", 50, broker.getAdminView().getTotalMessageCount()); + assertEquals("Drain", 50, drainQueue(50)); + } + + private void corruptLocation(Location toCorrupt) throws IOException { + + DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(new Integer(toCorrupt.getDataFileId())); + + RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile(); + + randomAccessFile.seek(toCorrupt.getOffset()); + randomAccessFile.writeInt(3); + dataFile.closeRandomAccessFile(randomAccessFile); + } + private void corruptBatchCheckSumSplash(int id) throws Exception{ Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); @@ -367,19 +411,30 @@ public class JournalCorruptionEofIndexRecoveryTest { } private int drainQueue(int max) throws Exception { + return drain(cf, destination, max); + } + + public static int drain(ConnectionFactory cf, Destination destination, int max) throws Exception { Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); + MessageConsumer consumer = null; int count = 0; try { - while (count < max && consumer.receive(5000) != null) { + consumer = session.createConsumer(destination); + while (count < max && consumer.receive(4000) != null) { count++; } } catch (JMSException ok) { } finally { - consumer.close(); - connection.close(); + if (consumer != null) { + try { + consumer.close(); + } catch (JMSException ok) {} + } + try { + connection.close(); + } catch (JMSException ok) {} } return count; } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java index 2e34686060..325357da30 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.store.kahadb; +import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -36,6 +37,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.RecoverableRandomAccessFile; import org.junit.After; @@ -265,16 +267,7 @@ public class JournalCorruptionIndexRecoveryTest { } private int drainQueue(int max) throws Exception { - Connection connection = cf.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - int count = 0; - while (count < max && consumer.receive(5000) != null) { - count++; - } - consumer.close(); - connection.close(); - return count; + return drain(cf, destination, max); } + } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java index 2d398e2ed1..4121e49b95 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java @@ -43,6 +43,7 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; @@ -247,26 +248,8 @@ public class JournalFdRecoveryTest { } private int tryConsume(Destination destination, int numToGet) throws Exception { - int got = 0; - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); - connection.start(); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - for (int i = 0; i < numToGet; i++) { - if (consumer.receive(4000) == null) { - // give up on timeout or error - break; - } - got++; - - } - } catch (JMSException ok) { - } finally { - connection.close(); - } - - return got; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); + return drain(cf, destination, numToGet); } private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {