diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 3e0968e62d..fa4672b405 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -66,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -192,8 +193,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.maxAsyncJobs = maxAsyncJobs; } + @Override - public void doStart() throws Exception { + protected void configureMetadata() { if (brokerService != null) { metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); wireFormat.setVersion(metadata.openwireVersion); @@ -203,6 +205,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } + } + + @Override + public void doStart() throws Exception { + //configure the metadata before start, right now + //this is just the open wire version + configureMetadata(); + super.doStart(); if (brokerService != null) { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index cd9067d2d4..936e047cad 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -399,6 +399,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe pageFile.delete(); } metadata = createMetadata(); + //The metadata was recreated after a detect corruption so we need to + //reconfigure anything that was configured on the old metadata on startup + configureMetadata(); pageFile = null; loadPageFile(); } @@ -2727,6 +2730,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return md; } + protected abstract void configureMetadata(); + public int getJournalMaxWriteBatchSize() { return journalMaxWriteBatchSize; } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java index 5b272dbc53..e59767a3d4 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java @@ -20,6 +20,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -34,7 +39,15 @@ import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.RecoverableRandomAccessFile; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -61,6 +74,8 @@ public class KahaDBStoreOpenWireVersionTest { broker.setAdvisorySupport(false); broker.setDataDirectory(storeDir); broker.setStoreOpenWireVersion(storeOpenWireVersion); + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruptJournalFiles(true); + broker.start(); broker.waitUntilStarted(); @@ -119,6 +134,56 @@ public class KahaDBStoreOpenWireVersionTest { doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION); } + /** + * This test shows that a corrupted index/rebuild will still + * honor the storeOpenWireVersion set on the BrokerService. + * This wasn't the case before AMQ-6082 + */ + @Test(timeout = 60000) + public void testStoreVersionCorrupt() throws Exception { + final int create = 6; + final int reload = 6; + + createBroker(create); + populateStore(); + + //blow up the index so it has to be recreated + corruptIndex(); + stopBroker(); + + createBroker(reload); + assertEquals(create, broker.getStoreOpenWireVersion()); + assertStoreIsUsable(); + } + + + private void corruptIndex() throws IOException { + KahaDBStore store = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); + final PageFile pageFile = store.getPageFile(); + final Metadata metadata = store.metadata; + + //blow up the index + try { + store.indexLock.writeLock().lock(); + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + entry.getValue().orderIndex.nextMessageId = -100; + entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); + entry.getValue().orderIndex.lowPriorityIndex.clear(tx); + entry.getValue().orderIndex.highPriorityIndex.clear(tx); + entry.getValue().messageReferences.clear(); + } + } + }); + } finally { + store.indexLock.writeLock().unlock(); + } + } + private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception { createBroker(create); populateStore();