diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index ea53ad0d10..0611be77c1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1524,7 +1524,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); //update all the subscription metrics - if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize()) { + if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) { Iterator> iter = sd.ackPositions.iterator(tx); while (iter.hasNext()) { Entry e = iter.next(); @@ -2970,33 +2970,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return sd.subscriptionAcks.get(tx, subscriptionKey); } - public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { - SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); - if (messageSequences != null) { - long result = messageSequences.rangeSize(); - // if there's anything in the range the last value is always the nextMessage marker, so remove 1. - return result > 0 ? result - 1 : 0; + protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + if (sd.ackPositions != null) { + SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); + if (messageSequences != null) { + long result = messageSequences.rangeSize(); + // if there's anything in the range the last value is always the nextMessage marker, so remove 1. + return result > 0 ? result - 1 : 0; + } } return 0; } - public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { - //grab the messages attached to this subscription - SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); - + protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { long locationSize = 0; - if (messageSequences != null) { - Sequence head = messageSequences.getHead(); - if (head != null) { - //get an iterator over the order index starting at the first unacked message - //and go over each message to add up the size - Iterator> iterator = sd.orderIndex.iterator(tx, - new MessageOrderCursor(head.getFirst())); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - locationSize += entry.getValue().location.getSize(); + if (sd.ackPositions != null) { + //grab the messages attached to this subscription + SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); + + if (messageSequences != null) { + Sequence head = messageSequences.getHead(); + if (head != null) { + //get an iterator over the order index starting at the first unacked message + //and go over each message to add up the size + Iterator> iterator = sd.orderIndex.iterator(tx, + new MessageOrderCursor(head.getFirst())); + + while (iterator.hasNext()) { + Entry entry = iterator.next(); + locationSize += entry.getValue().location.getSize(); + } } } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java index 357dc5fd54..4deb1e07b9 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java @@ -17,9 +17,12 @@ package org.apache.activemq.store.kahadb; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; @@ -36,13 +39,27 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class MessageDatabaseSizeTest { private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class); + @Parameters(name = "subStatsEnabled={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // Subscription stats on + {true}, + // Subscription stats off + {false} + }); + } + @Rule public TemporaryFolder dataDir = new TemporaryFolder(new File("target")); private final String payload = new String(new byte[1024]); @@ -50,6 +67,13 @@ public class MessageDatabaseSizeTest { private BrokerService broker = null; private final ActiveMQQueue destination = new ActiveMQQueue("Test"); private KahaDBPersistenceAdapter adapter; + private boolean subStatsEnabled; + + public MessageDatabaseSizeTest(boolean subStatsEnabled) { + super(); + this.subStatsEnabled = subStatsEnabled; + } + protected void startBroker() throws Exception { broker = new BrokerService(); @@ -58,6 +82,7 @@ public class MessageDatabaseSizeTest { broker.setUseJmx(true); broker.setDataDirectory(dataDir.getRoot().getAbsolutePath()); adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + adapter.setEnableSubscriptionStatistics(subStatsEnabled); broker.start(); LOG.info("Starting broker.."); } @@ -101,6 +126,22 @@ public class MessageDatabaseSizeTest { assertEquals(existingSize, messageStore.getMessageSize()); } + @Test + public void testUpdateMessageSameLocationDifferentSize() throws Exception { + final KahaDBStore store = adapter.getStore(); + MessageId messageId = new MessageId("111:222:333"); + ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333")); + + //Add a single message and update once so we can compare the size consistently + MessageStore messageStore = store.createQueueMessageStore(destination); + messageStore.addMessage(broker.getAdminConnectionContext(), textMessage); + textMessage.setText("new size of message"); + messageStore.updateMessage(textMessage); + + assertNotNull(findMessageLocation(messageId.toString(), store.convert(destination))); + + } + /** * Test that when updating an existing message to a different location in the * journal that the index size doesn't change