From 246ccb8e04515c6e85544b32035537f60d92b5b9 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Fri, 18 Dec 2015 13:51:37 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6097 Fixing KahaDB so that the correct marshaller is used for the message keys inside of the message order index. This will ensure that message size metrics are accurate. --- .../store/kahadb/MessageDatabase.java | 22 +++++++--- .../AbstractKahaDBMessageStoreSizeTest.java | 9 ++++ .../kahadb/KahaDBMessageStoreSizeTest.java | 44 +++++++++++++++++++ 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 6a98ccea2b..5c0801ba9a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1861,18 +1861,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - static protected class MessageKeysMarshaller extends VariableMarshaller { - static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); + protected class MessageKeysMarshaller extends VariableMarshaller { + final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); @Override public MessageKeys readPayload(DataInput dataIn) throws IOException { - return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); + return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); } @Override public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { dataOut.writeUTF(object.messageId); - LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); + locationSizeMarshaller.writePayload(object.location, dataOut); } } @@ -2178,6 +2178,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // modify so it is upgraded rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); } + //upgrade the order index + for (Iterator> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { + Entry entry = iterator.next(); + //call get so that the last priority is updated + rc.orderIndex.get(tx, entry.getKey()); + rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); + } } // If it was a topic... @@ -3062,6 +3069,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Long lastLowKey; byte lastGetPriority; final List pendingAdditions = new LinkedList(); + final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); MessageKeys remove(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.remove(tx, key); @@ -3076,13 +3084,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe void load(Transaction tx) throws IOException { defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller); defaultPriorityIndex.load(tx); lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); lowPriorityIndex.load(tx); highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + highPriorityIndex.setValueMarshaller(messageKeysMarshaller); highPriorityIndex.load(tx); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java index 7d53cbd0f0..5ba75fd498 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java @@ -20,10 +20,19 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.store.AbstractMessageStoreSizeTest; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java index 43dc2f6f2f..4513856153 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java @@ -16,9 +16,18 @@ */ package org.apache.activemq.store.kahadb; +import static org.junit.Assert.assertEquals; + import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.junit.Test; /** * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly @@ -39,6 +48,41 @@ public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTe messageStore.start(); } + /** + * Make sure that the sizes stored in the KahaDB location index are the same as in + * the message order index. + * + * @throws Exception + */ + @Test + public void testLocationIndexMatchesOrderIndex() throws Exception { + final KahaDBStore kahaDbStore = (KahaDBStore) store; + writeMessages(); + + //Iterate over the order index and add up the size of the messages to compare + //to the location index + kahaDbStore.indexLock.readLock().lock(); + try { + long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure() { + @Override + public Long execute(Transaction tx) throws IOException { + long size = 0; + + // Iterate through all index entries to get the size of each message + StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(destination), tx); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + size += iterator.next().getValue().location.getSize(); + } + return size; + } + }); + assertEquals("Order index size values don't match message size", + size, messageStore.getMessageSize()); + } finally { + kahaDbStore.indexLock.readLock().unlock(); + } + } + @Override protected String getVersion5Dir() { return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";