AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack

This adds a check in case a duplicate ack is passed to the store to make
sure that the subscription statistics (if enabled) for a durable sub do
not have the metrics decremented a second time
This commit is contained in:
Christopher L. Shannon (cshannon) 2024-01-23 09:33:17 -05:00
parent 0b95f93449
commit f73cf2aaab
7 changed files with 186 additions and 43 deletions

View File

@ -1545,8 +1545,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if (previous == null) {
incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
// increment after putting into the order index first
incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
}
@ -3148,18 +3149,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (messageSequence != null) {
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
if (range != null && !range.isEmpty()) {
range.remove(messageSequence);
boolean removed = range.remove(messageSequence);
if (!range.isEmpty()) {
sd.ackPositions.put(tx, subscriptionKey, range);
} else {
sd.ackPositions.remove(tx, subscriptionKey);
}
// Only decrement the statistics if the message was removed
// from the ack set for the subscription
// Fix for AMQ-9420
if (removed) {
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
key.location.getSize());
} else {
LOG.warn("Received unexpected duplicate ack: messageId: {}, Sub: {}, Dest: {}",
command.getMessageId(), subscriptionKey, command.getDestination());
}
// Check if the message is reference by any other subscription.
// If removed was previously false then we could return before
// this check as this should always return true (should still be
// a reference) but removed being false is unexpected in the first
// place so this is a good second check to verify.
if (isSequenceReferenced(tx, sd, messageSequence)) {
return;
}

View File

@ -533,7 +533,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
return publishTestMessagesDurable(connection, subNames, defaultTopicName,
publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize,
publishedMessageSize, false, deliveryMode);
publishedMessageSize, null, false, deliveryMode);
}
protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,

View File

