mirror of https://github.com/apache/activemq.git
Fixing an issue that prevented old versions of KahaDB from being upgraded to the newest version 6
This commit is contained in:
parent
e1c707e813
commit
8871b0e496
|
@ -1930,7 +1930,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
|
|
||||||
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
|
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
|
||||||
|
|
||||||
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
|
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
|
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
|
||||||
|
@ -2128,6 +2128,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
|
rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
|
||||||
rc.messageIdIndex.load(tx);
|
rc.messageIdIndex.load(tx);
|
||||||
|
|
||||||
|
//go through an upgrade old index if older than version 6
|
||||||
|
if (metadata.version < 6) {
|
||||||
|
for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
|
||||||
|
Entry<Location, Long> entry = iterator.next();
|
||||||
|
// modify so it is upgraded
|
||||||
|
rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If it was a topic...
|
// If it was a topic...
|
||||||
if (topic) {
|
if (topic) {
|
||||||
|
|
||||||
|
@ -2275,24 +2284,24 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
protected final Map<String, MessageStore> storeCache =
|
protected final Map<String, MessageStore> storeCache =
|
||||||
new ConcurrentHashMap<String, MessageStore>();
|
new ConcurrentHashMap<String, MessageStore>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Locate the storeMessageSize counter for this KahaDestination
|
* Locate the storeMessageSize counter for this KahaDestination
|
||||||
* @param kahaDestination
|
* @param kahaDestination
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
|
protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
|
||||||
MessageStoreStatistics storeStats = null;
|
MessageStoreStatistics storeStats = null;
|
||||||
try {
|
try {
|
||||||
MessageStore messageStore = storeCache.get(kahaDestKey);
|
MessageStore messageStore = storeCache.get(kahaDestKey);
|
||||||
if (messageStore != null) {
|
if (messageStore != null) {
|
||||||
storeStats = messageStore.getMessageStoreStatistics();
|
storeStats = messageStore.getMessageStoreStatistics();
|
||||||
}
|
}
|
||||||
} catch (Exception e1) {
|
} catch (Exception e1) {
|
||||||
LOG.error("Getting size counter of destination failed", e1);
|
LOG.error("Getting size counter of destination failed", e1);
|
||||||
}
|
}
|
||||||
|
|
||||||
return storeStats;
|
return storeStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine whether this Destination matches the DestinationType
|
* Determine whether this Destination matches the DestinationType
|
||||||
|
@ -2319,6 +2328,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Location readPayload(DataInput dataIn) throws IOException {
|
public Location readPayload(DataInput dataIn) throws IOException {
|
||||||
Location rc = new Location();
|
Location rc = new Location();
|
||||||
rc.setDataFileId(dataIn.readInt());
|
rc.setDataFileId(dataIn.readInt());
|
||||||
|
@ -2329,6 +2339,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void writePayload(Location object, DataOutput dataOut)
|
public void writePayload(Location object, DataOutput dataOut)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
dataOut.writeInt(object.getDataFileId());
|
dataOut.writeInt(object.getDataFileId());
|
||||||
|
@ -2336,14 +2347,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
dataOut.writeInt(object.getSize());
|
dataOut.writeInt(object.getSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getFixedSize() {
|
public int getFixedSize() {
|
||||||
return 12;
|
return 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Location deepCopy(Location source) {
|
public Location deepCopy(Location source) {
|
||||||
return new Location(source);
|
return new Location(source);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isDeepCopySupported() {
|
public boolean isDeepCopySupported() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,8 @@ public class KahaDBVersionTest extends TestCase {
|
||||||
final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
|
final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
|
||||||
final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
|
final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
|
||||||
final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
|
final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
|
||||||
|
final static File VERSION_5_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5");
|
||||||
|
|
||||||
|
|
||||||
BrokerService broker = null;
|
BrokerService broker = null;
|
||||||
|
|
||||||
|
@ -76,7 +78,7 @@ public class KahaDBVersionTest extends TestCase {
|
||||||
|
|
||||||
public void XtestCreateStore() throws Exception {
|
public void XtestCreateStore() throws Exception {
|
||||||
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
||||||
File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
|
File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5");
|
||||||
IOHelper.deleteFile(dir);
|
IOHelper.deleteFile(dir);
|
||||||
kaha.setDirectory(dir);
|
kaha.setDirectory(dir);
|
||||||
kaha.setJournalMaxFileLength(1024 * 1024);
|
kaha.setJournalMaxFileLength(1024 * 1024);
|
||||||
|
@ -127,6 +129,10 @@ public class KahaDBVersionTest extends TestCase {
|
||||||
doConvertRestartCycle(VERSION_4_DB);
|
doConvertRestartCycle(VERSION_4_DB);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testVersion5Conversion() throws Exception {
|
||||||
|
doConvertRestartCycle(VERSION_5_DB);
|
||||||
|
}
|
||||||
|
|
||||||
public void doConvertRestartCycle(File existingStore) throws Exception {
|
public void doConvertRestartCycle(File existingStore) throws Exception {
|
||||||
|
|
||||||
File testDir = new File("target/activemq-data/kahadb/versionDB");
|
File testDir = new File("target/activemq-data/kahadb/versionDB");
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue