mirror of https://github.com/apache/activemq.git
Fixing KahaDB so that the correct marshaller is used for the message keys inside of the message order index. This will ensure that message size metrics are accurate.
This commit is contained in:
parent
9ee92a1e16
commit
246ccb8e04
|
@ -1861,18 +1861,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
|
protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
|
||||||
static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
|
final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageKeys readPayload(DataInput dataIn) throws IOException {
|
public MessageKeys readPayload(DataInput dataIn) throws IOException {
|
||||||
return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
|
return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
|
public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
|
||||||
dataOut.writeUTF(object.messageId);
|
dataOut.writeUTF(object.messageId);
|
||||||
LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
|
locationSizeMarshaller.writePayload(object.location, dataOut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2178,6 +2178,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
// modify so it is upgraded
|
// modify so it is upgraded
|
||||||
rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
|
rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
|
//upgrade the order index
|
||||||
|
for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
|
||||||
|
Entry<Long, MessageKeys> entry = iterator.next();
|
||||||
|
//call get so that the last priority is updated
|
||||||
|
rc.orderIndex.get(tx, entry.getKey());
|
||||||
|
rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it was a topic...
|
// If it was a topic...
|
||||||
|
@ -3062,6 +3069,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
Long lastLowKey;
|
Long lastLowKey;
|
||||||
byte lastGetPriority;
|
byte lastGetPriority;
|
||||||
final List<Long> pendingAdditions = new LinkedList<Long>();
|
final List<Long> pendingAdditions = new LinkedList<Long>();
|
||||||
|
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
|
||||||
|
|
||||||
MessageKeys remove(Transaction tx, Long key) throws IOException {
|
MessageKeys remove(Transaction tx, Long key) throws IOException {
|
||||||
MessageKeys result = defaultPriorityIndex.remove(tx, key);
|
MessageKeys result = defaultPriorityIndex.remove(tx, key);
|
||||||
|
@ -3076,13 +3084,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
|
|
||||||
void load(Transaction tx) throws IOException {
|
void load(Transaction tx) throws IOException {
|
||||||
defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
|
||||||
defaultPriorityIndex.load(tx);
|
defaultPriorityIndex.load(tx);
|
||||||
lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
|
||||||
lowPriorityIndex.load(tx);
|
lowPriorityIndex.load(tx);
|
||||||
highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
|
||||||
highPriorityIndex.load(tx);
|
highPriorityIndex.load(tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,10 +20,19 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.store.AbstractMessageStoreSizeTest;
|
import org.apache.activemq.store.AbstractMessageStoreSizeTest;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.MessageStoreStatistics;
|
||||||
import org.apache.activemq.store.PersistenceAdapter;
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.journal.Location;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.page.Transaction;
|
||||||
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
|
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.filefilter.TrueFileFilter;
|
import org.apache.commons.io.filefilter.TrueFileFilter;
|
||||||
|
|
|
@ -16,9 +16,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.store.kahadb;
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys;
|
||||||
|
import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.page.Transaction;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
|
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
|
||||||
|
@ -39,6 +48,41 @@ public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTe
|
||||||
messageStore.start();
|
messageStore.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure that the sizes stored in the KahaDB location index are the same as in
|
||||||
|
* the message order index.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLocationIndexMatchesOrderIndex() throws Exception {
|
||||||
|
final KahaDBStore kahaDbStore = (KahaDBStore) store;
|
||||||
|
writeMessages();
|
||||||
|
|
||||||
|
//Iterate over the order index and add up the size of the messages to compare
|
||||||
|
//to the location index
|
||||||
|
kahaDbStore.indexLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
|
||||||
|
@Override
|
||||||
|
public Long execute(Transaction tx) throws IOException {
|
||||||
|
long size = 0;
|
||||||
|
|
||||||
|
// Iterate through all index entries to get the size of each message
|
||||||
|
StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(destination), tx);
|
||||||
|
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
|
||||||
|
size += iterator.next().getValue().location.getSize();
|
||||||
|
}
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertEquals("Order index size values don't match message size",
|
||||||
|
size, messageStore.getMessageSize());
|
||||||
|
} finally {
|
||||||
|
kahaDbStore.indexLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getVersion5Dir() {
|
protected String getVersion5Dir() {
|
||||||
return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";
|
return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";
|
||||||
|
|
Loading…
Reference in New Issue