@ -18,19 +18,22 @@ package org.apache.activemq.store;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -47,11 +50,20 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
protected static final Logger LOG = LoggerFactory
.getLogger(AbstractMessageStoreSizeStatTest.class);
protected BrokerService broker;
protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic";
// Only applies to KahaDB
protected final boolean subStatsEnabled;
protected AbstractMessageStoreSizeStatTest(boolean subStatsEnabled) {
this.subStatsEnabled = subStatsEnabled;
}
protected AbstractMessageStoreSizeStatTest() {
this.subStatsEnabled = false;
}
@Before
public void startBroker() throws Exception {
@ -119,17 +131,40 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
connection.setClientID("clientId");
connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize);
ActiveMQTopic topic = new ActiveMQTopic(defaultTopicName);
Set<String> publishedMessages = new HashSet<>();
Topic dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200,
publishedMessageSize, publishedMessages);
PersistenceAdapter adapter = this.broker.getPersistenceAdapter();
TopicMessageStore store = adapter.createTopicMessageStore(topic);
//verify the count and size
verifyStats(dest, 200, publishedMessageSize.get());
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 200, publishedMessageSize.get());
}
//consume all messages
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//All messages should now be gone
verifyStats(dest, 0, 0);
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
}
// Send 10 duplicates to verify our metrics are not broken
sendAcks(store, publishedMessages);
// Stats should still show 0 after duplicates
verifyStats(dest, 0, publishedMessageSize.get());
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
}
connection.close();
}
@ -140,17 +175,46 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
connection.setClientID("clientId");
connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize);
ActiveMQTopic topic = new ActiveMQTopic(defaultTopicName);
Set<String> publishedMessages = new HashSet<>();
Topic dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200,
publishedMessageSize, publishedMessages);
PersistenceAdapter adapter = this.broker.getPersistenceAdapter();
TopicMessageStore store = adapter.createTopicMessageStore(topic);
//verify the count and size
verifyStats(dest, 200, publishedMessageSize.get());
// Verify each subscription counter is correct
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 200, publishedMessageSize.get());
verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get());
}
//consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//There is still a durable that hasn't consumed so the messages should exist
verifyStats(dest, 200, publishedMessageSize.get());
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get());
}
// Send 10 duplicates to verify our metrics are not broken
// This used to break in memory subscription statistics before AMQ-9420
sendAcks(store, publishedMessages);
// Stats should still show 200 after duplicates
verifyStats(dest, 200, publishedMessageSize.get());
// Verify each subscription counter is correct
// Sub 2 should still have 200 messages
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get());
}
connection.stop();
}
@ -179,30 +243,30 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
final MessageStore messageStore = dest.getMessageStore();
final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() ==
assertTrue(Wait.waitFor(
() -> (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() ==
storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() ==
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
}
}));
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize())));
if (count > 0) {
assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return storeStats.getMessageSize().getTotalSize() > minimumSize;
}
}));
assertTrue(Wait.waitFor(() -> storeStats.getMessageSize().getTotalSize() > minimumSize));
} else {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return storeStats.getMessageSize().getTotalSize() == 0;
assertTrue(Wait.waitFor(() -> storeStats.getMessageSize().getTotalSize() == 0));
}
}));
}
protected void verifyDurableStats(Topic dest, String subKey, final int count, final long minimumSize) throws Exception {
final TopicMessageStore messageStore = (TopicMessageStore) dest.getMessageStore();
MessageStoreSubscriptionStatistics subStats = messageStore.getMessageStoreSubStatistics();
assertTrue(Wait.waitFor(
() -> count == subStats.getMessageCount(subKey).getCount()));
if (count > 0) {
assertTrue(subStats.getMessageSize(subKey).getTotalSize() > minimumSize);
} else {
assertTrue(Wait.waitFor(() -> subStats.getMessageSize(subKey).getTotalSize() == 0));
}
}
@ -226,11 +290,27 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
}
protected Destination publishTestMessagesDurable(Connection connection, String[] subNames,
int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception {
protected Topic publishTestMessagesDurable(Connection connection, String[] subNames,
int publishSize, int expectedSize, AtomicLong publishedMessageSize, Set<String> publishedMessages) throws Exception {
return publishTestMessagesDurable(connection, subNames, defaultTopicName,
publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize,
publishedMessageSize, true);
publishedMessageSize, publishedMessages, true);
}
protected void sendAcks(TopicMessageStore store, Set<String> publishedMessages) {
publishedMessages.stream().limit(10).forEach(id -> {
try {
MessageId messageId = new MessageId(id);
MessageAck ack = new MessageAck();
ack.setMessageID(messageId);
ack.setDestination(store.getDestination());
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageCount(1);
store.acknowledge(broker.getAdminConnectionContext(), "clientId", "sub1", messageId, ack);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import java.net.URI;
import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import jakarta.jms.BytesMessage;
@ -189,14 +190,14 @@ public abstract class AbstractStoreStatTestSupport {
}
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, Set<String> publishedMessages,
boolean verifyBrowsing) throws Exception {
return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT);
publishedMessageSize, publishedMessages, verifyBrowsing, DeliveryMode.PERSISTENT);
}
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, Set<String> publishedMessages,
boolean verifyBrowsing, int deliveryMode) throws Exception {
// create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
@ -228,7 +229,11 @@ public abstract class AbstractStoreStatTestSupport {
MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(i, session, messageSize, publishedMessageSize));
Message message = createMessage(i, session, messageSize, publishedMessageSize);
prod.send(message);
if (publishedMessages != null) {
publishedMessages.add(message.getJMSMessageID());
}
}
//verify the view has expected messages

View File

@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
@ -27,6 +29,9 @@ import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,14 +42,29 @@ import org.slf4j.LoggerFactory;
* AMQ-5748
*
*/
@RunWith(Parameterized.class)
public class KahaDBMessageStoreSizeStatTest extends
AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
.getLogger(KahaDBMessageStoreSizeStatTest.class);
@Parameters(name = "subStatsEnabled={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Subscription stats on
{true},
// Subscription stats off
{false}
});
}
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
public KahaDBMessageStoreSizeStatTest(boolean subStatsEnabled) {
super(subStatsEnabled);
}
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.getRoot().exists())
@ -57,6 +77,8 @@ public class KahaDBMessageStoreSizeStatTest extends
throws IOException {
broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir.getRoot());
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
adapter.setEnableSubscriptionStatistics(subStatsEnabled);
}
/**

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@ -31,6 +33,9 @@ import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,14 +46,29 @@ import org.slf4j.LoggerFactory;
* AMQ-5748
*
*/
@RunWith(Parameterized.class)
public class MultiKahaDBMessageStoreSizeStatTest extends
AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory
.getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
@Parameters(name = "subStatsEnabled={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Subscription stats on
{true},
// Subscription stats off
{false}
});
}
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
public MultiKahaDBMessageStoreSizeStatTest(boolean subStatsEnabled) {
super(subStatsEnabled);
}
@Override
protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.getRoot().exists())
@ -67,6 +87,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024 * 512);
kahaStore.setEnableSubscriptionStatistics(subStatsEnabled);
//set up a store per destination
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();

View File

@ -55,7 +55,8 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, publishedMessageSize);
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100,
publishedMessageSize, null);
//verify the count and size, should be 100 because of the LRUCache
//verify size is at least the minimum of 100 messages times 100 bytes
@ -80,7 +81,8 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100, so only 100 messages are kept
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, publishedMessageSize);
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100,
publishedMessageSize, null);
//verify the count and size
//verify size is at least the minimum of 100 messages times 100 bytes