Merge pull request #1142 from cshannon/AMQ-9420

AMQ-9420 - Don't decrement KahaDB durable sub metrics on duplicate ack
This commit is contained in:
Christopher L. Shannon 2024-01-23 10:34:28 -05:00 committed by GitHub
commit 38b6d86ff8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 184 additions and 42 deletions

View File

@ -3148,18 +3148,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (messageSequence != null) { if (messageSequence != null) {
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
if (range != null && !range.isEmpty()) { if (range != null && !range.isEmpty()) {
range.remove(messageSequence); boolean removed = range.remove(messageSequence);
if (!range.isEmpty()) { if (!range.isEmpty()) {
sd.ackPositions.put(tx, subscriptionKey, range); sd.ackPositions.put(tx, subscriptionKey, range);
} else { } else {
sd.ackPositions.remove(tx, subscriptionKey); sd.ackPositions.remove(tx, subscriptionKey);
} }
MessageKeys key = sd.orderIndex.get(tx, messageSequence); // Only decrement the statistics if the message was removed
decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, // from the ack set for the subscription
// Fix for AMQ-9420
if (removed) {
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
key.location.getSize()); key.location.getSize());
} else {
LOG.warn("Received unexpected duplicate ack: messageId: {}, Sub: {}, Dest: {}",
command.getMessageId(), subscriptionKey, command.getDestination());
}
// Check if the message is reference by any other subscription. // Check if the message is reference by any other subscription.
// If removed was previously false then we could return before
// this check as this should always return true (should still be
// a reference) but removed being false is unexpected in the first
// place so this is a good second check to verify.
if (isSequenceReferenced(tx, sd, messageSequence)) { if (isSequenceReferenced(tx, sd, messageSequence)) {
return; return;
} }

View File

@ -533,7 +533,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
return publishTestMessagesDurable(connection, subNames, defaultTopicName, return publishTestMessagesDurable(connection, subNames, defaultTopicName,
publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize, publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize,
publishedMessageSize, false, deliveryMode); publishedMessageSize, null, false, deliveryMode);
} }
protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize,

View File

