AMQ-9436 - Ensure message audit in queue store cursor is shared

This commit fixes the initialization of the StoreQueueCursor message
audit object to make sure it's shared between the persistent and non
persistent cursors. It also adds a check to ensure that duplicate calls
to start will not try and init more than once.

(cherry picked from commit 75de932116)
This commit is contained in:
Christopher L. Shannon 2024-02-15 17:27:52 -05:00
parent 722a075e2c
commit 4bae9df5bd
2 changed files with 21 additions and 2 deletions

View File

@ -53,7 +53,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
@Override
public synchronized void start() throws Exception {
started = true;
if (isStarted()) {
return;
}
super.start();
if (nonPersistent == null) {
if (broker.getBrokerService().isPersistent()) {
@ -76,7 +78,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
@Override
public synchronized void stop() throws Exception {
started = false;
if (!isStarted()) {
return;
}
if (nonPersistent != null) {
nonPersistent.destroy();
}

View File

@ -29,10 +29,13 @@ import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.SubscriptionStatistics;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -117,6 +120,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
queue.initialize();
queue.start();
// verify that the cursor message audit is created and set with the
// correct audit depth and shared with the persistent and non peristent
// cursors
final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages();
ActiveMQMessageAudit messageAudit = messages.getMessageAudit();
assertNotNull(messageAudit);
assertEquals(auditDepth, messageAudit.getAuditDepth());
assertSame(messageAudit, messages.getPersistent().getMessageAudit());
assertSame(messageAudit, messages.getNonPersistent().getMessageAudit());
// Verify calling start again doesn't re-initial the audit
messages.start();
assertSame(messageAudit, messages.getMessageAudit());
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
ProducerInfo producerInfo = new ProducerInfo();