diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 0c0f9454ea..a437d4de56 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -53,8 +53,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public final synchronized void start() throws Exception{ if (!isStarted()) { - super.start(); clear(); + super.start(); resetBatch(); this.size = getStoreSize(); this.storeHasMessages=this.size > 0; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index e88aabc31a..7df3b30653 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -928,7 +928,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { ActiveMQDestination dest = convert(entry.getKey()); if (dest.isTopic()) { StoredDestination loadedStore = getStoredDestination(convert(dest), tx); - if (loadedStore.ackPositions.isEmpty()) { + if (loadedStore.ackPositions.isEmpty(tx)) { isEmptyTopic = true; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index bdfdfd087b..246c369324 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.store.kahadb; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; @@ -100,7 +101,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar static final long NOT_ACKED = -1; static final long UNMATCHED_SEQ = -2; - static final int VERSION = 2; + static final int VERSION = 3; protected class Metadata { @@ -165,9 +166,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } else { os.writeBoolean(false); } - if (version > 1) { - os.writeInt(version); - } + os.writeInt(VERSION); } } @@ -255,8 +254,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar metadata.destinations.load(tx); } }); - pageFile.flush(); - // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. // Perhaps we should just keep an index of file storedDestinations.clear(); @@ -269,6 +266,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } }); + pageFile.flush(); }finally { this.indexLock.writeLock().unlock(); } @@ -985,7 +983,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // Skip adding the message to the index if this is a topic and there are // no subscriptions. - if (sd.subscriptions != null && sd.ackPositions.isEmpty()) { + if (sd.subscriptions != null && sd.ackPositions.isEmpty(tx)) { return; } @@ -1055,18 +1053,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore); - - if (ackSequenceToStore != sequence) { - // unmatched, need to add ack locations for the intermediate sequences - for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) { - addAckLocation(sd, matchedGapSequence, subscriptionKey); + if (prev != null) { + if (ackSequenceToStore != sequence) { + // unmatched, need to add ack locations for the intermediate sequences + for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) { + addAckLocation(tx, sd, matchedGapSequence, subscriptionKey); + } } + // The following method handles deleting un-referenced messages. + removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev)); } - // The following method handles deleting un-referenced messages. - removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev)); // Add it to the new location set. - addAckLocation(sd, sequence, subscriptionKey); + addAckLocation(tx, sd, sequence, subscriptionKey); } } @@ -1107,6 +1106,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar sd.subscriptionAcks.clear(tx); sd.subscriptionAcks.unload(tx); tx.free(sd.subscriptionAcks.getPageId()); + + sd.ackPositions.clear(tx); + sd.ackPositions.unload(tx); + tx.free(sd.ackPositions.getPageId()); } String key = key(command.getDestination()); @@ -1127,7 +1130,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation); - addAckLocation(sd, ackLocation, subscriptionKey); + addAckLocation(tx, sd, ackLocation, subscriptionKey); } else { // delete the sub... String subscriptionKey = command.getSubscriptionKey(); @@ -1326,13 +1329,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar BTreeIndex subscriptions; BTreeIndex subscriptionAcks; HashMap subscriptionCursors; - TreeMap> ackPositions; + BTreeIndex> ackPositions; } protected class StoredDestinationMarshaller extends VariableMarshaller { public StoredDestination readPayload(DataInput dataIn) throws IOException { - StoredDestination value = new StoredDestination(); + final StoredDestination value = new StoredDestination(); value.orderIndex.defaultPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); value.locationIndex = new BTreeIndex(pageFile, dataIn.readLong()); value.messageIdIndex = new BTreeIndex(pageFile, dataIn.readLong()); @@ -1340,11 +1343,40 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (dataIn.readBoolean()) { value.subscriptions = new BTreeIndex(pageFile, dataIn.readLong()); value.subscriptionAcks = new BTreeIndex(pageFile, dataIn.readLong()); + if (metadata.version >= 3) { + value.ackPositions = new BTreeIndex>(pageFile, dataIn.readLong()); + } else { + // upgrade + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + value.ackPositions = new BTreeIndex>(pageFile, tx.allocate()); + value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); + value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); + value.ackPositions.load(tx); + } + }); + } } if (metadata.version >= 2) { value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); + } else { + // upgrade + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.load(tx); + + value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.load(tx); + } + }); } + return value; } @@ -1356,13 +1388,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar dataOut.writeBoolean(true); dataOut.writeLong(value.subscriptions.getPageId()); dataOut.writeLong(value.subscriptionAcks.getPageId()); + dataOut.writeLong(value.ackPositions.getPageId()); } else { dataOut.writeBoolean(false); } - if (metadata.version >= 2) { - dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); - dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); - } + dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); + dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); } } @@ -1452,6 +1483,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (topic) { rc.subscriptions = new BTreeIndex(pageFile, tx.allocate()); rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); + rc.ackPositions = new BTreeIndex>(pageFile, tx.allocate()); } metadata.destinations.put(tx, key, rc); } @@ -1481,18 +1513,24 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE); rc.subscriptionAcks.load(tx); - rc.ackPositions = new TreeMap>(); + rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); + rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); + rc.ackPositions.load(tx); + rc.subscriptionCursors = new HashMap(); - for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - addAckLocation(rc, extractSequenceId(entry.getValue()), entry.getKey()); + if (metadata.version < 3) { + // on upgrade need to fill ackLocation + for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + addAckLocation(tx, rc, extractSequenceId(entry.getValue()), entry.getKey()); + } } if (rc.orderIndex.nextMessageId == 0) { // check for existing durable sub all acked out - pull next seq from acks as messages are gone - if (!rc.ackPositions.isEmpty()) { - Long lastAckedMessageId = rc.ackPositions.lastKey(); + if (!rc.ackPositions.isEmpty(tx)) { + Long lastAckedMessageId = rc.ackPositions.getLast(tx).getKey(); if (lastAckedMessageId != NOT_ACKED) { rc.orderIndex.nextMessageId = lastAckedMessageId+1; } @@ -1500,6 +1538,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } + + if (metadata.version < 3) { + // store again after upgrade + metadata.destinations.put(tx, key, rc); + } return rc; } @@ -1508,13 +1551,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar * @param messageSequence * @param subscriptionKey */ - private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) { - HashSet hs = sd.ackPositions.get(messageSequence); + private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { + HashSet hs = sd.ackPositions.get(tx, messageSequence); if (hs == null) { hs = new HashSet(); - sd.ackPositions.put(messageSequence, hs); } hs.add(subscriptionKey); + // every ack location addition needs to be a btree modification to get it stored + sd.ackPositions.put(tx, messageSequence, hs); } /** @@ -1527,12 +1571,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { // Remove the sub from the previous location set.. if (sequenceId != null) { - HashSet hs = sd.ackPositions.get(sequenceId); + HashSet hs = sd.ackPositions.get(tx, sequenceId); if (hs != null) { hs.remove(subscriptionKey); if (hs.isEmpty()) { - HashSet firstSet = sd.ackPositions.values().iterator().next(); - sd.ackPositions.remove(sequenceId); + HashSet firstSet = sd.ackPositions.getFirst(tx).getValue(); + sd.ackPositions.remove(tx, sequenceId); // Did we just empty out the first set in the // ordered list of ack locations? Then it's time to @@ -1942,15 +1986,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); defaultPriorityIndex.load(tx); - if (metadata.version >= 2) { - lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); - lowPriorityIndex.load(tx); - - highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); - highPriorityIndex.load(tx); - } + lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + lowPriorityIndex.load(tx); + highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + highPriorityIndex.load(tx); } void allocate(Transaction tx) throws IOException { @@ -2193,6 +2234,33 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - - + private static class HashSetStringMarshaller extends VariableMarshaller> { + final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); + + public void writePayload(HashSet object, DataOutput dataOut) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oout = new ObjectOutputStream(baos); + oout.writeObject(object); + oout.flush(); + oout.close(); + byte[] data = baos.toByteArray(); + dataOut.writeInt(data.length); + dataOut.write(data); + } + + public HashSet readPayload(DataInput dataIn) throws IOException { + int dataLen = dataIn.readInt(); + byte[] data = new byte[dataLen]; + dataIn.readFully(data); + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream oin = new ObjectInputStream(bais); + try { + return (HashSet) oin.readObject(); + } catch (ClassNotFoundException cfe) { + IOException ioe = new IOException("Failed to read HashSet: " + cfe); + ioe.initCause(cfe); + throw ioe; + } + } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java index 8a55333a20..ec9348d8d8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java @@ -34,6 +34,7 @@ import java.io.FileNotFoundException; public class KahaDBVersionTest extends TestCase { final static File VERSION_1_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1"); + final static File VERSION_2_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2"); protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception { BrokerService broker = new BrokerService(); @@ -47,21 +48,28 @@ public class KahaDBVersionTest extends TestCase { public void XtestCreateStore() throws Exception { KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1"); + File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX"); IOHelper.deleteFile(dir); kaha.setDirectory(dir); kaha.setJournalMaxFileLength(1024*1024); BrokerService broker = createBroker(kaha); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = cf.createConnection(); connection.setClientID("test"); connection.start(); + producerSomeMessages(connection); + connection.close(); + broker.stop(); + } + + private void producerSomeMessages(Connection connection) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic"); Queue queue = session.createQueue("test.queue"); MessageConsumer consumer = session.createDurableSubscriber(topic,"test"); consumer.close(); MessageProducer producer = session.createProducer(topic); + producer.setPriority(9); for (int i =0; i < 1000; i++) { Message msg = session.createTextMessage("test message:"+i); producer.send(msg); @@ -71,45 +79,56 @@ public class KahaDBVersionTest extends TestCase { Message msg = session.createTextMessage("test message:"+i); producer.send(msg); } - connection.stop(); - broker.stop(); } - - public void testVersionConversion() throws Exception{ + + public void testVersion1Conversion() throws Exception{ + doConvertRestartCycle(VERSION_1_DB); + } + + public void testVersion2Conversion() throws Exception{ + doConvertRestartCycle(VERSION_2_DB); + } + + public void doConvertRestartCycle(File existingStore) throws Exception { + File testDir = new File("target/activemq-data/kahadb/versionDB"); IOHelper.deleteFile(testDir); - IOHelper.copyFile(VERSION_1_DB, testDir); - - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setDirectory(testDir); - kaha.setJournalMaxFileLength(1024*1024); - BrokerService broker = createBroker(kaha); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.setClientID("test"); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("test.topic"); - Queue queue = session.createQueue("test.queue"); - MessageConsumer queueConsumer = session.createConsumer(queue); - for (int i = 0; i < 1000; i++) { - TextMessage msg = (TextMessage) queueConsumer.receive(10000); - //System.err.println(msg.getText()); - assertNotNull(msg); + IOHelper.copyFile(existingStore, testDir); + + // on repeat store will be upgraded + for (int repeats = 0; repeats < 3; repeats++) { + KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); + kaha.setDirectory(testDir); + kaha.setJournalMaxFileLength(1024 * 1024); + BrokerService broker = createBroker(kaha); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.setClientID("test"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("test.topic"); + Queue queue = session.createQueue("test.queue"); + + if (repeats > 0) { + // upgraded store will be empty so generated some more messages + producerSomeMessages(connection); + } + + MessageConsumer queueConsumer = session.createConsumer(queue); + for (int i = 0; i < 1000; i++) { + TextMessage msg = (TextMessage) queueConsumer.receive(10000); + //System.err.println(msg.getText()); + assertNotNull(msg); + } + MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test"); + for (int i = 0; i < 1000; i++) { + TextMessage msg = (TextMessage) topicConsumer.receive(10000); + //System.err.println(msg.getText()); + assertNotNull(msg); + } + connection.close(); + + broker.stop(); } - MessageConsumer topicConsumer = session.createDurableSubscriber(topic,"test"); - for (int i = 0; i < 1000; i++) { - TextMessage msg = (TextMessage) topicConsumer.receive(10000); - //System.err.println(msg.getText()); - assertNotNull(msg); - } - broker.stop(); - } - - - - - - } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 2fa4021af5..360ceb2475 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -617,6 +617,90 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(0, listener.count); } + public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { + // create offline subs 1 + Connection con = createConnection("offCli1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // create offline subs 2 + con = createConnection("offCli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int filtered = 0; + for (int i = 0; i < 10; i++) { + boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1; + if (filter) + filtered++; + + Message message = session.createMessage(); + message.setStringProperty("filter", filter ? "true" : "false"); + producer.send(topic, message); + } + + LOG.info("sent: " + filtered); + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + // restart broker + Thread.sleep(3 * 1000); + broker.stop(); + createBroker(false /*deleteAllMessages*/); + + // send more messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(null); + + for (int i = 0; i < 10; i++) { + boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1; + if (filter) + filtered++; + + Message message = session.createMessage(); + message.setStringProperty("filter", filter ? "true" : "false"); + producer.send(topic, message); + } + + LOG.info("after restart, sent: " + filtered); + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + // test offline subs + con = createConnection("offCli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + + Connection con3 = createConnection("offCli2"); + Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener3 = new Listener(); + consumer3.setMessageListener(listener3); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + session3.close(); + con3.close(); + + assertEquals(filtered, listener.count); + assertEquals(filtered, listener3.count); + } public static class Listener implements MessageListener { int count = 0; diff --git a/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log b/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log new file mode 100644 index 0000000000..82461ce08d Binary files /dev/null and b/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log differ diff --git a/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data b/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data new file mode 100644 index 0000000000..56275da605 Binary files /dev/null and b/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data differ diff --git a/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo b/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo new file mode 100644 index 0000000000..b331eabb57 Binary files /dev/null and b/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo differ