https://issues.apache.org/jira/browse/AMQ-4212 - fix auto upgrade from ver 1 and 2 to 5 - regression in KahaDBVersionTest

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1511711 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-08-08 11:51:29 +00:00
parent ae84399b97
commit 90d6c2042d
2 changed files with 23 additions and 20 deletions

View File

@ -1838,31 +1838,34 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
BTreeIndex<Long, HashSet<String>> oldAckPositions =
new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
oldAckPositions.load(tx);
LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
// Do the initial build of the data in memory before writing into the store
// based Ack Positions List to avoid a lot of disk thrashing.
Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
while (iterator.hasNext()) {
Entry<Long, HashSet<String>> entry = iterator.next();
if (metadata.version >= 3) {
// migrate
BTreeIndex<Long, HashSet<String>> oldAckPositions =
new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
oldAckPositions.load(tx);
for(String subKey : entry.getValue()) {
SequenceSet pendingAcks = temp.get(subKey);
if (pendingAcks == null) {
pendingAcks = new SequenceSet();
temp.put(subKey, pendingAcks);
// Do the initial build of the data in memory before writing into the store
// based Ack Positions List to avoid a lot of disk thrashing.
Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
while (iterator.hasNext()) {
Entry<Long, HashSet<String>> entry = iterator.next();
for(String subKey : entry.getValue()) {
SequenceSet pendingAcks = temp.get(subKey);
if (pendingAcks == null) {
pendingAcks = new SequenceSet();
temp.put(subKey, pendingAcks);
}
pendingAcks.add(entry.getKey());
}
pendingAcks.add(entry.getKey());
}
}
// Now move the pending messages to ack data into the store backed
// structure.
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());

View File

@ -168,7 +168,7 @@ public class KahaDBVersionTest extends TestCase {
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
count++;
// System.err.println(msg.getText());
assertNotNull(msg);
assertNotNull("" + count, msg);
}
LOG.info("Consumed " + count + " from topic");
connection.close();