Propertly re-setting the storeOpenWireVersion from the BrokerService
on the KahaDB Metadata if a corrupted index is detected and the
Metadata has to be recreated.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-12-08 20:10:11 +00:00
parent 5772e7bed8
commit 7a7c70ad75
3 changed files with 81 additions and 1 deletions

View File

@ -66,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@ -192,8 +193,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.maxAsyncJobs = maxAsyncJobs;
}
@Override
public void doStart() throws Exception {
protected void configureMetadata() {
if (brokerService != null) {
metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
wireFormat.setVersion(metadata.openwireVersion);
@ -203,6 +205,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
}
@Override
public void doStart() throws Exception {
//configure the metadata before start, right now
//this is just the open wire version
configureMetadata();
super.doStart();
if (brokerService != null) {

View File

@ -399,6 +399,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
pageFile.delete();
}
metadata = createMetadata();
//The metadata was recreated after a detect corruption so we need to
//reconfigure anything that was configured on the old metadata on startup
configureMetadata();
pageFile = null;
loadPageFile();
}
@ -2727,6 +2730,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return md;
}
protected abstract void configureMetadata();
public int getJournalMaxWriteBatchSize() {
return journalMaxWriteBatchSize;
}

View File

@ -20,6 +20,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map.Entry;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -34,7 +39,15 @@ import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -61,6 +74,8 @@ public class KahaDBStoreOpenWireVersionTest {
broker.setAdvisorySupport(false);
broker.setDataDirectory(storeDir);
broker.setStoreOpenWireVersion(storeOpenWireVersion);
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruptJournalFiles(true);
broker.start();
broker.waitUntilStarted();
@ -119,6 +134,56 @@ public class KahaDBStoreOpenWireVersionTest {
doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
}
/**
* This test shows that a corrupted index/rebuild will still
* honor the storeOpenWireVersion set on the BrokerService.
* This wasn't the case before AMQ-6082
*/
@Test(timeout = 60000)
public void testStoreVersionCorrupt() throws Exception {
final int create = 6;
final int reload = 6;
createBroker(create);
populateStore();
//blow up the index so it has to be recreated
corruptIndex();
stopBroker();
createBroker(reload);
assertEquals(create, broker.getStoreOpenWireVersion());
assertStoreIsUsable();
}
private void corruptIndex() throws IOException {
KahaDBStore store = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
final PageFile pageFile = store.getPageFile();
final Metadata metadata = store.metadata;
//blow up the index
try {
store.indexLock.writeLock().lock();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
entry.getValue().orderIndex.nextMessageId = -100;
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
entry.getValue().messageReferences.clear();
}
}
});
} finally {
store.indexLock.writeLock().unlock();
}
}
private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception {
createBroker(create);
populateStore();