@ -18,19 +18,22 @@ package org.apache.activemq.store;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import jakarta.jms.Connection; import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode; import jakarta.jms.DeliveryMode;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -47,11 +50,20 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
protected static final Logger LOG = LoggerFactory protected static final Logger LOG = LoggerFactory
.getLogger(AbstractMessageStoreSizeStatTest.class); .getLogger(AbstractMessageStoreSizeStatTest.class);
protected BrokerService broker; protected BrokerService broker;
protected URI brokerConnectURI; protected URI brokerConnectURI;
protected String defaultQueueName = "test.queue"; protected String defaultQueueName = "test.queue";
protected String defaultTopicName = "test.topic"; protected String defaultTopicName = "test.topic";
// Only applies to KahaDB
protected final boolean subStatsEnabled;
protected AbstractMessageStoreSizeStatTest(boolean subStatsEnabled) {
this.subStatsEnabled = subStatsEnabled;
}
protected AbstractMessageStoreSizeStatTest() {
this.subStatsEnabled = false;
}
@Before @Before
public void startBroker() throws Exception { public void startBroker() throws Exception {
@ -119,17 +131,40 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
connection.setClientID("clientId"); connection.setClientID("clientId");
connection.start(); connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize); ActiveMQTopic topic = new ActiveMQTopic(defaultTopicName);
Set<String> publishedMessages = new HashSet<>();
Topic dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200,
publishedMessageSize, publishedMessages);
PersistenceAdapter adapter = this.broker.getPersistenceAdapter();
TopicMessageStore store = adapter.createTopicMessageStore(topic);
//verify the count and size //verify the count and size
verifyStats(dest, 200, publishedMessageSize.get()); verifyStats(dest, 200, publishedMessageSize.get());
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 200, publishedMessageSize.get());
}
//consume all messages //consume all messages
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//All messages should now be gone //All messages should now be gone
verifyStats(dest, 0, 0); verifyStats(dest, 0, 0);
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
}
// Send 10 duplicates to verify our metrics are not broken
sendAcks(store, publishedMessages);
// Stats should still show 0 after duplicates
verifyStats(dest, 0, publishedMessageSize.get());
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
}
connection.close(); connection.close();
} }
@ -140,17 +175,46 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
connection.setClientID("clientId"); connection.setClientID("clientId");
connection.start(); connection.start();
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize); ActiveMQTopic topic = new ActiveMQTopic(defaultTopicName);
Set<String> publishedMessages = new HashSet<>();
Topic dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200,
publishedMessageSize, publishedMessages);
PersistenceAdapter adapter = this.broker.getPersistenceAdapter();
TopicMessageStore store = adapter.createTopicMessageStore(topic);
//verify the count and size //verify the count and size
verifyStats(dest, 200, publishedMessageSize.get()); verifyStats(dest, 200, publishedMessageSize.get());
// Verify each subscription counter is correct
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 200, publishedMessageSize.get());
verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get());
}
//consume messages just for sub1 //consume messages just for sub1
consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
//There is still a durable that hasn't consumed so the messages should exist //There is still a durable that hasn't consumed so the messages should exist
verifyStats(dest, 200, publishedMessageSize.get()); verifyStats(dest, 200, publishedMessageSize.get());
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get());
}
// Send 10 duplicates to verify our metrics are not broken
// This used to break in memory subscription statistics before AMQ-9420
sendAcks(store, publishedMessages);
// Stats should still show 200 after duplicates
verifyStats(dest, 200, publishedMessageSize.get());
// Verify each subscription counter is correct
// Sub 2 should still have 200 messages
if (subStatsEnabled) {
verifyDurableStats(dest, "clientId:sub1", 0, publishedMessageSize.get());
verifyDurableStats(dest, "clientId:sub2", 200, publishedMessageSize.get());
}
connection.stop(); connection.stop();
} }
@ -179,30 +243,30 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
final MessageStore messageStore = dest.getMessageStore(); final MessageStore messageStore = dest.getMessageStore();
final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics(); final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
assertTrue(Wait.waitFor(new Condition() { assertTrue(Wait.waitFor(
@Override () -> (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() ==
public boolean isSatisified() throws Exception { storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() ==
return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() == messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize())));
storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() ==
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
}
}));
if (count > 0) { if (count > 0) {
assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize); assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
assertTrue(Wait.waitFor(new Condition() { assertTrue(Wait.waitFor(() -> storeStats.getMessageSize().getTotalSize() > minimumSize));
@Override
public boolean isSatisified() throws Exception {
return storeStats.getMessageSize().getTotalSize() > minimumSize;
}
}));
} else { } else {
assertTrue(Wait.waitFor(new Condition() { assertTrue(Wait.waitFor(() -> storeStats.getMessageSize().getTotalSize() == 0));
@Override }
public boolean isSatisified() throws Exception { }
return storeStats.getMessageSize().getTotalSize() == 0;
} protected void verifyDurableStats(Topic dest, String subKey, final int count, final long minimumSize) throws Exception {
})); final TopicMessageStore messageStore = (TopicMessageStore) dest.getMessageStore();
MessageStoreSubscriptionStatistics subStats = messageStore.getMessageStoreSubStatistics();
assertTrue(Wait.waitFor(
() -> count == subStats.getMessageCount(subKey).getCount()));
if (count > 0) {
assertTrue(subStats.getMessageSize(subKey).getTotalSize() > minimumSize);
} else {
assertTrue(Wait.waitFor(() -> subStats.getMessageSize(subKey).getTotalSize() == 0));
} }
} }
@ -226,11 +290,27 @@ public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStat
return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize); return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
} }
protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, protected Topic publishTestMessagesDurable(Connection connection, String[] subNames,
int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception { int publishSize, int expectedSize, AtomicLong publishedMessageSize, Set<String> publishedMessages) throws Exception {
return publishTestMessagesDurable(connection, subNames, defaultTopicName, return publishTestMessagesDurable(connection, subNames, defaultTopicName,
publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize, publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize,
publishedMessageSize, true); publishedMessageSize, publishedMessages, true);
}
protected void sendAcks(TopicMessageStore store, Set<String> publishedMessages) {
publishedMessages.stream().limit(10).forEach(id -> {
try {
MessageId messageId = new MessageId(id);
MessageAck ack = new MessageAck();
ack.setMessageID(messageId);
ack.setDestination(store.getDestination());
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageCount(1);
store.acknowledge(broker.getAdminConnectionContext(), "clientId", "sub1", messageId, ack);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} }
} }

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import java.net.URI; import java.net.URI;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import jakarta.jms.BytesMessage; import jakarta.jms.BytesMessage;
@ -189,14 +190,14 @@ public abstract class AbstractStoreStatTestSupport {
} }
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, Set<String> publishedMessages,
boolean verifyBrowsing) throws Exception { boolean verifyBrowsing) throws Exception {
return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize, return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize,
publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT); publishedMessageSize, publishedMessages, verifyBrowsing, DeliveryMode.PERSISTENT);
} }
protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName,
int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, Set<String> publishedMessages,
boolean verifyBrowsing, int deliveryMode) throws Exception { boolean verifyBrowsing, int deliveryMode) throws Exception {
// create a new queue // create a new queue
final ActiveMQDestination activeMqTopic = new ActiveMQTopic( final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
@ -228,7 +229,11 @@ public abstract class AbstractStoreStatTestSupport {
MessageProducer prod = session.createProducer(topic); MessageProducer prod = session.createProducer(topic);
prod.setDeliveryMode(deliveryMode); prod.setDeliveryMode(deliveryMode);
for (int i = 0; i < publishSize; i++) { for (int i = 0; i < publishSize; i++) {
prod.send(createMessage(i, session, messageSize, publishedMessageSize)); Message message = createMessage(i, session, messageSize, publishedMessageSize);
prod.send(message);
if (publishedMessages != null) {
publishedMessages.add(message.getJMSMessageID());
}
} }
//verify the view has expected messages //verify the view has expected messages

View File

@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -27,6 +29,9 @@ import org.apache.commons.io.FileUtils;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,14 +42,29 @@ import org.slf4j.LoggerFactory;
* AMQ-5748 * AMQ-5748
* *
*/ */
@RunWith(Parameterized.class)
public class KahaDBMessageStoreSizeStatTest extends public class KahaDBMessageStoreSizeStatTest extends
AbstractMessageStoreSizeStatTest { AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory protected static final Logger LOG = LoggerFactory
.getLogger(KahaDBMessageStoreSizeStatTest.class); .getLogger(KahaDBMessageStoreSizeStatTest.class);
@Parameters(name = "subStatsEnabled={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Subscription stats on
{true},
// Subscription stats off
{false}
});
}
@Rule @Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
public KahaDBMessageStoreSizeStatTest(boolean subStatsEnabled) {
super(subStatsEnabled);
}
@Override @Override
protected void setUpBroker(boolean clearDataDir) throws Exception { protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.getRoot().exists()) if (clearDataDir && dataFileDir.getRoot().exists())
@ -57,6 +77,8 @@ public class KahaDBMessageStoreSizeStatTest extends
throws IOException { throws IOException {
broker.setPersistent(true); broker.setPersistent(true);
broker.setDataDirectoryFile(dataFileDir.getRoot()); broker.setDataDirectoryFile(dataFileDir.getRoot());
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
adapter.setEnableSubscriptionStatistics(subStatsEnabled);
} }
/** /**

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -31,6 +33,9 @@ import org.apache.commons.io.FileUtils;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -41,14 +46,29 @@ import org.slf4j.LoggerFactory;
* AMQ-5748 * AMQ-5748
* *
*/ */
@RunWith(Parameterized.class)
public class MultiKahaDBMessageStoreSizeStatTest extends public class MultiKahaDBMessageStoreSizeStatTest extends
AbstractMessageStoreSizeStatTest { AbstractMessageStoreSizeStatTest {
protected static final Logger LOG = LoggerFactory protected static final Logger LOG = LoggerFactory
.getLogger(MultiKahaDBMessageStoreSizeStatTest.class); .getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
@Parameters(name = "subStatsEnabled={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Subscription stats on
{true},
// Subscription stats off
{false}
});
}
@Rule @Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
public MultiKahaDBMessageStoreSizeStatTest(boolean subStatsEnabled) {
super(subStatsEnabled);
}
@Override @Override
protected void setUpBroker(boolean clearDataDir) throws Exception { protected void setUpBroker(boolean clearDataDir) throws Exception {
if (clearDataDir && dataFileDir.getRoot().exists()) if (clearDataDir && dataFileDir.getRoot().exists())
@ -67,6 +87,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setJournalMaxFileLength(1024 * 512); kahaStore.setJournalMaxFileLength(1024 * 512);
kahaStore.setEnableSubscriptionStatistics(subStatsEnabled);
//set up a store per destination //set up a store per destination
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();

View File

@ -55,7 +55,8 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
//The expected value is only 100 because for durables a LRUCache is being used //The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100 //with a max size of 100
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, publishedMessageSize); Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100,
publishedMessageSize, null);
//verify the count and size, should be 100 because of the LRUCache //verify the count and size, should be 100 because of the LRUCache
//verify size is at least the minimum of 100 messages times 100 bytes //verify size is at least the minimum of 100 messages times 100 bytes
@ -80,7 +81,8 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat
//The expected value is only 100 because for durables a LRUCache is being used //The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100, so only 100 messages are kept //with a max size of 100, so only 100 messages are kept
Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, publishedMessageSize); Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100,
publishedMessageSize, null);
//verify the count and size //verify the count and size
//verify size is at least the minimum of 100 messages times 100 bytes //verify size is at least the minimum of 100 messages times 100 bytes